Scott's Blog

学则不固, 知则不惑

0%

Python Asyncio 并发编程

一个 Python 内置的异步IO并发编程模块。

asyncio 包含不同系统下实现的事件循环,如 Windows 下的 Select,Linux 下的 epoll.

它还对 TCP UDP 等传输协议做了抽象,支持SSL、子进程、延时调用。

模仿 futures 模块但适用于事件循环使用的 future 类。 模仿 threading 模块中的同步原语,可以在单线程内的协程之间。

基于 yield from 的协议和任务,让你可以用顺序的方式编写并发代码。

必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件转移到线程池。

它主要的功能是为了做协程调度,但也可以让我们把多线程、多进程融合进来。

Asyncio 介绍

Python 用于解决异步 I/O 编程的一整套解决方案,其应用有:

  • Tornado
  • Gevent
  • Twisted (scrapy, Django channels)

Tornado 底层也是使用协程和事件循环的方式完成高并发,另外它还实现了 Web 服务器。

对于 django 或者 flask 是不提供 web 服务器的,不会去做很多 socket 编码的事情,只是完成了基本的功能让你可以调试应用。

所以 django 和 flask,他们还需要一个实现高并发 socket 接口的框架,如 uwsgi, gunicorn,可能再加一个 nginx。

参考: 理解 asyncio 来构建高性能 Python 网络程序

Tornado 不能简单的使用平时使用的阻塞数据库驱动。

基本使用与事件循环

1
2
3
4
5
6
7
8
9
10
import asyncio

async def get_data(table):
print("sleep started")
await asyncio.sleep(3)
print("sleep finished")

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(get_data("test_db"))

输出:

1
2
3
❯ python async_play.py
sleep started
sleep finished

注意这个函数的写法:

1
2
3
4
5
6
7
8
9
10
async def get_data(table):
# 正确写法
await asyncio.sleep(3)
# 如果我们不小心将 sleep 换成了 time, 则会报错 TypeError
# 因为 await后面必须跟一个 awaitable 的对象,这里返回 None 显然不是
await time.sleep(3)
# 如果不加 await, 就会变成顺序的执行过程了
# 因 time.sleep() 是同步的操作,而 asyncio.sleep() 则会
# 立即返回一个 future 对象
time.sleep(3)

我们看到这里 time.sleep 是不支持 asyncio 的,需要将其换成 asyncio.sleep。同理,对于我们其他的 I/O 操作,比如和数据库交互,请求网页,我们也需要一个支持 asyncio 的框架,才可以配合一起工作。

下面来看一段完整的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
import time

async def get_data(table):
print("sleep started")
await asyncio.sleep(2)
print("sleep finished")

if __name__ == "__main__":
start_time = time.time()

loop = asyncio.get_event_loop()
tasks = [get_data("test_db") for i in range(10)]
loop.run_until_complete(asyncio.wait(tasks))

print(time.time()-start_time)

现在让我们给这段代码加上回调函数,比如在数据库数据拿到之后,就去调用一个回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import time
from functools import partial

async def get_data(table):
print("sleep started")
await asyncio.sleep(2)
print("sleep finished")

def callback_send_email(url, future):
print(url)
print("send email to bobby")

if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()

# 这种方式也是可以的,基本上等价于下面的 create_task
# get_future = asyncio.ensure_future(get_data("test_db"))
task = loop.create_task(get_data("test_db"))

# partial 将你的函数包装成一个新的函数,可以将参数放进去
# 要使用它,需要确保将你的函数要传入的参数放在前面
task.add_done_callback(
partial(callback_send_email, "test_db")
)
loop.run_until_complete(task)
print(task.result())

另外 asyncio 还支持 wait 和 gather 函数,可以对任务进行精细化控制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

import asyncio
import time
async def get_data(table):
print("sleep started")
await asyncio.sleep(2)
print("sleep finished")

if __name__ == "__main__":
start_time = time.time()

loop = asyncio.get_event_loop()
tasks = [get_data("test_db1") for i in range(10)]
# loop.run_until_complete(asyncio.gather(*tasks))
# print(time.time()-start_time)

# gather和wait的区别
# gather更加high-level
group1 = [get_data("test_db2") for i in range(2)]
group2 = [get_data("test_db1") for i in range(2)]

group1 = asyncio.gather(*group1)
group2 = asyncio.gather(*group2)

# gather 支持批量取消
group2.cancel()
loop.run_until_complete(asyncio.gather(group1, group2))
print(time.time() - start_time)

Task 取消和子协程调用原理

1
2
3
4
5
6
7
import asyncio

loop = asyncio.get_event_loop()
# run_forever 回一直运行
loop.run_forever()
# 会在运行到指定的协程后停止,这是如何做到的呢?
loop.run_until_complete()

run_until_complete 内部,会将传递进来指定的 future 对象增加一个 add_done_callback 方法。

在指定的 future 执行完成之后,就会调用 _run_until_complete_cb 方法,而 _run_until_complete_cb 内部就会调用 loop 即事件循环 的 stop。

在理解了上面这两个函数之后,我们就知道如何去启动一个永不停止的函数。

那么如何去停止它呢?

loop 实际上,会被放到 future中,(future 中也有 loop 😅),它内部的实现比较绕,我们通过一个实际例子来看如何取消 loop 的运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import asyncio
import time

async def get_html(sleep_times):
print("waiting")
await asyncio.sleep(sleep_times)
print("done after {}s".format(sleep_times))


if __name__ == "__main__":
task1 = get_html(2)
task2 = get_html(3)
task3 = get_html(3)

tasks = [task1, task2, task3]

loop = asyncio.get_event_loop()

try:
loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
# 这里是如何后续到上面 loop 中的所有 task 的?它完全都没有访问 loop
# 因为在单线程中,很多变量都是共享的,当直接调用 asyncio.Task.all_tasks()
# 的时候,它内部会去直接拿 loop,若没有则会创建一个新的
all_tasks = asyncio.Task.all_tasks()
for task in all_tasks:
print("cancel task")
print(task.cancel())
# stop 只是标记 stoping 为 True
# close 则会清空 ready,schedule 队列清空,关闭进程和线程池
# stop 后一定要调用 run_forever 方法才不会报错
loop.stop()
loop.run_forever()
finally:
loop.close()

协程嵌套协程

参考官方文档的一个例子(最新文档已被移除)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio

async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y

async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

以及它的一个时间顺序图:

asyncio 其他函数

这里的其他函数指的是,在进行 task 管理的时候,你希望一些 task 可以尽快或者马上运行。

先定义 call back 和 stop 函数。

1
2
3
4
5
6
7
8
9
import asyncio

def callback(sleep_times, loop):
"""回调函数"""
print("success time {}".format(loop.time()))

def stoploop(loop):
"""停止loop"""
loop.stop()

call_soon

指定马上(在队列中)运行。

1
2
3
4
5
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.call_soon(callback, 4, loop)
loop.call_soon(stoploop, loop)
loop.run_forever()

call_later

指定一个时间后运行, 多个时间会排序,比 call_soon 慢。

1
2
3
4
5
6
7
8
if __name__ == "__main__":
now = loop.time()
loop.call_later(2, callback, 2 )
loop.call_later(1, callback, 1)
loop.call_later(3, callback, 3)
# 这个 call_soon 会先运行
loop.call_soon(callback, 4)
loop.run_forever()

call_at

指定一个确切的时间(如当前时间加多少秒)。

1
2
3
4
5
6
7
if __name__ == "__main__":
now = loop.time()
loop.call_at(now+2, callback, 2, loop)
loop.call_at(now+1, callback, 1, loop)
loop.call_at(now+3, callback, 3, loop)
loop.run_forever()

call_soon_threadsafe

线程安全的方法。

比如 callabck 中的变量,可能在其他的方法中会改动,那么这时候使用 call_soon_threadsafe 才比较安全。

线程池与 asyncio

asyncio 为什么要和线程池结合呢?asyncio 我们说这是一个异步 IO 框架,它其实支持多线程和多进程的,也包括协程。

我们要将一个阻塞的接口(比如访问DB)和 asyncio 结合起来的话,要怎么用呢?

像上面的情况,都是需要特定的函数的(如 asyncio.sleep)?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#使用多线程:在协程中集成阻塞io
import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
from urllib.parse import urlparse


def get_url(url):
# 通过socket请求html
pass


if __name__ == "__main__":
import time
start_time = time.time()
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(3)

tasks = []
for url in range(20):
url = "http://shop.projectsedu.com/goods/{}/".format(url)
task = loop.run_in_executor(executor, get_url, url)
tasks.append(task)

loop.run_until_complete(asyncio.wait(tasks))
print("last time:{}".format(time.time()-start_time))

asyncio 模拟 http 请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# asyncio 没有提供http协议的接口 aiohttp
import asyncio
import socket
from urllib.parse import urlparse


async def get_url(url):
# 通过socket请求html
url = urlparse(url)
host = url.netloc
path = url.path
if path == "":
path = "/"

# 建立socket连接,比较费事,使用 await
# asyncio.open_connection 会自动帮我们注册
reader, writer = await asyncio.open_connection(host,80)
writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))

# 通过 async for 读取数据
# reader 内部实现了 anext 魔法方法
all_lines = []
async for raw_line in reader:
data = raw_line.decode("utf8")
all_lines.append(data)
html = "\n".join(all_lines)
return html

async def main():
tasks = []
for url in range(20):
url = "http://shop.projectsedu.com/goods/{}/".format(url)
tasks.append(
# append 是 futrue
asyncio.ensure_future(get_url(url))
)
for task in asyncio.as_completed(tasks):
result = await task
print(result)

if __name__ == "__main__":
import time
start_time = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print('last time:{}'.format(time.time()-start_time))

Future 和 Task

我们知道 Future 是一个任务结果容器,Asyncio 也有自己的 Future,它和线程池中的 Future 几乎是一致的。

当结果容器完成后,就会去运行 callback。

Task 是 Future 的子类。是协程和 Future 之间的桥梁。

asyncio 同步和通信

一般来说,在单线程模式下是不需要锁的,下面的代码打印 total 最终为0.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
total = 0

async def add():
global total
for i in range(10000):
total += 1

async def desc():
global total
for i in range(10000):
total -= 1


if __name__ == '__main__':
import asyncio
tasks = [add(), desc()]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print(total)