Python 使用 asyncio 包处理并 发(使用asyncio包编写服务器)
使用asyncio包编写服务器
演示 TCP 服务器时通常使用回显服务器。我们要构建更好玩一点的示
例服务器,用于查找 Unicode 字符,分别使用简单的 TCP 协议和 HTTP
协议实现。这两个服务器的作用是,让客户端使用 4.8 节讨论过的
unicodedata 模块,通过规范名称查找 Unicode 字符。图 18-2 展示了
在一个 Telnet 会话中访问 TCP 版字符查找服务器所做的两次查询,一次
查询国际象棋棋子字符,一次查询名称中包含“sun”的字符。
图 18-2:在一个 Telnet 会话中访问 tcp_charfinder.py 服务器——查
询“chess black”和“sun”
使用asyncio包编写TCP服务器
下面几个示例的大多数逻辑在 charfinder.py 模块中,这个模块没有任何
并发。你可以在命令行中使用 charfinder.py 脚本查找字符,不过这个脚
本更为重要的作用是为使用 asyncio 包编写的服务器提供支持。
charfinder.py 脚本的代码在本书的代码仓库中
(https://github.com/fluentpython/example-code)。
charfinder 模块读取 Python 内建的 Unicode 数据库,为每个字符名称
中的每个单词建立索引,然后倒排索引,存进一个字典。例如,在倒排
索引中,‘SUN’ 键对应的条目是一个集合(set),里面是名称中包含
‘SUN’ 这个词的 10 个 Unicode 字符。 倒排索引保存在本地一个名为
charfinder_index.pickle 的文件中。如果查询多个单词,charfinder 会
计算从索引中所得集合的交集。
下面我们把注意力集中在响应图 18-2 中那两个查询的 tcp_charfinder.py
脚本上。我要对这个脚本中的代码做大量说明,因此把它分为两部分,
分别在示例 18-14 和示例 18-15 中列出。
示例 18-14 tcp_charfinder.py:使用 asyncio.start_server 函数
实现的简易 TCP 服务器;这个模块余下的代码在示例 18-15 中
import sys
import asyncio
from charfinder import UnicodeNameIndex ➊
CRLF = b'\r\n'
PROMPT = b'?> '
index = UnicodeNameIndex() ➋
@asyncio.coroutine
def handle_queries(reader, writer): ➌while True: ➍writer.write(PROMPT) # 不能使用yield from! ➎yield from writer.drain() # 必须使用yield from! ➏data = yield from reader.readline() ➐try:query = data.decode().strip()except UnicodeDecodeError: ➑query = '\x00'client = writer.get_extra_info('peername') ➒print('Received from {}: {!r}'.format(client, query)) ➓if query:if ord(query[:1]) < 32: ⓫breaklines = list(index.find_description_strs(query)) ⓬if lines:writer.writelines(line.encode() + CRLF for line in lines) ⓭writer.write(index.status(query, len(lines)).encode() + CRLF) ⓮yield from writer.drain() ⓯print('Sent {} results'.format(len(lines))) ⓰print('Close the client socket') ⓱
writer.close()
❶ UnicodeNameIndex 类用于构建名称索引,提供查询方法。
❷ 实例化 UnicodeNameIndex 类时,它会使用 charfinder_index.pickle
文件(如果有的话),或者构建这个文件,因此第一次运行时可能要等
几秒钟服务器才能启动。
❸ 这个协程要传给 asyncio.start_server 函数,接收的两个参数是
asyncio.StreamReader 对象和 asyncio.StreamWriter 对象。
❹ 这个循环处理会话,直到从客户端收到控制字符后退出。
❺ StreamWriter.write 方法不是协程,只是普通的函数;这一行代
码发送 ?> 提示符。
❻ StreamWriter.drain 方法刷新 writer 缓冲;因为它是协程,所以
必须使用 yield from 调用。
❼ StreamReader.readline 方法是协程,返回一个 bytes 对象。
❽ Telnet 客户端发送控制字符时,可能会抛出 UnicodeDecodeError
异常;遇到这种情况时,为了简单起见,假装发送的是空字符。
❾ 返回与套接字连接的远程地址。
❿ 在服务器的控制台中记录查询。
⓫ 如果收到控制字符或者空字符,退出循环。
⓬ 返回一个生成器,产出包含 Unicode 码位、真正的字符和字符名称的
字符串(例如, U+0039\t9\tDIGIT NINE);为了简单起见,我从中
构建了一个列表。
⓭ 使用默认的 UTF-8 编码把 lines 转换成 bytes 对象,并在每一行末
尾添加回车符和换行符;注意,参数是一个生成器表达式。
⓮ 输出状态,例如 627 matches for ‘digit’。
⓯ 刷新输出缓冲。
⓰ 在服务器的控制台中记录响应。
⓱ 在服务器的控制台中记录会话结束。
⓲ 关闭 StreamWriter 流。
handle_queries 协程的名称是复数,因为它启动交互式会话后能处理
各个客户端发来的多次请求。
注意,示例 18-14 中所有的 I/O 操作都使用 bytes 格式。因此,我们要
解码从网络中收到的字符串,还要编码发出的字符串。Python 3 默认使
用的编码是 UTF-8,这里就隐式使用了这个编码。
注意一点,有些 I/O 方法是协程,必须由 yield from 驱动,而另一些则是普通的函数。例如,StreamWriter.write 是普通的函数,我们假
定它大多数时候都不会阻塞,因为它把数据写入缓冲;而刷新缓冲并真
正执行 I/O 操作的 StreamWriter.drain 是协
程,StreamReader.readline 也是协程。写作本书时,asyncio 包的
API 文档有重大的改进,明确标识出了哪些方法是协程。
示例 18-15 接续示例 18-14,列出这个模块的 main 函数。
示例 18-15 tcp_charfinder.py(接续示例 18-14):main 函数创建
并销毁事件循环和套接字服务器
def main(address='127.0.0.1', port=2323): ➊port = int(port)loop = asyncio.get_event_loop()server_coro = asyncio.start_server(handle_queries, address, port,
loop=loop) ➋server = loop.run_until_complete(server_coro) ➌host = server.sockets[0].getsockname() ➍print('Serving on {}. Hit CTRL-C to stop.'.format(host)) ➎try:loop.run_forever() ➏except KeyboardInterrupt: # 按CTRL-C键passprint('Server shutting down.')server.close() ➐loop.run_until_complete(server.wait_closed()) ➑loop.close() ➒
if __name__ == '__main__':main(*sys.argv[1:]) ➓
❶ 调用 main 函数时可以不传入参数。
❷ asyncio.start_server 协程运行结束后,返回的协程对象返回一
个 asyncio.Server 实例,即一个 TCP 套接字服务器。
❸ 驱动 server_coro 协程,启动服务器(server)。
❹ 获取这个服务器的第一个套接字的地址和端口,然后……
❺ ……在服务器的控制台中显示出来。这是这个脚本在服务器的控制
台中显示的第一个输出。
❻ 运行事件循环;main 函数在这里阻塞,直到在服务器的控制台中按
CTRL-C 键才会关闭。
❼ 关闭服务器。
❽ server.wait_closed() 方法返回一个期物;调用
loop.run_until_complete 方法,运行期物。
❾ 终止事件循环。
❿ 这是处理可选的命令行参数的简便方式:展开 sys.argv[1:],传给
main 函数,未指定的参数使用相应的默认值。
注意,run_until_complete 方法的参数是一个协程(start_server
方法返回的结果)或一个 Future 对象(server.wait_closed 方法返
回的结果)。如果传给 run_until_complete 方法的参数是协程,会
把协程包装在 Task 对象中。
仔细查看 tcp_charfinder.py 脚本在服务器控制台中生成的输出(如示例
18-16),更易于理解脚本中控制权的流动。
示例 18-16 tcp_charfinder.py:这是图 18-2 所示会话在服务器端的
输出
$ python3 tcp_charfinder.py
Serving on ('127.0.0.1', 2323). Hit CTRL-C to stop. ➊
Received from ('127.0.0.1', 62910): 'chess black' ➋
Sent 6 results
Received from ('127.0.0.1', 62910): 'sun' ➌
Sent 10 results
Received from ('127.0.0.1', 62910): '\x00' ➍
Close the client socket ➎
❶ 这是 main 函数的输出。
❷ handle_queries 协程中那个 while 循环第一次迭代的输出。
❸ 那个 while 循环第二次迭代的输出。
❹ 用户按下 CTRL-C 键;服务器收到控制字符,关闭会话。
❺ 客户端套接字关闭了,但是服务器仍在运行,准备为其他客户端提
供服务。
注意,main 函数几乎会立即显示 Serving on… 消息,然后在调用
loop.run_forever() 方法时阻塞。在那一点,控制权流动到事件循环
中,而且一直待在那里,不过偶尔会回到 handle_queries 协程,这个
协程需要等待网络发送或接收数据时,控制权又交还事件循环。在事件
循环运行期间,只要有新客户端连接服务器就会启动一个
handle_queries 协程实例。因此,这个简单的服务器可以并发处理多
个客户端。出现 KeyboardInterrupt 异常,或者操作系统把进程杀
死,服务器会关闭。
tcp_charfinder.py 脚本利用 asyncio 包提供的高层流
API(https://docs.python.org/3/library/asyncio-stream.html),有现成的服
务器可用,所以我们只需实现一个处理程序(普通的回调或协程)。此
外,asyncio 包受 Twisted 框架中抽象的传送和协议启发,还提供了低
层传送和协议 API。详情请参见 asyncio 包的文档
(https://docs.python.org/3/library/asyncio-protocol.html),里面有一个使
用低层 API 实现的 TCP 回显服务器。
使用aiohttp包编写Web服务器
asyncio 版国旗下载示例使用的 aiohttp 库也支持服务器端 HTTP,我
就使用这个库实现了 http_charfinder.py 脚本。图 18-3 是这个简易服务器
的 Web 界面,显示搜索“cat face”表情符号得到的结果。
图 18-3:浏览器窗口中显示在 http_charfinder.py 服务器中搜索“cat
face”得到的结果
有些浏览器显示 Unicode 字符的效果比其他浏览器好。图
18-3 中的截图在 OS X 版 Firefox 浏览器中截取,我在 Safari 中也得
到了相同的结果。但是,运行在同一台设备中的最新版 Chrome 和
Opera 却不能显示猫脸等表情符号。不过其他搜索结果(例
如“chess”)正常,因此这可能是 OS X 版 Chrome 和 Opera 的字体
问题。
我们先分析 http_charfinder.py 脚本中最重要的后半部分:启动和关闭事
件循环与 HTTP 服务器。参见示例 18-17。
示例 18-17 http_charfinder.py:main 和 init 函数
@asyncio.coroutine
def init(loop, address, port): ➊app = web.Application(loop=loop) ➋app.router.add_route('GET', '/', home) ➌handler = app.make_handler() ➍server = yield from loop.create_server(handler,
address, port) ➎return server.sockets[0].getsockname() ➏
def main(address="127.0.0.1", port=8888):port = int(port)loop = asyncio.get_event_loop()host = loop.run_until_complete(init(loop, address, port)) ➐print('Serving on {}. Hit CTRL-C to stop.'.format(host))try:loop.run_forever() ➑except KeyboardInterrupt: # 按CTRL-C键passprint('Server shutting down.')loop.close() ➒
if __name__ == '__main__':main(*sys.argv[1:])
❶ init 协程产出一个服务器,交给事件循环驱动。
❷ aiohttp.web.Application 类表示 Web 应用……
❸ ……通过路由把 URL 模式映射到处理函数上;这里,把 GET / 路由
映射到 home 函数上(参见示例 18-18)。
❹ app.make_handler 方法返回一个 aiohttp.web.RequestHandler
实例,根据 app 对象设置的路由处理 HTTP 请求。
❺ create_server 方法创建服务器,以 handler 为协议处理程序,并
把服务器绑定在指定的地址(address)和端口(port)上。
❻ 返回第一个服务器套接字的地址和端口。
❼ 运行 init 函数,启动服务器,获取服务器的地址和端口。
❽ 运行事件循环;控制权在事件循环手上时,main 函数会在这里阻
塞。
❾ 关闭事件循环。
我们已经熟悉了 asyncio 包的 API,现在可以对比一下示例 18-17 与前面的 TCP 示例(见示例 18-15),看它们创建服务器的方式有何不同。
在前面的 TCP 示例中,服务器通过 main 函数中的下面两行代码创建并
排定运行时间:
server_coro = asyncio.start_server(handle_queries, address, port,
loop=loop)
server = loop.run_until_complete(server_coro)
在这个 HTTP 示例中,init 函数通过下述方式创建服务器:
server = yield from loop.create_server(handler,
address, port)
但是 init 是协程,驱动它运行的是 main 函数中的这一行:
host = loop.run_until_complete(init(loop, address, port))
asyncio.start_server 函数和 loop.create_server 方法都是协
程,返回的结果都是 asyncio.Server 对象。为了启动服务器并返回服
务器的引用,这两个协程都要由他人驱动,完成运行。在 TCP 示例
中,做法是调用 loop.run_until_complete(server_coro),其中
server_coro 是 asyncio.start_server 函数返回的结果。在 HTTP
示例中,create_server 方法在 init 协程中的一个 yield from 表达
式里调用,而 init 协程则由 main 函数中的
loop.run_until_complete(init(…)) 调用驱动。
我提到这一点是为了强调之前讨论过的一个基本事实:只有驱动协程,
协程才能做事,而驱动 asyncio.coroutine 装饰的协程有两种方法,
要么使用 yield from,要么传给 asyncio 包中某个参数为协程或期物
的函数,例如 run_until_complete。
示例 18-18 列出 home 函数。根据这个 HTTP 服务器的配置,home 函数
用于处理 /(根)URL。
示例 18-18 http_charfinder.py:home 函数
def home(request): ➊query = request.GET.get('query', '').strip() ➋print('Query: {!r}'.format(query)) ➌if query: ➍descriptions = list(index.find_descriptions(query))res = '\n'.join(ROW_TPL.format(**vars(descr))for descr in descriptions)msg = index.status(query, len(descriptions))else:descriptions = []res = ''msg = 'Enter words describing characters.'html = template.format(query=query, result=res, ➎
message=msg)print('Sending {} results'.format(len(descriptions))) ➏return web.Response(content_type=CONTENT_TYPE, text=html) ➐
❶ 一个路由处理函数,参数是一个 aiohttp.web.Request 实例。
❷ 获取查询字符串,去掉首尾的空白。
❸ 在服务器的控制台中记录查询。
❹ 如果有查询字符串,从索引(index)中找到结果,使用 HTML 表格
中的行渲染结果,把结果赋值给 res 变量,再把状态消息赋值给 msg
变量。
❺ 渲染 HTML 页面。
❻ 在服务器的控制台中记录响应。
❼ 构建 Response 对象,将其返回。
注意,home 不是协程,既然定义体中没有 yield from 表达式,也没
必要是协程。在 aiohttp 包的文档中,add_route 方法的条目
(http://aiohttp.readthedocs.org/en/v0.14.4/web_reference.html#aiohttp.web.UrlDispatcher.下面说道,“如果处理程序是普通的函数,在内部会将其转换成协程”。
示例 18-18 中的 home 函数虽然简单,却有一个缺点。home 是普通的函
数,而不是协程,这一事实预示着一个更大的问题:我们需要重新思考
如何实现 Web 应用,以获得高并发。下面来分析这个问题。
更好地支持并发的智能客户端
示例 18-18 中的 home 函数很像是 Django 或 Flask 中的视图函数,实现
方式完全没有考虑异步:获取请求,从数据库中读取数据,然后构建响
应,渲染完整的 HTML 页面。在这个示例中,存储在内存中的
UnicodeNameIndex 对象是“数据库”。但是,对真正的数据库来说,应
该异步访问,否则在等待数据库查询结果的过程中,事件循环会阻塞。
例如,aiopg 包(https://aiopg.readthedocs.org/en/stable/)提供了一个异
步 PostgreSQL 驱动,与 asyncio 包兼容;这个包支持使用 yield
from 发送查询和获取结果,因此视图函数的表现与真正的协程一样。
除了防止阻塞调用之外,高并发的系统还必须把复杂的工作分成多步,
以保持敏捷。http_charfinder.py 服务器表明了这一点:如果搜索“cjk”,
得到的结果是 75 821 个中文、日文和韩文象形文字。 此时,home 函
数会返回一个 5.3MB 的 HTML 文档,显示一个有 75 821 行的表格。
我在自己的设备中使用命令行 HTTP 客户端 curl 访问架设在本地的
http_charfinder.py 服务器,查询“cjk”,2 秒钟后获得响应。浏览器要布
局包含这么大一个表格的页面,用的时间会更长。当然,大多数查询返
回的响应要小得多:查询“braille”返回 256 行结果,页面大小为 19KB,
在我的设备中用时 0.017 秒。可是,如果服务器要用 2 秒钟处理“cjk”查
询,那么其他所有客户端都至少要等 2 秒——这是不可接受的。
避免响应时间太长的方法是实现分页:首次至多返回(比如说)200
行,用户点击链接或滚动页面时再获取更多结果。如果查看本书代码仓
库(https://github.com/fluentpython/example-code)中的 charfinder.py 模
块,你会发现 UnicodeNameIndex.find_descriptions 方法有两个可
选的参数——start 和 stop,这是偏移值,用于支持分页。因此,我
们可以返回前 200 个结果,当用户想查看更多结果时,再使用 AJAX 或
WebSockets 发送下一批结果。
实现分批发送结果所需的大多数代码都在浏览器这一端,因此 Google
和所有大型互联网公司都大量依赖客户端代码构建服务:智能的异步客
户端能更好地使用服务器资源。
虽然智能的客户端甚至对老式 Django 应用也有帮助,但是要想真正为
这种客户端服务,我们需要全方位支持异步编程的框架,从处理 HTTP
请求和响应到访问数据库,全都支持异步。如果想实现实时服务,例如
游戏和以 WebSockets 支持的媒体流,那就尤其应该这么做。
这里留一个练习给读者:改进 http_charfinder.py 脚本,添加下载进度
条。此外还有一个附加题:实现 Twitter 那样的“无限滚动”。做完这个
练习后,我们对如何使用 asyncio 包做异步编程的讨论就结束了。