Scott's Blog

学则不固, 知则不惑

0%

Python 协程与生成器

协程的出现,就是为了解决回调编写难的问题。

协程

比如,回调模式编码复杂度高、同步编程的并发性不高,多线程编程需要线程间同步,有 Lock 的限制。

那么能不能采用同步的方式去编写异步的代码呢?

一个办法是使用单线程去切换任务,这样就不需要操作系统切换线程,也不需要锁,而且实现了并发性很高(但需要程序员自己去调度任务)类似这样:

1
2
3
4
5
6
7
8
9
10
11
12
13

def do_part1(arg):
# job B 为 IO 操作,会阻塞
# 我们希望在执行到 A 的时候,可以跳出去
# 继续执行耗费 CPU 的部分
a = do_job_A(arg)
b = do_job_B(a)
return b

def do_all_job():
p1 = do_part1(1)
result = do_part2(p1)
return result

说白了,我们需要一个这样的函数,即它可以暂停,并且在适当的时候恢复。

生成器进阶

send

生成器不只是可以产出值,还可以接收值。

1
2
3
4
5
6
7
8
9
def gen():
addr = yield "google.com"
print(addr)
yield "microsoft.com"

g = gen()
addr = next(g)
# send 的值,将函数赋值给内部的 addr
after_send = g.send("apple.com")

这里第一次需要调用 next 方法,否则会报 TypeError, 如果不调用 next,也可以在开始的时候 send 一个 None.

1
2
3
4
5
6
7
8
9
def gen():
addr = yield "google.com"
print(addr)
yield "microsoft.com"

g = gen()
g.send(None)
# send 的值,将函数赋值给内部的 addr
after_send = g.send("apple.com")

这是因为在调用 send 发送非 None 值之前,我们必须启动一次生成器。

有两种方式:

  1. 使用 next
  2. 或调用生成器对象的 send 方法传入 None 值

另外生成器,还有一个 close 方法,可以将生成器关闭。

如果一个生成器会 yield 三次,如果在第一次的时候 close 了,那么在第二次的时候调用 next 就会报 GeneratorExit 错误。

throw

1
2
3
4
5
6
7
8
9
10
11
12
13
def gen():
# 为什么不是 try microsoft
try:
yield "google.com"
except Exception:
print("Pass Test Error")
yield "microsoft.com"
yield "scottzhang.pro"

g = gen()
print(next(g))
g.throw(Exception, "TestError")
print(next(g))

yield from

Python 3.3 新语法。

yield from 可以理解为 yiled 的简化,比如下面这两种写法差不多是相等的:

1
2
3
4
5
6
7
8
def a_chain(*args, **kwargs):
for v1 in args:
for v2 in v1:
yield v2

def a_chain(*args, **kwargs):
for v1 in args:
yield from v1

yield from 还有另外一个功能,先看下面的代码:

1
2
3
4
5
6
7
def my_gen(gen):
# gen 是一个可迭代对象
yield from gen

def main():
g = my_gen()
g.send(None)

这里有几个概念需要区分清楚:

  • main 叫做调用者
  • my_gen 叫做委托生成器
  • gen 叫做子生成器

yield from 的另外一个特性就是能在调用者和子生成器之间建立一个通道,而且是双向的。

一个基于上面的模式编程的例子,要求是统计一个字典中值的个数, 并将值重新赋值为(num, [values]) 的形式,即值变为个数与值的元组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 结果字典
final_result = {}

# 子生成器
def sales_sum(pro_name):
total = 0
nums = []
while True:
x = yield
print(pro_name+"销量: ", x)
if not x:
break
total += x
nums.append(x)
return total, nums

# 委托生成器
def middle(key):
while True:
final_result[key] = yield from sales_sum(key)
print(key + "销量统计完成!!.")

# 调用者
def main():
# 产品销量字典
data_sets = {
"产品A": [1200, 1500, 3000],
"产品B": [28,55,98,108 ],
"产品C": [280,560,778,70],
}
for key, data_set in data_sets.items():
print("start key:", key)
m = middle(key)
m.send(None) # 预激 middle 协程
for value in data_set:
m.send(value) # 给协程传递每一组的值
m.send(None)
print("final_result:", final_result)

if __name__ == '__main__':
main()

可能你会有疑问为什么这里不把子生成器和委托生成器合二为一,这样直接使用 yield 也可以实现。

但这样你就得在生成器快结束的时候,自己处理生成器对象的异常。

1
2
3
4
5
6
# ...
my_gen.send(last_value)
try:
my_gen.send(None)
except StopIteration as e:
result = e.value

而 yield from 会帮我们自动处理这些逻辑,但还不止这些。

我们来看下 yield from 背后源码的原理, 看源码之前,我们先认识下所用到的变量。

PEP380

  • _i:子生成器,同时也是一个迭代器
  • _y:子生成器生产的值
  • _r:yield from 表达式最终的值
  • _s:调用方通过send()发送的值
  • _e:异常对象

摘录一段简单的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
_i = iter(EXPR)      # EXPR是一个可迭代对象,_i其实是子生成器;
try:
_y = next(_i) # 预激子生成器,把产出的第一个值存在_y中;
except StopIteration as _e:
_r = _e.value # 如果抛出了`StopIteration`异常,那么就将异常对象的`value`属性保存到_r,这是最简单的情况的返回值;
else:
while 1: # 尝试执行这个循环,委托生成器会阻塞;
_s = yield _y # 生产子生成器的值,等待调用方`send()`值,发送过来的值将保存在_s中;
try:
_y = _i.send(_s) # 转发_s,并且尝试向下执行;
except StopIteration as _e:
_r = _e.value # 如果子生成器抛出异常,那么就获取异常对象的`value`属性存到_r,退出循环,恢复委托生成器的运行;
break
RESULT = _r # _r就是整个yield from表达式返回的值。

此外,yield from 还需要处理这些逻辑:

  1. 子生成器可能只是一个迭代器,并不是一个作为协程的生成器,所以它不支持.throw()和.close()方法;
  2. 如果子生成器支持.throw()和.close()方法,但是在子生成器内部,这两个方法都会抛出异常;
  3. 调用方让子生成器自己抛出异常
  4. 当调用方使用next()或者.send(None)时,都要在子生成器上调用next()函数,当调用方使用.send()发送非 None 值时,才调用子生成器的.send()方法;

总结一下:

总结一下关键点:

  1. 子生成器生产的值,都是直接传给调用方的;调用方通过.send()发送的值都是直接传递给子生成器的;如果发送的是 None,会调用子生成器的__next__()方法,如果不是 None,会调用子生成器的.send()方法;
  2. 子生成器退出的时候,最后的return EXPR,会触发一个StopIteration(EXPR)异常;
  3. yield from表达式的值,是子生成器终止时,传递给StopIteration异常的第一个参数;
  4. 如果调用的时候出现StopIteration异常,委托生成器会恢复运行,同时其他的异常会向上 "冒泡";
  5. 传入委托生成器的异常里,除了GeneratorExit之外,其他的所有异常全部传递给子生成器的.throw()方法;如果调用.throw()的时候出现了StopIteration异常,那么就恢复委托生成器的运行,其他的异常全部向上 "冒泡";
  6. 如果在委托生成器上调用.close()或传入GeneratorExit异常,会调用子生成器的.close()方法,没有的话就不调用。如果在调用.close()的时候抛出了异常,那么就向上 "冒泡",否则的话委托生成器会抛出GeneratorExit异常。

async 和 await

Python 3.5+ 后,引入了 async 和 await 两个关键字,定义了原生的协程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async def downloader(url):
return "Fake Result"

async def download_url(url):
# do something
# 将任务交给子协程完成
# await 后面跟的是 collections.Awaittable 对象
# 也可以实现 __await__ 魔法方法来支持
# await 可以理解为 yield from
result = await downloader(url)
return result

if __name__ == "__main__":
coro = download_url("www.scottzhang.pro")
coro.send(None)

注意生成器是不可以直接传给 await,下面的代码会报错

1
2
3
4
5
6
7
8
9
10
def downloader(url):
yield "Fake Result"

async def download_url(url):
result = await downloader(url)
return result

if __name__ == "__main__":
coro = download_url("www.scottzhang.pro")
coro.send(None)

解决办法是,加一个装饰器, 这个装饰器会将这个函数实现 __await__ 方法。

1
2
3
4
5
import types

@types.coroutine
def downloader(url):
yield "Fake Result"