一个 Python 内置的异步IO并发编程模块。
asyncio 包含不同系统下实现的事件循环,如 Windows 下的 Select,Linux 下的 epoll.
它还对 TCP UDP 等传输协议做了抽象,支持SSL、子进程、延时调用。
模仿 futures 模块但适用于事件循环使用的 future 类。 模仿 threading 模块中的同步原语,可以在单线程内的协程之间。
基于 yield from 的协议和任务,让你可以用顺序的方式编写并发代码。
必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件转移到线程池。
它主要的功能是为了做协程调度,但也可以让我们把多线程、多进程融合进来。
Asyncio 介绍
Python 用于解决异步 I/O 编程的一整套解决方案,其应用有:
- Tornado
- Gevent
- Twisted (scrapy, Django channels)
Tornado 底层也是使用协程和事件循环的方式完成高并发,另外它还实现了 Web 服务器。
对于 django 或者 flask 是不提供 web 服务器的,不会去做很多 socket 编码的事情,只是完成了基本的功能让你可以调试应用。
所以 django 和 flask,他们还需要一个实现高并发 socket 接口的框架,如 uwsgi, gunicorn,可能再加一个 nginx。
参考: 理解 asyncio 来构建高性能 Python 网络程序
Tornado 不能简单的使用平时使用的阻塞数据库驱动。
基本使用与事件循环
1 2 3 4 5 6 7 8 9 10
| import asyncio
async def get_data(table): print("sleep started") await asyncio.sleep(3) print("sleep finished")
if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(get_data("test_db"))
|
输出:
1 2 3
| ❯ python async_play.py sleep started sleep finished
|
注意这个函数的写法:
1 2 3 4 5 6 7 8 9 10
| async def get_data(table): await asyncio.sleep(3) await time.sleep(3) time.sleep(3)
|
我们看到这里 time.sleep 是不支持 asyncio 的,需要将其换成 asyncio.sleep。同理,对于我们其他的 I/O 操作,比如和数据库交互,请求网页,我们也需要一个支持 asyncio 的框架,才可以配合一起工作。
下面来看一段完整的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| import asyncio import time
async def get_data(table): print("sleep started") await asyncio.sleep(2) print("sleep finished")
if __name__ == "__main__": start_time = time.time()
loop = asyncio.get_event_loop() tasks = [get_data("test_db") for i in range(10)] loop.run_until_complete(asyncio.wait(tasks))
print(time.time()-start_time)
|
现在让我们给这段代码加上回调函数,比如在数据库数据拿到之后,就去调用一个回调函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import asyncio import time from functools import partial
async def get_data(table): print("sleep started") await asyncio.sleep(2) print("sleep finished")
def callback_send_email(url, future): print(url) print("send email to bobby")
if __name__ == "__main__": start_time = time.time() loop = asyncio.get_event_loop()
task = loop.create_task(get_data("test_db"))
task.add_done_callback( partial(callback_send_email, "test_db") ) loop.run_until_complete(task) print(task.result())
|
另外 asyncio 还支持 wait 和 gather 函数,可以对任务进行精细化控制:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import asyncio import time async def get_data(table): print("sleep started") await asyncio.sleep(2) print("sleep finished")
if __name__ == "__main__": start_time = time.time()
loop = asyncio.get_event_loop() tasks = [get_data("test_db1") for i in range(10)]
group1 = [get_data("test_db2") for i in range(2)] group2 = [get_data("test_db1") for i in range(2)]
group1 = asyncio.gather(*group1) group2 = asyncio.gather(*group2)
group2.cancel() loop.run_until_complete(asyncio.gather(group1, group2)) print(time.time() - start_time)
|
Task 取消和子协程调用原理
1 2 3 4 5 6 7
| import asyncio
loop = asyncio.get_event_loop()
loop.run_forever()
loop.run_until_complete()
|
在 run_until_complete
内部,会将传递进来指定的 future 对象增加一个 add_done_callback
方法。
在指定的 future 执行完成之后,就会调用 _run_until_complete_cb
方法,而 _run_until_complete_cb
内部就会调用 loop 即事件循环 的 stop。
在理解了上面这两个函数之后,我们就知道如何去启动一个永不停止的函数。
那么如何去停止它呢?
loop 实际上,会被放到 future中,(future 中也有 loop 😅),它内部的实现比较绕,我们通过一个实际例子来看如何取消 loop 的运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import asyncio import time
async def get_html(sleep_times): print("waiting") await asyncio.sleep(sleep_times) print("done after {}s".format(sleep_times))
if __name__ == "__main__": task1 = get_html(2) task2 = get_html(3) task3 = get_html(3)
tasks = [task1, task2, task3]
loop = asyncio.get_event_loop()
try: loop.run_until_complete(asyncio.wait(tasks)) except KeyboardInterrupt as e: all_tasks = asyncio.Task.all_tasks() for task in all_tasks: print("cancel task") print(task.cancel()) loop.stop() loop.run_forever() finally: loop.close()
|
协程嵌套协程
参考官方文档的一个例子(最新文档已被移除)
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import asyncio
async def compute(x, y): print("Compute %s + %s ..." % (x, y)) await asyncio.sleep(1.0) return x + y
async def print_sum(x, y): result = await compute(x, y) print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close()
|
以及它的一个时间顺序图:
asyncio 其他函数
这里的其他函数指的是,在进行 task 管理的时候,你希望一些 task 可以尽快或者马上运行。
先定义 call back 和 stop 函数。
1 2 3 4 5 6 7 8 9
| import asyncio
def callback(sleep_times, loop): """回调函数""" print("success time {}".format(loop.time()))
def stoploop(loop): """停止loop""" loop.stop()
|
call_soon
指定马上(在队列中)运行。
1 2 3 4 5
| if __name__ == "__main__": loop = asyncio.get_event_loop() loop.call_soon(callback, 4, loop) loop.call_soon(stoploop, loop) loop.run_forever()
|
call_later
指定一个时间后运行, 多个时间会排序,比 call_soon 慢。
1 2 3 4 5 6 7 8
| if __name__ == "__main__": now = loop.time() loop.call_later(2, callback, 2 ) loop.call_later(1, callback, 1) loop.call_later(3, callback, 3) loop.call_soon(callback, 4) loop.run_forever()
|
call_at
指定一个确切的时间(如当前时间加多少秒)。
1 2 3 4 5 6 7
| if __name__ == "__main__": now = loop.time() loop.call_at(now+2, callback, 2, loop) loop.call_at(now+1, callback, 1, loop) loop.call_at(now+3, callback, 3, loop) loop.run_forever()
|
call_soon_threadsafe
线程安全的方法。
比如 callabck 中的变量,可能在其他的方法中会改动,那么这时候使用 call_soon_threadsafe 才比较安全。
线程池与 asyncio
asyncio 为什么要和线程池结合呢?asyncio 我们说这是一个异步 IO 框架,它其实支持多线程和多进程的,也包括协程。
我们要将一个阻塞的接口(比如访问DB)和 asyncio 结合起来的话,要怎么用呢?
像上面的情况,都是需要特定的函数的(如 asyncio.sleep)?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import asyncio from concurrent.futures import ThreadPoolExecutor import socket from urllib.parse import urlparse
def get_url(url): pass
if __name__ == "__main__": import time start_time = time.time() loop = asyncio.get_event_loop() executor = ThreadPoolExecutor(3)
tasks = [] for url in range(20): url = "http://shop.projectsedu.com/goods/{}/".format(url) task = loop.run_in_executor(executor, get_url, url) tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks)) print("last time:{}".format(time.time()-start_time))
|
asyncio 模拟 http 请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| import asyncio import socket from urllib.parse import urlparse
async def get_url(url): url = urlparse(url) host = url.netloc path = url.path if path == "": path = "/"
reader, writer = await asyncio.open_connection(host,80) writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
all_lines = [] async for raw_line in reader: data = raw_line.decode("utf8") all_lines.append(data) html = "\n".join(all_lines) return html
async def main(): tasks = [] for url in range(20): url = "http://shop.projectsedu.com/goods/{}/".format(url) tasks.append( asyncio.ensure_future(get_url(url)) ) for task in asyncio.as_completed(tasks): result = await task print(result)
if __name__ == "__main__": import time start_time = time.time() loop = asyncio.get_event_loop() loop.run_until_complete(main()) print('last time:{}'.format(time.time()-start_time))
|
Future 和 Task
我们知道 Future 是一个任务结果容器,Asyncio 也有自己的 Future,它和线程池中的 Future 几乎是一致的。
当结果容器完成后,就会去运行 callback。
Task 是 Future 的子类。是协程和 Future 之间的桥梁。
asyncio 同步和通信
一般来说,在单线程模式下是不需要锁的,下面的代码打印 total 最终为0.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| total = 0
async def add(): global total for i in range(10000): total += 1
async def desc(): global total for i in range(10000): total -= 1
if __name__ == '__main__': import asyncio tasks = [add(), desc()]
loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) print(total)
|