Scott's Blog

学则不固, 知则不惑

0%

Python 协程和异步IO

介绍了一些异步IO的基本概念与 Select 即回调函数的使用。

一些基本概念

在学习协程之前,建议先复习一下 socket 编程和多线程的知识。

并发与并行

并发:一个时间段、有几个程序在同一个 CPU 上运行,任意时刻只有一个程序在CPU上运行。

并行:任意时刻,有多个程序运行在多个 CPU 上。

同步和异步

是一种消息通信机制,把操作看成消息在不同线程、协程中发送,然后得到 Future进行后续操作。

同步:代码调用IO操作时,必须等待IO操作完成菜返回的调用方式。

异步:代码调用IO操作时,不必等操作完成就返回的调用方式。

阻塞和非阻塞

是一种函数调用的机制。

阻塞:调用函数是当前线程被挂起。

非阻塞:调用函数时,当前线程不会被挂起,而是立即返回。

什么是 C10K 问题?

C10K,一个1999 年提出来的技术挑战,即我们如何在1颗 1GHz CPU,2G 内存,1gbps 网络环境下,让单台服务器同时为1万个客户端提供FTP服务。

IO 多路复用

Unix 下的五种 I/O 模型:

  • 阻塞 I/O
  • 非阻塞 I/O
  • 多路复用 I/O
  • 信号驱动是 I/O
  • 异步I/O (POSIX 的 aio_系列函数)

以 socket 中的连接建立为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# --- 阻塞 I/O
# 基础的用的最多的,socket.connect 连接成功后再返回,这时候产生阻塞
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, 80))


# --- 非阻塞 I/O
# 发出 socket.connect 后马上返回(不阻塞,但没有结果)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setblocking(False)
client.connect((host, 80))
# 随后不停询问服务端连接是否,成功后才可以发送消息
# 不过此时,也可以直接就开始干别的事情
while (client connected):
pass
client.send("")


# ---多路复用 I/O
# 调用操作系统的 select 方法,操作系统会告诉我们哪些 socket 的端口
# 和文件句柄已经准备好了,它支持监听多个文件和 socket(select 本质上也是阻塞方法)
# 比如我们可以监听100个端口,只要其中某个端口可用,我们就可以立即处理


# ---信号驱动式 I/O
# 应用较少,暂时不做介绍


# ---异步 I/O
# aio_read 真正的异步

select,poll,epoll 都是 I/0 多路复用的机制。I/0 多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。

但select,poll, epoll 本质上都是同步 I/0,因为他们都需要在读写事件就绪后自己负责 进行读写,也就是说这个读写过程是阻塞的,而异步V/0则无需自己负责进行读写,异步IO的实现会负责把数据从内核拷贝到用户空间。

Select

select 函数监视的文件描述符分3类,分别是 writefds、readfds, 和 exceptfds。

调用后 select 函数会阻塞,直到有描述符就绪(有数据 可读、可写、或者有except),或者超时(timeout指指定等待时间,如果立即返回设为null即可),函数返回。

当select函数返回后,可以通过遍历fdset,来找到就绪的描述符。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。

select的一个缺点在于,单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。

Poll

不同与select使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现。

polfd结构包含了要监视的event和发生的event,不再使用select "参数-值" 传递的方式。

同时,pollfd并没有最大数量限制( 但是数量过大后性能也是会下降)。和select西数一样,pol返回后需要轮询polfd来获取就绪的描述符。从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取 已经就绪的socket。

事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态 ,因此随着监视的描述符数量的增长,其效率也会线性下降

Epoll

epoll是在2.6内核中提出的,是之前的select和pol的增强版本。

相对于select和poll来说,epol更加灵活,没有描述符限制。epoll使用-一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

epoll 实现使用了红黑树。

对比

在并发高的情况下,连接活跃度不是很高,epoll 比 selelct 好;

并发不高,连接很活跃的时候,select 比 epoll 好;

实例:非阻塞I/O

先看一段使用非阻塞IO完成 socket 请求与接受的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import socket
from urllib.parse import urlparse


# 使用非阻塞io完成http请求
def get_url(url):
# 通过socket请求html
url = urlparse(url)
host = url.netloc
path = url.path
if path == "":
path = "/"

# 建立socket连接
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setblocking(False)
try:
client.connect((host, 80)) #阻塞不会消耗cpu
except BlockingIOError as e:
pass

# 不停的询问连接是否建立好, 需要while循环不停的去检查状态
# 做计算任务或者再次发起其他的连接请求

send_to = "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8")
while True:
try:
client.send(send_to)
break
except OSError as e:
pass


data = b""
while True:
try:
d = client.recv(1024)
except BlockingIOError as e:
continue
if d:
data += d
else:
break

data = data.decode("utf8")
html_data = data.split("\r\n\r\n")[1]
print(html_data)
client.close()

if __name__ == "__main__":
get_url("http://www.baidu.com")


实例:Select 回调

下面这个是使用 Select 的版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import socket
from urllib.parse import urlparse

# 需处理系统兼容性问题
# 如 Windows 和 Linux 不一样, 推荐使用 selectors
# import select

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE


selector = DefaultSelector()
# 使用select完成http请求
# 一个线程发出url请求后即可不管,操作系统 select
# 会自动使用可用的 socket 处理,对比多线程中一个线程对应一个 url
# 省去了线程切换的开销,以及其占用的内存
urls = []
stop = False


class Fetcher:
def connected(self, key):
selector.unregister(key.fd)
self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))
selector.register(self.client.fileno(), EVENT_READ, self.readable)

def readable(self, key):
d = self.client.recv(1024)
if d:
self.data += d
else:
selector.unregister(key.fd)
data = self.data.decode("utf8")
html_data = data.split("\r\n\r\n")[1]
print(html_data)
self.client.close()
urls.remove(self.spider_url)
if not urls:
global stop
stop = True

def get_url(self, url):
self.spider_url = url
url = urlparse(url)
self.host = url.netloc
self.path = url.path
self.data = b""
if self.path == "":
self.path = "/"

# 建立socket连接
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client.setblocking(False)

try:
self.client.connect((self.host, 80)) # 阻塞不会消耗cpu
except BlockingIOError as e:
pass

#注册
selector.register(self.client.fileno(), EVENT_WRITE, self.connected)


def loop():
#事件循环,不停的请求socket的状态并调用对应的回调函数
# 1. select本身是不支持register模式
# 2. socket状态变化以后的回调是由程序员完成的
while not stop:
ready = selector.select()
for key, mask in ready:
call_back = key.data
call_back(key)
# 回调+事件循环+select(poll\epoll)

if __name__ == "__main__":
fetcher = Fetcher()
import time
start_time = time.time()
for url in range(20):
url = "http://shop.projectsedu.com/goods/{}/".format(url)
urls.append(url)
fetcher = Fetcher()
fetcher.get_url(url)
loop()
print(time.time()-start_time)

回调的问题

使用回调虽然可以带来效率上的提升,但是也会有一些问题,包括:

  • 回调函数执行不正常怎么办?
  • 回调函数里还要嵌套回调怎么办?嵌套多层怎么办?
  • 多层嵌套中,某个环节出错了怎么办?
  • 有个数据,需要每个回调函数都处理怎么办?
  • 怎么使用当前函数中的局部变量?

归纳来看,可以说回调的问题在于:

  1. 代码可读性变差
  2. 共享状态的管理困难
  3. 处理异常比较麻烦

要处理这个问题,这时候就轮到协程出场了。