原文地址:https://www.aosabook.org/en/500L/a-web-crawler-with-asyncio-coroutines.html
作者简介
A. Jesse Jiryu Davis,MongoDB 纽约的一位工程师,他是 Python 异步 MongoDB 驱动 Monter 的作者,同时也是 MongoDB C 语言驱动开发的领导者,PyMongo 开发团队的成员,此外,他也为 asyncio 和 Tornado 的开发做了贡献。他的博客地址:http://emptysqua.re
Guido van Rossum,主流编程语言 Python 的创始人,Python 社区称其为 BDFL(Benevolent Dictator For Life,终身仁慈*者,来自于戏剧《Monty Python》),他的个人主页为:http://www.python.org/~guido/
前言
传统计算机科学强调高效的算法可以让计算尽可能的快速完成。但是很多网络程序花费的时间并不在于计算,而在于维护许多比较慢或是互动较少的网络连接,这些程序面临着一个独特的挑战:让大量的网络事件变得高效。现今对此的应对方法为异步 I/O,或称为“异步”。
本文呈现的是一个简单的网络爬虫。这是一个典型的异步程序,因为它等待着许多网络响应,而做的计算又相当少。同一时间它能抓取的页面越多,就能完成的更快。如果为每一个请求分配一个线程的话,那么随着并发量增大势必会导致在耗尽套接字之前,内存或其他线程相关资源先被耗尽。而使用异步 I/O 可以避免这个问题。
我们将分 3 个阶段展示这个例子。首先,我们给出一个异步事件循环和使用这个基于回调的事件循环的爬虫结构,它非常的高效,但将其扩展到更复杂的问题将会导致难以管理的面条式代码;然后,正是由于之前的难以扩展,我们将使用 Python 的协程使其兼具效率与扩展性,我们将使用 Python 的生成器函数实现简单的协程;最后,我们将使用具备完善功能的 Python 标准库:asyncio1,并结合异步队列重新实现。
任务
一个网络爬虫将寻找并下载网站的所有页面,也许是为了索引它们或将其归档。以根地址开始,它将抓取每一个页面,解析其中未抓取的链接,并将其加入待抓取队列,在所有页面都不存在未抓取的链接,且待抓取队列为空时,它就会停止。
我们可以在同一时间下载更多的队列以加速这一过程,当爬虫获取到新链接时,它会并行的打开多个套接字以采集新的页面,同时在响应到达时解析,并将解析到的新链接添加到队列中。当并发较大时,可能会导致性能下降,因此我们限制并发数,并将剩余的链接放入队列中,知道请求中的链接完成后再继续抓取。
传统方法
如何让爬虫并行呢,按照传统方法,我们会创建一个线程池,每个线程会负责使用套接字去下载单个页面,比如下载 xkcd.com
:
def fetch(url):
sock = socket.socket()
sock.connect(('xkcd.com', 80))
request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
sock.send(request.encode('ascii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
# Page is now downloaded.
links = parse_links(response)
q.add(links)
socket 操作默认为阻塞的,当线程调用像 connect
或 recv
这样的方法时,它会阻塞直到方法返回2。因此想要在同一时间下载多个页面,也需要对应数量的线程。一个复杂的应用程序通常会将多个空闲的线程维护在一个线程池以分摊线程创建时的开销,并在随后的任务中检查和使用它们。同样的方法也适用于使用连接池的套接字。
然而,线程资源是昂贵的,操作系统也会对进程、用户或机器使用线程的数量分别做出限制。在 Jess 的系统上,一个 Python 线程花费 50K 内存,而启动成千上万的的线程则会失败。如果我们将 scoket 并发提升为上万个,那么我们会在 scoket 耗尽时先耗尽线程资源。每个线程的开销和系统对线程的限制是它们的瓶颈。
Dan Kegel 在他那篇具有影响力的文章“The C10K problem”3中,描述了多线程在 I/O 操作中的局限,他在开始时说到:
是时候让 web 服务器能够同时处理成千上万的客户端连接了,你觉得呢?毕竟如今的网络已是一个很大的地方了。
Kegel 在 1999 年创造了“C10K”这个术语,上万的连接在如今看来是可以接受的,但问题依然存在,只是大小有所变化。回到那时,对于 C10K 问题来说,为每个连接使用一个线程是不切实际的。如今的线程限制会比当时高出几个数量级。事实上,我们的小爬虫使用线程也可以工作的很好,但对于具有成百上千连接的大规模应用,大部分系统仍可以创建套接字,但线程资源早已消耗完了。我们该如何克服这个问题呢?
异步
异步 I/O 框架使用非阻塞套接字让单个线程执行并发操作,在我们的异步爬虫中,我们在它连接到服务器前将套接字设置为非阻塞:
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(('xkcd.com', 80))
except BlockingIOError:
pass
恼人的是,非阻塞套接字会在 connect
时抛出异常,即使它工作的很正常。这个异常复现了底层 C 语言方法的烦人行为,它会将errno
设置为 EINPROGRESS
,告诉你已经开始了。
现在我们的爬虫需要一种方式去知晓连接已经建立,好发送 HTTP 请求,我们可以简单的使用一个循环来重试:
request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
encoded = request.encode('ascii')
while True:
try:
sock.send(encoded)
break # Done.
except OSError as e:
pass
print('sent')
这个方法不仅消耗 CPU,还不能有效的监听多个套接字。过去,BSD Unix 团队的解决方式是使用 select
,一个 C 语言方法以提供对一个或多个套接字活动进行监听。如今随着互联网应用处理的连接量越来越大,催生出了像 epoll
这样的替代方法,在 BSD 上,有 kqueue
,而 linux 上则有 epoll
。它们的 API 与 select
相似,但在处理大量连接时具有很好的性能。
Python 3.4 的默认选择器使用其运行的操作系统上的支持的最优方法。要注册一个网络 I/O 事件监听,我们需要创建一个非阻塞套接字并将其注册到默认的选择器上:
import socket
from selectors import DefaultSelector, EVENT_WRITE
selector = DefaultSelector()
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(('xkcd.com', 80))
except BlockingIOError:
pass
def connected():
selector.unregister(sock.fileno())
print('connected!')
selector.register(sock.fileno(), EVENT_WRITE, connected)
我们忽略这个伪装的错误,并调用 selector.register
,将套接字的文件描述符、一个代表我们要等待事件的常量。为了在连接建立时收到通知,我们传递了 EVENT_WRITE
:代表我们想知道套接字何时可写。同时我们也传递了 connect
方法,在事件发生时调用,类似于回调函数。
我们在一个循环中处理选择器接收到的 I/O 通知:
def loop():
while True:
events = selector.select()
for event_key, event_mask in events:
callback = event_key.data
callback()
回调方法 connected
被存放为 event_key.data
,我们在非阻塞套接字连接后取回并执行。
与上面快速轮转的循环不同,这个循环会在调用 select
后停下来,等待下一个 I/O 事件,然后会调用对应的事件回调方法。尚未完成的操作将会被挂起直到事件循环发出通知。
目前为止我们证明了什么?我们展示了如何开始一个事件并在其准备好时执行其对应的回调。异步框架正是基于我们展示的这两个功能——非阻塞套接字和事件循环——以在单个线程中执行并发操作。
我们在这里实现了“并发”,但并非是传统的“并行”。这是因为我们构建了一个可以进行重叠 I/O 的微系统,它可以在其他操作还在进行时执行新的操作,它实际上并没有利用多核心去并行计算。但是,这么设计是用于解决基于 I/O 的问题,而不是基于 CPU 的问题4。
因此我们的事件循环在处理并发 I/O 时非常有效,因为它没有将线程资源分配到每一个连接。但是在我们继续前,纠正一个常见的误解是非常重要的,那就是异步要比多线程更快。但通常并不是这样——实际上,在 Python 中服务于少量非常活跃的连接,类似于我们这样的事件循环要略慢于多线程。若是在运行时没有全局解释器锁,线程在这样的工作量上还会表现得更好。异步 I/O 真正适用的是一个具有很多慢连接或低活跃度连接的应用5。
使用回调编程
目前为止,我们构建了一个小小的异步框架,我们该如何构建一个爬虫呢?即便是一个简单的 URL 抓取程序写起来也是很痛苦的。
我们以两个集合:已经收集到的 URL 集合和尚未抓取的 URL 集合来开始:
urls_todo = set(['/'])
seen_urls = set(['/'])
seen_urls
集合包含 urls_todo
集合和已经采集了的 URL,这两个集合初始化时都带着根路径“/”。
抓取一个页面需要一系列的回调,connected
方法在套接字连接时调用,同时发送了一个 GET 请求到服务器,此时它需要等待服务器响应,所以要注册一个新的回调方法,如果新的回调执行时还需发送新的请求,则需要再次注册另外的回调,如此反复。
让我们将所有的回调放到一个 Fetcher
对象中,它需要 一个 URL,一个 socket 实例,和一个用来接收响应字节的变量:
class Fetcher(object):
def __init__(self, url):
self.response = b'' # Empty array of bytes.
self.url = url
self.sock = None
我们以调用 Fetcher.fetch
开始:
# Method on Fetcher class.
def fetch(self):
self.sock = socket.socket()
self.sock.setblocking(False)
try:
self.sock.connect(('xkcd.com', 80))
except BlockingIOError:
pass
# Register next callback
selector.register(self.sock.fileno(),
EVENT_WRITE,
self.connected)
fetch
方法一开始会创建一个套接字连接,需要注意的是,这个方法会在连接建立前返回。它必须将控制权交给事件循环以等待连接。为了理解缘由,想象我们的整个应用架构像这样:
# Begin fetching http://xkcd.com/353/
fetcher = Fetcher('/353/')
fetcher.fetch()
while True:
events = selector.select()
for event_key, event_mask in events:
callback = event_key.data
callback(event_key, event_mask)
所有的事件通知都在事件循环中调用 select
时被处理,因此 fetch
必须将控制权交给事件循环,以此让程序能知道套接字何时被建立,然后循环才能执行在 fetch
中注册的 connected
回调方法。
下面是 connected
方法的实现:
# Method on Fetcher class.
def connected(self, key, mask):
print('connected!')
selector.unregister(key.fd)
request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url)
self.sock.send(request.encode('ascii'))
# Register the next callback.
selector.register(key.fd,
EVENT_READ,
self.read_response)
该方法发送了一个 GET 请求,一个真正的应用会检查 send
方法的返回值,以防止整个信息不能一次被发送完成。但我们的请求比较小,我们的应用也不成熟。它简单的调用了 send
然后等待响应。当然,它必须注册另外一个回调并将控制权让给事件循环,下一个,也是最后一个回调:read_response
会处理服务器的响应:
# Method on Fetcher class.
def read_response(self, key, mask):
global stopped
chunk = self.sock.recv(4096) # 4k chunk size
if chunk:
self.response += chunk
else:
selector.unregister(key.fd) # Done reading
links = self.parse_links()
# Python set-logic
for link in links.difference(seen_urls):
urls_todo.add(link)
Fetcher(link).fetch() # <- New Fetcher
seen_urls.update(links)
urls_todo.remove(self.url)
if not urls_todo:
stopped = True
每次选择器发现套接字可读的时候,就会调用该回调方法,这可能着意味着两件事:套接字还有数据未被接收或是已经关闭。
该回调方法会从套接字连接中尝试获取 4 千字节的数据,如果可读的少于该数量,那么 chunk
包含了所有可读的数据,如果还有更多,chunk
则包含了 4 千字节的数据,且套接字也会保持可读。所以事件循环会在下次循环中再次调用该方法。当响应完成后,服务器会关闭套接字连接,而 chunk
则会为空。
未被展示的 parse_links
方法,返回一个包含 URL 的集合。我们会为每一个 URL 实例化一个新的 Fecther
,没有任何并发限制。注意基于回调的异步应用有一个很好的特征:在修改共享数据时不需要互斥,例如我们将连接添加到 seen_urls
。没有抢占式的多任务,因此我们也不能在代码中的任意点中断应用。
我们添加了一个全局变量 stopped
,并用它控制整个循环:
stopped = False
while not stopped:
events = selector.select()
for event_key, event_mask in events:
callback = event_key.data
callback(event_key, event_mask)
一旦所有页面下载完成,fetcher
停止整个事件循环,则程序退出。
这个示例明显的显示了异步的问题:面条式代码。我们需要一些方式去表达一系列的计算和 I/O 操作,然后调度一系列的操作同时运行。但不用线程,这些系列操作不能被放到一个函数体中,每当函数开始一个 I/O 操作,它明确的保存了后来需要的状态并返回,而你有必要去思考并编写这些状态保存代码。
让我们解释下上面的意思,想象一下单个线程在传统的阻塞套接字中抓取 URL 是多么的简单:
# Blocking version.
def fetch(url):
sock = socket.socket()
sock.connect(('xkcd.com', 80))
request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
sock.send(request.encode('ascii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
# Page is now downloaded.
links = parse_links(response)
q.add(links)
在套接字操作之间,这个函数需要保存什么状态呢?包括套接字对象,URL,和累计的 response
,线程里的函数使用编程语言的基本功能去保存临时状态到局部变量中。该函数也具备“延续性”,即在 I/O 完成后的后续操作代码。运行时通过存储线程的指针来保持延续性。你无需思考在 I/O 操作之后如何恢复操作之前的变量,它由编程语言考虑。
但是以回调为基础的异步框架中,这些编程语言的特性毫无帮助。在等待 I/O 操作时,一个函数必须明确的指定相关状态,因为在 I/O 操作完成前,方法返回就会失去它对应的栈帧。在示例中,我们将 sock
和 response
作为属性保存在 self
实例中,以代替传统的变量存储。通过注册回调 connected
和 read_response
保持连续性,以替代传统的指示指针。随着应用功能的增多,需要保存以便在各个回调中使用的状态也会越来越复杂,如此繁复的工作容易使程序员偏头痛。
更糟的是,如果一个回调方法在下一个回调方法被调度之前抛出异常会怎么样?假如我们在 parse_links
里面做的很糟糕,而它在解析某些 HTML 时抛出了错误:
Traceback (most recent call last):
File "loop-with-callbacks.py", line 111, in <module>
loop()
File "loop-with-callbacks.py", line 106, in loop
callback(event_key, event_mask)
File "loop-with-callbacks.py", line 51, in read_response
links = self.parse_links()
File "loop-with-callbacks.py", line 67, in parse_links
raise Exception('parse error')
Exception: parse error
栈信息显示了事件循环正在运行回调方法,若我们不记得是什么导致这个错误,而这条链的两端都被破坏:我们不知道我们哪里来又要到哪里去。这种上下文的丢失被称为“堆栈撕裂”,很多情况下,它都会使用它的人感到困惑。堆栈撕裂同样也会阻止我们为一系列回调设置异常处理,即用 try/except
代码块包装方法的调用和对应的调用树6。
因此,即便排除多线程和异步在效率上的争论,还有一个关于哪一种方法更容易报错的争论:线程会因为同步中的失误导致数据竞赛的问题,而回调则会因为堆栈撕裂变得难以调试。
协程
有这样一个很好的方式,使得编写的异步代码既能够实现基于回调的效率,又能像多线程一样美观、易于维护。这种方式的实现被称为“协程”。使用 Python3.4 的 asynico 标准库,结合 aiohttp 模块,可以用协程写出相当直观的代码用于抓取 URL7:
@asyncio.coroutine
def fetch(self, url):
response = yield from self.session.get(url)
body = yield from response.read()
它同样具有扩展性,相比于操作系统严格限制下的占用 50K 内存的线程,在 Jesse 的系统上,一个 Python 协程仅仅只占用 3K 内存。Python 可以轻松的启动成百上千的协程。
对于协程的概念,追朔到早期的计算机科学,非常简单:一个可以暂停和恢复的子程序。尽管线程被操作系统优先用作多任务处理,而协程的多任务处理则是:由线程选择何时暂停,并决定哪一个协程在后续执行。
协程有很多实现的方式,就算在 Python 中也有好几种。Python3.4 标准库中的 asynico 则是基于生成器,Future 类,和 yield from
语句实现。从 Python3.5 开始,协程作为该语言的原生功能出现8。然而,理解协程在 Python3.4 中,使用已存在的语言功能的初次实现,是理解 Python3.5 原生协程的基础。
为了解释 Python3.4 中的基于生成器实现的协程,我们将会展示更多生成器的示例以及它是如何在协程中使用的,我们相信你会阅读的就像我们编写时一样愉快。一旦我们解释完基于生成器的协程,即将会在我们的爬虫中使用它。
Python 的生成器是如何工作的
在你理解 Python 生成器之前,必须理解常规的 Python 函数是如何工作的。通常,当一个 Python 函数调用一个子程序,这个子程序会保留控制权直到它返回或是抛出错误,然后控制权会回到调用者手上:
>>> def foo():
... bar()
...
>>> def bar():
... pass
标准的 Python 解释器是用 C 语言编写而成的,当一个 Python 函数被调用时,C 函数,也即 PyEval_EvalFrameEx
将会去执行它,它会在当前的上下文中获取 Python 的栈帧对象并执行 Python 的字节码,以下是 foo
的字节码:
>>> import dis
>>> dis.dis(foo)
2 0 LOAD_GLOBAL 0 (bar)
3 CALL_FUNCTION 0 (0 positional, 0 keyword pair)
6 POP_TOP
7 LOAD_CONST 0 (None)
10 RETURN_VALUE
foo
函数将 bar
读取到自己的栈中并调用,然后从栈中获取其返回值,得到 None
并将其推入栈中,最后将其返回。
当 PyEval_EvalFrameEx
遇到 CALL_FUNCTION
字节码,它会创建一个新的栈帧并递归:也就是说,它会递归调用 PyEval_EvalFrameEx
并创建新的栈帧用于执行 bar
。
理解 Python 的栈帧在堆内存中的分配非常重要!Python 解释器是一个普通的 C 程序,因此它的栈帧也符合其逻辑。但 Python 的栈帧是被堆所操控的。除此之外,这也意味着 Python 的栈帧可以在函数调用之外存活,为了表明这一行为,可以将当前帧保存到 bar
函数中:
>>> import inspect
>>> frame = None
>>> def foo():
... bar()
...
>>> def bar():
... global frame
... frame = inspect.currentframe()
...
>>> foo()
>>> # The frame was executing the code for 'bar'.
>>> frame.f_code.co_name
'bar'
>>> # Its back pointer refers to the frame for 'foo'.
>>> caller_frame = frame.f_back
>>> caller_frame.f_code.co_name
'foo'
Figure 5.1 - Function Calls
现在是时候谈谈 Python 生成器了,它基于相同的架构——代码对象和栈帧——却达到了奇妙的效果。
这是一个生成器函数:
>>> def gen_fn():
... result = yield 1
... print('result of yield: {}'.format(result))
... result2 = yield 2
... print('result of 2nd yield: {}'.format(result2))
... return 'done'
...
当 Python 将 gen_fn
编译为字节码时,发现其中的 yield
语句并将其视为一个生成器函数,它会设置一个标志以同普通函数区分:
>>> # The generator flag is bit position 5.
>>> generator_bit = 1 << 5
>>> bool(gen_fn.__code__.co_flags & generator_bit)
True
当你调用一个生成器函数时,Python 会发现它的生成器标志,因此它实际上并不会执行这个方法,而是创建一个生成器:
>>> gen = gen_fn()
>>> type(gen)
<class 'generator'>
一个 Python 生成器封装了一个栈帧和函数体代码:
>>> gen.gi_code.co_name
'gen_fn'
而通过调用 gen_fn
产生的生成器均指向同一段代码,但每一个都有自己的栈帧,这些栈帧并没有存在任何 C 函数中,而是在堆内存中等待被使用:
Figure 5.2 - Generators
这个栈帧对象有一个“最后操作”指针,即最近执行的一个操作,在开始的时候,它的值为 -1,意味着生成器还没有开始:
>>> gen.gi_frame.f_lasti
-1
当我们调用 send
时,生成器会执行到第一个 yield
语句并暂停,而 send
方法的返回值为 1
,这是 gen
传递给 yield
表达式的值:
>>> gen.send(None)
1
现在生成器的指针指向字节码起始位置往后的第 3 个位置,而 Python 编译后的字节大小为 56:
>>> gen.gi_frame.f_lasti
3
>>> len(gen.gi_code.co_code)
56
这个生成器可以在任何时候被任意函数调用以恢复运行,因为它的栈帧并非在栈中而是在堆内存中。而它之前被调用的历史并没有清空(即指针仍停留在上次调用停留的位置),对它的调用也不需要像普通函数一样遵循先进后出的顺序。它是*的,像一朵浮空的云。
我们可以传递hello
给生成器,它会变成 yield
语句的结果,而生成器会继续执行直到生成 2
:
>>> gen.send('hello')
result of yield: hello
2
它的栈帧现在包含了局部变量:result
:
>>> gen.gi_frame.f_locals
{'result': 'hello'}
而其它通过调用 gen_fn
创建的生成器会拥有它们自己的栈帧和局部变量。
当我们再次调用 send
,生成器会从第二个 yield
语句处继续执行,并通过抛出 StopIteration
异常结束。
>>> gen.send('goodbye')
result of 2nd yield: goodbye
Traceback (most recent call last):
File "<input>", line 1, in <module>
StopIteration: done
这个异常包含一个值,也就是生成器的返回值:字符串 done
。
通过生成器构建协程
现在,我们知道生成器可以暂停,可以通过值恢复,同时它也有返回值。看起来很适合用于构建一个没有基于回调的面条式代码的异步应用程序。我们构建这样一个协程:可以在程序中与其它协程合作。我们构建的协程将会是 Python 标准库 asyncio 的简单版本,像 asyncio 一样,我们会使用生成器、Future 和 yield from
语句。
首先我们需要一个类型去表示协程在等待的对象,以下是简单版:
class Future:
def __init__(self):
self.result = None
self._callbacks = []
def add_done_callback(self, fn):
self._callbacks.append(fn)
def set_result(self, result):
self.result = result
for fn in self._callbacks:
fn(self)
一个等待对象初始化时即为未完成状态,在调用 set_result
时即完成9。
我们来让 Fetcher
适配等待对象和协程,我们曾写过一个基于回调的 fetch
方法:
class Fetcher:
def fetch(self):
self.sock = socket.socket()
self.sock.setblocking(False)
try:
self.sock.connect(('xkcd.com', 80))
except BlockingIOError:
pass
selector.register(self.sock.fileno(),
EVENT_WRITE,
self.connected)
def connected(self, key, mask):
print('connected!')
# And so on....
fetch
方法以创建套接字开始,然后注册一个回调函数 connected
用于在套接字准备就绪时执行。现在我们可以将这两步合并到一个协程中:
def fetch(self):
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(('xkcd.com', 80))
except BlockingIOError:
pass
f = Future()
# 译者注:此处的函数应加入接收参数,因为事件循环会在调用时传入两个参数
def on_connected():
f.set_result(None)
selector.register(sock.fileno(),
EVENT_WRITE,
on_connected)
yield f
selector.unregister(sock.fileno())
print('connected!')
现在,fetch
是一个生成器方法,不同于普通方法,因为它包含了 yield
语句。我们创建一个等待对象,然后通过返回它将 fetch
暂停知道套接字准备就绪,里面的 on_connected
函数则让等待对象完成等待。
但当等待对象完成时,用什么去恢复生成器呢?我们需要一个协程驱动,让我们称之为“任务”:
class Task:
def __init__(self, coro):
self.coro = coro
f = Future()
f.set_result(None)
self.step(f)
def step(self, future):
try:
next_future = self.coro.send(future.result)
except StopIteration:
return
next_future.add_done_callback(self.step)
# Begin fetching http://xkcd.com/353/
fetcher = Fetcher('/353/')
Task(fetcher.fetch())
loop()
任务实例会通过发送 None
来启动生成器 fetch
,然后 fetch
会执行到它生成一个等待对象,任务实例则会接收这个等待对象并赋值给 next_future
,当套接字连接就绪时,事件循环会调用 on_connected
,继而为等待对象设置结果,同时也会调用 step
,进而恢复 fetch
的运行。
使用 yield from 重构协程
一旦套接字连接建立,我们将发送 HTTP GET 请求并读取服务器返回的响应数据,这些步骤不再需要分散到各个回调方法中,我们将它们统一放到一个生成器方法里:
def fetch(self):
# ... connection logic from above, then:
sock.send(request.encode('ascii'))
while True:
f = Future()
def on_readable():
f.set_result(sock.recv(4096))
selector.register(sock.fileno(),
EVENT_READ,
on_readable)
chunk = yield f
selector.unregister(sock.fileno())
if chunk:
self.response += chunk
else:
# Done reading.
break
这段代码会从套接字连接中获取整个响应数据,看起来非常的有用。我们该如何将它们重构为 fetch
的一个协程呢?现在该轮到 Python3 的 yield from
语句上场了,它可以让一个生成器代理另一个生成器。
为了知晓如何才能做到,让我们先回到先前简单的生成器示例中:
>>> def gen_fn():
... result = yield 1
... print('result of yield: {}'.format(result))
... result2 = yield 2
... print('result of 2nd yield: {}'.format(result2))
... return 'done'
...
从一个生成器中调用另一个生成器,使用 yield from
语句:
>>> # Generator function:
>>> def caller_fn():
... gen = gen_fn()
... rv = yield from gen
... print('return value of yield-from: {}'
... .format(rv))
...
>>> # Make a generator from the
>>> # generator function.
>>> caller = caller_fn()
生成器 caller
会扮演被代理的另一个生成器 gen
的角色:
>>> caller.send(None)
1
>>> caller.gi_frame.f_lasti
15
>>> caller.send('hello')
result of yield: hello
2
>>> caller.gi_frame.f_lasti # Hasn't advanced.
15
>>> caller.send('goodbye')
result of 2nd yield: goodbye
return value of yield-from: done
Traceback (most recent call last):
File "<input>", line 1, in <module>
StopIteration
当 caller
代理 gen
,生成对应的内容时,caller
的指针并没有前进,注意即使在里面的 gen
生成器从一个 yield
移动到下一个时,它还是停留在 15,也就是 yield from
语句处的位置10。从 caller
的角度看,我们不知道它生成的值是来自它自身还是它代理的对象,而从 gen
的角度看,我们不知道给它传值的是 caller
还是其它元素。yield from
语句像是一个光滑的管道,通过它与gen
生成器交互直到最后返回。
一个协程可以通过 yield from
语句代理另一个协程,代替接收传入的值和生成对应的值。注意,caller
打印了 return value of yield-from: done
。当 gen
完成后,它的返回值变成了 caller
内的 yield from
语句的返回值:
rv = yield from gen
之前,当我们评价基于回调的异步程序时,主要的点在于“堆栈撕裂”:当回调函数抛出异常时,追踪栈信息显得有些无用。它仅仅只显示了事件循环正在执行回调函数,而没有其它的细节。那么协程是如何表现的呢?
>>> def gen_fn():
... raise Exception('my error')
>>> caller = caller_fn()
>>> caller.send(None)
Traceback (most recent call last):
File "<input>", line 1, in <module>
File "<input>", line 3, in caller_fn
File "<input>", line 2, in gen_fn
Exception: my error
这显得有用多了!在发生异常时,追踪栈表明了 caller_fn
是在代理 gen_fn
。更令人舒适的是,我们可以像处理普通子程序那样为子协程做一个异常处理的包装:
>>> def gen_fn():
... yield 1
... raise Exception('uh oh')
...
>>> def caller_fn():
... try:
... yield from gen_fn()
... except Exception as exc:
... print('caught {}'.format(exc))
...
>>> caller = caller_fn()
>>> caller.send(None)
1
>>> caller.send('hello')
caught uh oh
因此我们的子协程逻辑就与普通的子程序无异,让我们为 fetcher
编写一个更有用的子协程吧,先编写一个 read
方法用于接收数据块:
def read(sock):
f = Future()
def on_readable():
f.set_result(sock.recv(4096))
selector.register(sock.fileno(), EVENT_READ, on_readable)
chunk = yield f # Read one chunk.
selector.unregister(sock.fileno())
return chunk
基于 read
,我们可以建立一个 read_all
协程去接收整个响应:
def read_all(sock):
response = []
# Read whole response.
chunk = yield from read(sock)
while chunk:
response.append(chunk)
chunk = yield from read(sock)
return b''.join(response)
如果你忽略 yield from
语句,这个方法看起来就像传统的方法在处理阻塞 I/O 一样,但实际上,read
和 read_all
都是协程,从 read
产出时,read_all
会暂停,直到整个 I/O 操作完成。当 read_all
暂停时,asyncio 的事件循环可以做其它事情或等待其它的 I/O 事件,下次循环时,一旦事件准备就绪,read_all
会随着 read
的产出而恢复。
在栈的顶部,fetch
会调用 read_all
:
class Fetcher:
def fetch(self):
# ... connection logic from above, then:
sock.send(request.encode('ascii'))
self.response = yield from read_all(sock)
神奇的是,Task 类并不需要更改,它会像之前一样处理 fetch
协程:
Task(fetcher.fetch())
loop()
当 read
产出一个 future 时,task 会通过 yield from
的通道接收到它,恰似从 fetch
中产出的一样。当循环完成了一个 future 时,task 会把它的结果发送给 fetch
,而这个值被 read
所接收,好似 task 是直接在处理 read
一样:
Figure 5.3 - Yield From
为了完善我们实现的协程,我们再做一点打磨:当等待一个 future 时,我们用 yield
,当代理一个子协程时,用 yield from
。当一个协程暂停时,我们使用 yield from
会更加精简。而协程也无需关心它在等待什么。
我们从 Python 的生成器和迭代器中获得了巨大的好处。而对于调用者来说,生成器与迭代器并无二致。因此我们可以为 Future 类实现迭代器协议:
# Method on Future class.
def __iter__(self):
# Tell Task to resume me here.
yield self
return self.result
__iter__
方法是一个产出 future 实例自身的协程,现在,当我们将代码:
# f is a Future.
yield f
改为:
# f is a Future.
yield from f
得到的结果是一致的!处理协程的 task 对象通过调用 call
接收 future 对象,而当 future 完成时,它发送一个新的结果到协程中。
统一使用 yield from
有什么好处呢?为什么它会比在等待 future 时使用 yield
,代理子协程时使用 yield from
要好呢?它的优点在于:一个方法可以随意更改它的实现而对调用者没有任何影响:它可以是一个普通的方法返回会未完成的 future 对象,也可以是一个包含 yield from
语句,且带有返回值的协程。无论哪一种情况,调用者只需要使用 yield from
去获取结果。
亲爱的读者,现在我们到达了探索 asyncio 协程的愉快之旅的终点了。我们深入探究了生成器的机制,并大致实现了 future 和 task 对象,我们描绘了 asyncio 是如何同时拥有两个并发方案的优点的:目前的 I/O 操作比多线程更为高效,同时比基于回调函数的方案更清晰易读。当然,真正的 asyncio 要比我们所实现的结构复杂的多,它实现了零拷贝 I/O、公平调度、异常处理和其它一些列的功能。
对于 asyncio 的用户来说,编写协程要比你在这里看到的要简单的多了。在上面我们实现协程的基础上,你可以看到 callback,task 和 future,甚至可以看到调用 select
的非阻塞 I/O,但当使用 asyncio 构建应用时,这些都不会出现在你的代码中,就像我们承诺的那样,现在你可以如此轻易的抓取一个 URL:
@asyncio.coroutine
def fetch(self, url):
response = yield from self.session.get(url)
body = yield from response.read()
满意的结束这场探索,让我们回到最初的目标上来:使用 asyncio 开发一个异步的网络爬虫。
使用协程
开始的时候我们描述了这个爬虫是如何工作的,现在是时候使用 asyncio 的协程来实现它了。
我们的爬虫会抓取第一个页面,解析其中的链接,并添加到队列中。此后它开始展开进行并发抓取。但在服务器或客户端都没有限制并发量,我们希望同时最多只能有指定数量的并发。一旦其中一个完成页面的抓取,它应该立即去队列里获取下一个链接。我们会遇到没有那多链接可以抓取的时候,这时候一些子程序必须暂停。但当另一个子程序抓取到新的链接时,队列数量会立刻增加,而所有暂停的子程序都会马上恢复工作。我们的程序须在工作完成后退出。
如果把这些子程序想象为线程,我们该怎样实现爬虫的算法?也许会用到 Python 标准库的同步队列11,每次当一个项目进入到队列时,队列的 task
就会增加。工作中的线程会在处理完一个项目后调用 task_done
。而主流程则阻塞在 Queue.join
直到所有项目调用 task_done
后退出。
在 asyncio 中,协程也使用与线程相同的队列模式!首先让我们导入它12:
try:
from asyncio import JoinableQueue as Queue
except ImportError:
# In Python 3.5, asyncio.JoinableQueue is
# merged into Queue.
from asyncio import Queue
我们将工作者的公开状态集中到一个 Crawler 类中,并把主要逻辑写在它的 crawl
方法中,我们以协程的方式运行 crawl
,然后运行 asyncio 的事件循环直到 crawl
方法运行完成:
loop = asyncio.get_event_loop()
crawler = crawling.Crawler('http://xkcd.com',
max_redirect=10)
loop.run_until_complete(crawler.crawl())
Crawler 类初始化时需要一个根路径和 max_redirect
,即抓取一个 URL 时可接受的最大重定向数量。Crawler 会将 (URL, max_redirect)
作为一对放入到队列中(稍后再讲述这么做的原因)。
class Crawler:
def __init__(self, root_url, max_redirect):
self.max_tasks = 10
self.max_redirect = max_redirect
self.q = Queue()
self.seen_urls = set()
# aiohttp's ClientSession does connection pooling and
# HTTP keep-alives for us.
self.session = aiohttp.ClientSession(loop=loop)
# Put (URL, max_redirect) in the queue.
self.q.put((root_url, self.max_redirect))
现在对于未完成的任务队列只有一个了,回到主流程中,我们启动了事件循环和 crawl
方法:
loop.run_until_complete(crawler.crawl())
协程 crawl
就像一个主线程一样,会去协调工作者。当所有工作者在后台运行时,它会阻塞在 join
处直到所有任务完成:
@asyncio.coroutine
def crawl(self):
"""Run the crawler until all work is done."""
workers = [asyncio.Task(self.work())
for _ in range(self.max_tasks)]
# When all work is done, exit.
yield from self.q.join()
for w in workers:
w.cancel()
如果我们不想在线程中一次性启动所有的工作者,为了避免每次需要时创建线程的昂贵消耗,一般会使用线程池。但协程是廉价的,所以我们可以很简单的一次性启动所有工作者。
如何关闭爬虫是一件值得注意且有趣的事。当 join
的 future 完成时,那些携带任务的工作者仍然存活但只是被暂停了:它们在等待更多的 URL,但并没有新的进来。所以主流程会在退出前将它们都取消。否则当 Python 解释器退出并销毁所有对象时,这些工作者就会哭出来:
ERROR:asyncio:Task was destroyed but it is pending!
那么 cancel
是如何工作的呢?生成器还有一个我们未曾展示的功能,你可以从外部将异常抛进生成器内部:
>>> gen = gen_fn()
>>> gen.send(None) # Start the generator as usual.
1
>>> gen.throw(Exception('error'))
Traceback (most recent call last):
File "<input>", line 3, in <module>
File "<input>", line 2, in gen_fn
Exception: error
生成器被 throw
恢复,但此时却抛出了异常。如果调用栈中没有捕获异常的代码,那么异常则会继续抛出给栈顶。那么取消一个等待中的协程就可以这样做:
# Method of Task class.
def cancel(self):
self.coro.throw(CancelledError)
无论何时生成器被暂停,在一些 yield from
语句中,它会被恢复并抛出异常。我们在任务的 step
方法中处理这个异常:
# Method of Task class.
def step(self, future):
try:
next_future = self.coro.send(future.result)
except CancelledError:
self.cancelled = True
return
except StopIteration:
return
next_future.add_done_callback(self.step)
现在,task 知道它被取消了,所以当它被销毁时,它并不会对于自己的死呐喊。
一旦 crawl
方法取消了所有任务,它就会退出。事件循环发现这个协程已经完成(我们稍后展示它是如何发现的),它也就会退出:
loop.run_until_complete(crawler.crawl())
crawl
方法包含了所有的主流程。即协程去队列中获取 URL,抓取它,并从中解析新的链接。每一个协程都会独立的运行 work
方法:
@asyncio.coroutine
def work(self):
while True:
url, max_redirect = yield from self.q.get()
# Download page and add new links to self.q.
yield from self.fetch(url, max_redirect)
self.q.task_done()
Python 看见这块代码包含 yield from
语句,会将其编译为生成器函数。因此在 crawl
方法中,当主流程调用 self.work
10 次的时候,实际上它们并没有运行:而仅仅是创建了 10 个生成器对象和对应的引用。每一个都被封装在 Task 中,Task 会接受每一个生成器的产出值,并在 future 完成时,将结果传递给 send
并执行以驱使生成器到达下一步。因为生成器有它们自己的栈帧,且独立运行,有各自的局部便两个和指针。
工作者通过队列来调整自己的下一步行动。它等待 URL 的代码为:
url, max_redirect = yield from self.q.get()
队列的 get
方法也是一个协程,它会暂停知道有新的项目进入到队列,然后则恢复并返回新的项目。
顺便说一下,这里也是工作者在所有链接抓取完成后,主流程调用取消它们时它们暂停的地方。从协程的角度看,这也是 yield from
抛出 CancelledError
时,循环结束前的最后一程。
当一个工作者抓取到一个页面它回去解析它并将解析到的新链接放入队列中,然后调用 task_done
以减少计数器。少数情况下,一个工作者抓取到页面并发现所有的链接都被抓取过,而队列里也没有新的任务。因此工作者会调用 task_done
以减少计数器直至为 0,然后,等待队列 join
的 crawl
方法就会恢复并结束。
之前我们说到为何队列中的项目会是像下面这样一对对的:
# URL to fetch, and the number of redirects left.
('http://xkcd.com/353', 10)
新的 URL 最多可以有 10 次重定向,抓取这种特定的 URL 会被指向一个新的以斜杠结尾的地址。我们减少其可用的重定向次数,并将新的地址放入到队列中:
# URL with a trailing slash. Nine redirects left.
('http://xkcd.com/353/', 9)
aiohttp
模块默认会追踪重定向地址并将最终的结果返回给我们,因此需要更改此默认值并在爬虫中手动管理重定向地址,这样就可以合并重定向到同一地址的 URL:如果该地址已经被抓取过,则会存在于 self.seen
,而我们已经从另一入口去抓取这个链接了:
Figure 5.4 - Redirects
爬虫抓取到“foo”并看到它重定向到“baz”,因此它将“baz”添加到队列和 seen_urls
中,如果下一次需要抓取的页面是同样重定向到“baz”的“bar”,爬虫则不会再将“baz”放入到队列中。如果服务器的响应是一个页面而不是重定向地址,fetch
将会解析其中的链接并将新的链接放入到队列中。
@asyncio.coroutine
def fetch(self, url, max_redirect):
# Handle redirects ourselves.
response = yield from self.session.get(
url, allow_redirects=False)
try:
if is_redirect(response):
if max_redirect > 0:
next_url = response.headers['location']
if next_url in self.seen_urls:
# We have been down this path before.
return
# Remember we have seen this URL.
self.seen_urls.add(next_url)
# Follow the redirect. One less redirect remains.
self.q.put_nowait((next_url, max_redirect - 1))
else:
links = yield from self.parse_links(response)
# Python set-logic:
for link in links.difference(self.seen_urls):
self.q.put_nowait((link, self.max_redirect))
self.seen_urls.update(links)
finally:
# Return connection to pool.
yield from response.release()
如果这是多线程代码,可能会遇到烦人的竞赛条件。比如,一个线程检查某个链接是否存在于 seen_urls
,如果不存在则将其放入到队列和 seen_urls
中,如果线程在这两个操作之间被打断,另一个线程可能从不同的页面解析到相同的链接,同样认定它没有存在于 seen_urls
,也将其添加到队列中。现在链接两次被放入到队列中,导致(最好是)了重复的工作和错误的统计。
然后,协程仅会在 yield from
语句处显得容易被打断。这正是协程代码远少于多线程代码遭受竞赛危害的关键所在:多线程代码必须通过获取锁明确的进入到一个临界区,否则它就是可中断的。而一个 Python 协程一般是不间断的,只有在生成器产出时才会转移控制权。
我们不再需要像基于回调的程序中那样的 Fetcher 类了,这个类是局限于回调的低效解决办法:它需要一些地方以便在等待 I/O 时存储相关的状态,因为它们的局部变量并不会在各个调用间保留。但是 fetch
协程可以像一个普通方法一样存储它的状态到局部变量,因此不再需要一个类来处理了。
当 fetch
结束处理服务器的响应后,它会返回给调用者:work
。而 work
方法则会调用 task_done
并获取队列中的下一个需要被抓取的 URL。
当 fetch
将新的链接放入到队列时,它增加了未完成的任务数量,并因此保持主程序运行,也就是因等待 q.join
而暂停的协程。然而,如果还有新链接而当前的页面地址是队列中的最后一个 URL,那么当 work
调用 task_done
,那么未完成的任务数量将为降为 0,这会导致 join
从暂停中恢复并终止整个主流程。
用以协调工作者和主协程的代码如下13:
class Queue:
def __init__(self):
self._join_future = Future()
self._unfinished_tasks = 0
# ... other initialization ...
def put_nowait(self, item):
self._unfinished_tasks += 1
# ... store the item ...
def task_done(self):
self._unfinished_tasks -= 1
if self._unfinished_tasks == 0:
self._join_future.set_result(None)
@asyncio.coroutine
def join(self):
if self._unfinished_tasks > 0:
yield from self._join_future
主协程 crawl
从 join
方法产出,那么当最后一个工作者将未完成的任务数减为 0 时,它会将控制权让给 crawl
并终止。
整个流程即将结束,我们的程序以调用 crawl
开始:
loop.run_until_complete(self.crawler.crawl())
而程序是如何终止的呢?既然 crawl
是一个生成器方法,调用它会得到一个生成器。为了驱动这个生成器产出,asyncio 将它包装在一个任务中:
class EventLoop:
def run_until_complete(self, coro):
"""Run until the coroutine is done."""
task = Task(coro)
task.add_done_callback(stop_callback)
try:
self.run_forever()
except StopError:
pass
class StopError(BaseException):
"""Raised to stop the event loop."""
def stop_callback(future):
raise StopError
当任务完成时,会抛出 StopError
异常,事件循环用它作为正常结束的标志。
但 task 的 add_done_callback
和 result
方法是什么?你也许会以为 task 与 future 类似。你的直觉是对的,我们必须承认这里隐藏了一个关于 Task 类的细节:task 就是 future:
class Task(Future):
"""A coroutine wrapped in a Future."""
通常 future 通过被调用它的 set_result
完成,但任务会在它的协程结束时自我完成。回忆一下,之前我们探究 Python 生成器时,当一个生成器返回时,它会抛出一个特有的 StopIteration
异常:
# Method of class Task.
def step(self, future):
try:
next_future = self.coro.send(future.result)
except CancelledError:
self.cancelled = True
return
except StopIteration as exc:
# Task resolves itself with coro's return
# value.
self.set_result(exc.value)
return
next_future.add_done_callback(self.step)
因此当事件循环调用 task.add_done_callback(stop_callback)
,它就准备好被 task 终止了。再次展示下 run_until_complete
:
# Method of event loop.
def run_until_complete(self, coro):
task = Task(coro)
task.add_done_callback(stop_callback)
try:
self.run_forever()
except StopError:
pass
当 task 捕获到 StopIteration
异常并完成自身时,回调函数抛出了 StopError
异常并向上传递给事件循环,事件循环停止,调用栈也会离开 run_until_complete
,我们的程序到此就结束了。
总结
现在,越来越多的程序会关联 I/O 而不是 CPU 操作。对于这两种程序来说,Python 的多线程都是最糟糕的:全局解释器锁导致了运算执行并不会真正的并行,而优先权的切换也会导致它们易于收到竞赛的影响。异步通常是正确的方式,但基于回调的异步会随着代码的增加而变得混乱不堪。协程是一种不错的替代方式,它们自然的使用子程序,且具有正常的异常处理和追踪栈信息。
如果我们忽略掉 yield from
语句,协程就像是传统的线程那样处理阻塞 I/O,我们甚至可以传统的方式在多进程中调度协程,并不需要做过多的转换。因此相比于基于回调的异步,协程是一种诱人的语法,让程序员们拥有多线程那样的编写体验。
但当我们仔细的观察 yield from
语句时,可以看到这是协程让出控制权给其他程序的标记。不同于线程,协程展示了我们的程序在哪里可以中断,哪里不可以。Glyph Lefkowitz 在它著名的文章“Unyielding”14中写道:“线程让局部逻辑变得困难,而局部逻辑也许是软件开发中最为困难的事情。”简明的产出,却可以让“审查和理解协程的行为(也即正确性)而不是整个系统”变为可能。
这篇文章写在 Python 和异步兴起期间,在你刚刚学习到的基于生成器的协程设计已经在 2014 年 3 月发布的 asyncio 模块中实现了。2015 年 9 月,Python 3.5 在发布时已经于语言层面实现了协程。这种原生的语言层面的声明使用了新的语法“async def”,并且使用了新的关键字“await”代替“yield from”代理协程或等待一个 Future。
尽管有这些改进,但核心的概念并无变化。Python 原生的协程实现将与生成器不在类似,但工作仍是非常相似的;实际上,它们在 Python 解释器中也是同一实现。Task,Future 和事件循环也会继续在 asyncio 中扮演着它们自己的角色。
现在你已经知道 asyncio 是如何工作的,你完全可以忘掉实现的细节,所有的实现都存在于一个个简洁的接口中,但掌握了核心理念可以让你在如今的异步环境中编写出正确而高效的代码。
注释
1. Guido 在在 PyCon 2013 上介绍了 asyncio,之后称之为“Tulip”。
2. 如果接受者接受消息比较缓慢或系统缓存区已满时,即便调用 send
也会导致阻塞。
3. http://www.kegel.com/c10k.html
4. Python 的全局解释器锁禁止一个进程并发执行代码,并行 CPU 运算在 Python 中需要使用多进程,或使用 C 代码编写并行逻辑,但这是另一个主题了。
5. Jesse 在“What Is Async, How Does It Work, And When Should I Use It?”中列出了适合与不宜使用异步的情形;Mike Bayer 在“Asynchronous Python and Databases”中对比了 asyncio 和多线程的表现。
6. 对于该问题复杂的解决方案,参见 http://www.tornadoweb.org/en/stable/stack_context.html。
7. @asyncio.coroutine
装饰器并非充满魔力。事实上,如果环境变量 PYTHONASYNCIODEBUG
没有被设置,那么用它装饰生成器函数几乎没用。它仅仅是为函数设置了一个 _is_coroutine
属性,以便于框架的其余部分识别,使用 asyncio 时完全可能只使用生成器而不带 @asyncio.coroutine
装饰器。
8. PEP 492 中描述了Python 3.5 内建的协程:“使用 async 和 await 语法”。
9. 这个功能有许多缺陷,比如:一旦 future 完成,对应的协程应当理解恢复而不是暂停,但在我们的代码中它并没有恢复,翻阅 asyncio 的 Future 类以查看更为复杂的实现。
10. 实际上,这就是 yield from
在 CPython 中的工作方式,一个函数在执行它的每个语句时,会增加指针指数。但外部的生成器执行 yield from
时,它会将指针指数减 1 来维持自身指向 yield from
语句所在的位置,然后再产出值传递给调用者。这样重复直到内部的生成器抛出 StopIteration
错误,这会让外部的生成器指针指向下一步操作。
11. https://docs.python.org/3/library/queue.html
12. https://docs.python.org/3/library/asyncio-sync.htm
13. 实际上 asyncio.Queue
的实现使用了 asyncio.Event
而不是这里的 Future。两者的取别在于 Event 可以被重置,而 Future 不能从完成状态再回到未完成状态。