python之asyncio协程和异步编程
文章目录
- 初识coroutines 协程
- 前置知识
- 迭代器(iterator)
- 可迭代对象(iterable)
- 异步迭代器(asynchronous iterator)
- 生成器(generator)
- 生成器对象的迭代推进方式
- return 和 yield
- yield expression
- 列表推导式和生成器表达式
- 异步生成器(generator)
- 上下文管理器(context manager)
- 异步上下文管理器
- contextlib 上下文管理包
- `contextmanager`和`asynccontextmanager`装饰器+生成器简化上下文管理器的定义.
- ExitStack 和 AsyncExitStack 同步退出栈和异步退出栈
- awaitables可等待对象之coroutines,Tasks和Futures
- asyncio 中的其他顶层方法
- asyncio.timeout
- asyncio.timeout_at
- asyncio.wait_for 和 asyncio.wait
- asyncio.as_completed
- asyncio.to_thread 以及 asyncio.run_coroutine_threadsafe
- asyncio.current_task 和 asyncio.all_tasks
- asyncio network io(streams api)
- asyncio synchronization
- asyncio.Lock
- asyncio.Event
- asyncio.Condition
- asyncio.Semaphore
- asyncio.BoundedSemaphore
- asyncio.Barrier
- asyncio 中的 queue
- asyncio 中的 subprocess
- asyncio 中的 event loop 以及 low-level api
- 创建以及获取event loop
- event loop 其他方法以及使用场景
- run_in_executor
- 总结
最近大模型应用中的 mcp非常火热,笔者去研究了一下python mcp相关编程实现,其中涉及到很多
async/await
异步操作,官方的sample里面也使用了非常多的coroutines协程相关特性,于是本文在这里详细总结一下python的协程以及asyncio异步编程相关知识点,提高mcp编程效率.
初识coroutines 协程
首先需要回答这个经典的问题:什么是协程?
- 广义上协程:协程(coroutine)是一种
用户态
的轻量级线程,是由程序(而非操作系统内核内核)调度的执行单元,旨在通过协作式并发提高程序效率,尤其适合I/O 密集型
任务. - python中的协程: 在 Python 中,协程(coroutine)是一种基于用户态调度的轻量级并发执行单元,通过 async/await 语法实现,核心是在单线程内通过协作式切换实现多任务并发,尤其适合 I/O 密集型场景.协程采用同步代码风格简化之前基于回调的异步实现.协程不是线程的替代,是对并发模型的补充.
如下代码实现的协程
#!/usr/bin/env python
# -*- coding:utf-8 -*-import asyncioasync def test():await asyncio.sleep(1)return 'ok'async def main():print('coroutine main run')# 协程函数不执行,仅返回协程对象coro = test()# <class 'coroutine'>print(type(coro))# main让出执行权,test协程函数执行,返回结果res = await coroprint(res)if __name__ == '__main__':print('started')# 同步代码块中协程运行入口,阻塞获取协程main()完全运行完毕的结果asyncio.run(main())print('finished')
运行结果:
started
coroutine main run
<class 'coroutine'>
ok
finished
这样一个最简单的基于协程的异步程序就完成了.
前置知识
在继续协程探索之路之前,先把一些前置的跟协程会有关系的python语法回顾一下
迭代器(iterator)
官方迭代器术语: https://docs.python.org/3/glossary.html#term-iterator
迭代器是数据流的一种展现,它需要实现两个方法__iter__
和__next__
其中__iter__
方法是返回迭代器本身,其次它还需要实现__next__
方法返回迭代器中的下一个元素
,如果迭代器已经没有更多的元素了,那么抛出 StopIteration
异常.迭代器对象在for ... in ...
语句中使用比较多,依次便利迭代器中的元素.
两个内置函数和迭代器操作有关,一个是iter(obj)
,如果传递的参数是一个迭代器对象
相当于调用该对象obj.__iter__()
方法,相当于返回的是迭代器本身
.
另一个内置函数next(obj)
相当于调用obj.__next__()
for in
语句本质上也是遍历迭代器
,内部逻辑是先调用iter(obj)
得到iterator
,然后再不断调用next(iterator)
直到抛出StopIteration
异常,只是for in
会处理掉这个异常不会将其往上抛出.
下面是一个简单的迭代器的实现:
#!/usr/bin/env python
# -*- coding:utf-8 -*-class MyIterator:def __init__(self, size):self.size = sizeself.data = [i for i in range(size)]self.index = 0def __iter__(self):return selfdef __next__(self):if self.index < self.size:res = self.data[self.index]self.index += 1return reselse:raise StopIterationif __name__ == '__main__':iterator = MyIterator(2)# iterator.__iter__() 返回本身,这里是同一个对像print(iterator == iter(iterator))print(next(iterator))print(next(iterator))try:# 没有元素再执行next抛出StopIterationprint(next(iterator))except StopIteration as e:print('exception happend')
以及输出:
True
0
1
exception happend
可迭代对象(iterable)
可迭代对象官方术语:http://docs.python.org/3/glossary.html#term-iterable
可迭代对象定义是实现了__iter__
方法或者__getitem__
方法且实现序列语义
的类型都可称作可迭代对象.内置类型里面有非常多的可迭代对象,比如序列类型:list,str,tuple以及非序列类型的dict,file对象等,都是可迭代对象.按照这个定义迭代器也必然是可迭代对象.
上面定义中的__getitem__(self,key)
方法,官方的说明: https://docs.python.org/3/reference/datamodel.html#object.__getitem__
其实就是说对象可以使用obj[key]
这种语法根据索引访问元素,序列类型以及字典类型都支持这种语法,他们内部都实现了__getitem__
方法.
其中有两处细节说明:
- 将
非iterator
的可迭代对象传递给内置函数iter
将返回一个迭代器对象
,且每一次调用iter
方法返回的都是一个全新的迭代器
.如下面的代码所示:
#!/usr/bin/env python
# -*- coding:utf-8 -*-class MyIterator:def __init__(self, size):self.size = sizeself.data = [i for i in range(size)]self.index = 0def __iter__(self):return selfdef __next__(self):if self.index < self.size:res = self.data[self.index]self.index += 1return reselse:raise StopIterationdef iterable_test():iter1 = MyIterator(1)# 迭代器传递给iter返回的一定是迭代器本身,无论调用多少次iter# Trueprint(iter1 == iter(iter1))# Trueprint(iter1 == iter(iter1))# 迭代器外的可迭代对象比如list,str等,每一次调用iter返回都是一个新的迭代器对象l = [1,2,3]# Fakseprint(iter(l) == iter(l))if __name__ == '__main__':iterable_test()
运行结果:
True
True
False
- 将可迭代对象传递给iter方法
并不一定是调用对象内的__iter__方法
,如果一个类型只是实现了__getitem__
方法并没有实现__iter__
方法也可以作为一个可迭代对象,比如下面的例子:
class MySeq:def __init__(self):passdef __getitem__(self, key):if key > 2:# 注意这里如果不是StopIteration for 循环会抛出异常raise StopIterationelse:return keydef test_seq():seq = MySeq()# 同理这里返回是不同的迭代器对象 Falseprint(iter(seq) == iter(seq))for i in seq:print(i)it = iter(seq)# 类型 iteratorprint(type(it))print(next(it))print(next(it))print(next(it))try:print(next(it))except Exception as e:print('exception happend')if __name__ == '__main__':test_seq()
输出结果:
False
0
1
2
<class 'iterator'>
0
1
2
exception happend
所以iter(obj)
并不一定是调用的obj.__iter__
方法,也有可能是创建基于__getitem__
方法的迭代器并且返回.不过绝大多数可迭代对象都是实现了__iter__
方法的,内置的iter
函数首先就是调用__iter__
方法,这个也更满足迭代协议的设计意图.这个问题其实不必深究,只要知道非迭代器类型的可迭代对象,iter方法每一次调用一定是返回一个新的可迭代对象.
异步迭代器(asynchronous iterator)
官方术语:https://docs.python.org/3/glossary.html#term-asynchronous-iterator
这个就是上面同步的迭代器的版本,即异步迭代器需要实现__aiter__
的普通方法返回迭代器本身,以及__anext__
方法返回一个可等待对象(await object)
,这里基本上__anext__
方法就是一个协程
.同样异步迭代器可以通过await obj.__anext__
获取迭代器中元素直到抛出StopAsyncIteration
异常.既然是异步迭代器,自然对应的循环语句是async for
,而且涉及到await obj.__anext__
所以它只能在协程函数
中使用;同时针对异步迭代器也有对应的aiter
和anext
内置函数,举个例子:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import asyncio
class AsyncMyIter:def __init__(self, size):self.size = sizeself.data = [i for i in range(size)]self.index = 0def __aiter__(self):return selfasync def __anext__(self):if self.index >= self.size:raise StopAsyncIterationelse:res = self.data[self.index]self.index += 1return resasync def test_async_iter():it = AsyncMyIter(2)print(it == aiter(it))await anext(it)await anext(it)try:await anext(it)except StopAsyncIteration as e:print('exception happend')it = AsyncMyIter(2)# aiter获取异步迭代器,然后调用await next(async iterator) 直到抛出StopAsyncIteration 异常并且内部处理掉async for item in it:print(item)if __name__ == '__main__':asyncio.run(test_async_iter())
输出结果:
True
exception happend
0
1
至于异步可迭代对象
参考普通的可迭代对象变成对应异步版本即可.此处不再赘述.
生成器(generator)
官方术语:https://docs.python.org/3/glossary.html#term-generator
什么是生成器:生成器是一个函数,内部存在yield表达式
,调用此函数产生一个生成器迭代器对象
,此对象可以在for循环中循环输出值或者通过next(obj)
内置函数一个一个输出值直到抛出StopInteration
异常.直接调用生成器不会运行,只会产生一个生成器迭代器对象
.这个特性有一点像协程
什么是生成器迭代器:直接调用生成器返回的值即是生成器迭代器对象.因为它也是一个迭代器对象,所以它也实现了__next__
和__iter__
方法.
迭代器生成器每一次推动的时候会运行到下一个yield
,返回yield后的值同时暂时挂起生成器执行,直到下一次推动行为出发,继续从上一次yield暂停的地方运行到下一个yield,如果后面不能运行到yield处
,那么就抛出StopIteration异常.
简单样例:
#!/usr/bin/env python
# -*- coding:utf-8 -*-def test_generator():print('runs')yield 1def test():# 调用生成器不会运行g = test_generator()# 类型generatorprint(type(g))# 存在__iter__方法和__next__方法print(dir(g))# 本身是一个迭代器,iter方法对象就是本身 Trueprint(g == iter(g))# next推进函数运行到下一个yieldprint(next(g))# 没有yield 抛出 StopIteration异常try:print(next(g))except StopIteration as e:print('exception')if __name__ == '__main__':test()
输出结果
<class 'generator'>
['__class__', '__del__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__lt__', '__name__', '__ne__', '__new__', '__next__', '__qualname__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'close', 'gi_code', 'gi_frame', 'gi_running', 'gi_suspended', 'gi_yieldfrom', 'send', 'throw']
True
runs
1
exception
生成器对象的迭代推进方式
前面演示的生成器迭代器对象的推进方式都是next函数,其实生成器迭代器对象本身还有一个send
函数也可以推进迭代过程.同时send还有一个高级功能实现外部值发回生成器,让生成器的输出值根据这个外部输入变化,这样就可以实现外部代码和生成器的双向通信
当生成器迭代器从未被迭代推进那么send只能返送None
, 如果被推进至少一次(next或者send(None)后则可以正常send(val)
推进生成器运行.
举个例子
def test_send():def inner_generator():x = yield 1print(f'x is : {x}')x = yield 2print(f'x is {x}')x = yield xprint(f'x is {x}')gen = inner_generator()# 返回1print(gen.send(None))# 打印 x is 2 返回2print(gen.send(2))# 因为用的next 打印x is None 返回 Noneprint(next(gen))try:# 打印x is 10 抛出异常print(gen.send(10))except StopIteration as e:print('exception happend')if __name__ == '__main__':test_send()
输出:
1
x is : 2
2
x is None
None
x is 10
exception happend
send
可以影响除第一次迭代外每一次的输出.send起work需要yield前面有变量来存放send赋值
return 和 yield
return在一般函数里面是返回值,而在生成器里面它是终止迭代,返回生成器终止值,抛出StopIteration异常.return之后的yield无效,即便再次推动迭代过程.
举个例子:
def test_return():def inner_generator():yield 1yield 2return 3yield 4gen = inner_generator()print(next(gen))print(next(gen))try:# 运行到return 抛出异常,终止迭代print(next(gen))except StopIteration as e:print('exception happend')try:# 多次调用也是抛出异常,终止迭代print(next(gen))except StopIteration as e:print('exception again')# for 循环也是终止迭代只输出前两个yield元素for i in inner_generator():print(i)if __name__ == '__main__':test_return()
但是return一定是终止迭代不能再推进了么,并不是,我们知道在try...except...finally
中即便try内有return语句表示函数返回值,但是finally 语句依然会执行,这个过程加上yield,行为和和常规函数不同,如下面一个例子:
def test_return_2():def inner_generator():try:yield 1yield 2return 1yield 10finally:yield 3gen = inner_generator()print(next(gen))print(gen.send(10))# 运行到return 抛出异常 进入到 finally 输出3 而不是输出10print(next(gen))# 注意这里迭代能输出3print('迭代输出')for i in inner_generator():print(i)if __name__ == '__main__':test_return_2()
输出结果:
1
2
3
迭代输出
1
2
3
可见return抛出StopIteration后,如果生成器内有finally块内有yield,依然会在finally块中继续迭代直到finally中也不再有yield从而抛出StopIteration异常
.
再看下一个例子:
def test_return_3():def inner_generator():try:yield 1yield 2return 1finally:x = yield 3print(f'x is {x}')x = yield 4print(f'x is {x}')gen = inner_generator()print(gen.send(None))print(gen.send(None))print(gen.send(10))print(gen.send(11))try:gen.send(12)except StopIteration as e:print('exception happend')if __name__ == '__main__':test_return_3()
输出:
1
2
3
x is 11
4
x is 12
exception happend
可见即便在finally代码快内,send依然有效,可以传递值给生成器.
yield expression
高级用法,生成器的定义里面说到需要有yield表达式
,而yield表达式不仅仅是yield value
这么简单,官方给yield表达式的定义为:https://docs.python.org/3/reference/simple_stmts.html#yield. 就是yield value
和 yield from <iterable>
(委托生成器). 注意这里的可迭代对象也可以是生成器迭代器对象,也可以是普通的序列类型对象.
简单案例:
def test_yield_from_1():def generate():yield 1yield 2yield from range(3, 10)for i in generate():print(i)if __name__ == '__main__':test_yield_from_1()
输出结果:
1
2
3
4
5
6
7
8
9
这里 yield from 后面接一个list, 每一次迭代则是返回list中一个元素,直到所有元素都返回完毕.
yield from
还有一个高级用法就是x = yield from <iterable>
把后面iterable
的终止值(return的值)
赋值给变量x
注意这里的x
是iterable
终止值,不是send(val)
赋值的对象,这里一定要区分开,send(val)
只会影响x = yield xxx
中的x
不会影响x = yield from <iterable>
中的x
.举个复杂例子:
def test_yield_from_2():def inner():try:x = yield 1print(f'inner first x is {x}')x = yield 2print(f'inner second x is {x}')return 10finally:x = yield 3print(f'inner third x is {x}')# 如果屏蔽这个 return 20 那么yield from 当前生成器的终止值则是try模块中return 的 10return 20def generate():x = yield 0print(f'outer first x is {x}')x = yield from inner()print(f'outer second x is {x}')gen = generate()# 返回值是0print(gen.send(None))# 返回值是1 outer first x is 10print(gen.send(10))# 返回值是2 inner first x is 11print(gen.send(11))# 返回值是3 inner second x is 12print(gen.send(12))# inner third x is 12, outer second x is 20 出异常 这里内层生成器终止值是20try:print(gen.send(13))except StopIteration as e:print('exception happend')print('循环遍历结果')for i in generate():print(i)if __name__ == '__main__':test_yield_from_2()
输出结果:
0
outer first x is 10
1
inner first x is 11
2
inner second x is 12
3
inner third x is 13
outer second x is 20
exception happend
循环遍历结果
0
outer first x is None
1
inner first x is None
2
inner second x is None
3
inner third x is None
outer second x is 20
总之x=yield from <iterable>
这里的x
不会受到send
的影响,它只是后面可迭代对象的终止值,如果后面可迭代对象没有终止值(比如可迭代对象是一个list)那么它的取值就是None.一般有终止值的场景都是可迭代对象是生成器对象的场景.
列表推导式和生成器表达式
列表推导式语法: [表达式 for 元素 in 可迭代对象 if 条件]
生成器表达式语法:(表达式 for 元素 in 可迭代对象 if 条件)
两者在语法上看起来是[]和()的区别,但是实际上两者区别比较大,主要区别如下:
- 列表表达式返回的类型是一个
list
,生成器表达式返回的是一个生成器迭代器对象(generator) - 列表表达式是一次性生成所有元素到内存,而生成器表达式只在生成器对象迭代推进的时候才会一个个产生数据
- 列表表达式返回结果本身就是一个列表可以无限次遍历,而生成器表达式结果是一个生成器对象,只能遍历一次
- 列表表达式可以通过obj[indx]这种方式访问元素,而生成器表达式返回结果不支持
简单的例子:
def list_generator():l = [i for i in range(5)]gen = (i for i in range(5))print('类型')print(type(l))print(type(gen))print('方法')print(dir(l))print(dir(gen))print('索引访问')print(l[0])try:print(gen(0))except Exception as e:print(f'生成器表达式结果不支持索引访问')print('类表推导式结果多次迭代')for item in l:print(item)for item in l:print(item)print('生成器表达式结果多次迭代')for item in gen:print(item)for item in gen:print(item)if __name__ == '__main__':list_generator()
运行结果:
类型
<class 'list'>
<class 'generator'>
方法
['__add__', '__class__', '__class_getitem__', '__contains__', '__delattr__', '__delitem__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__getstate__', '__gt__', '__hash__', '__iadd__', '__imul__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__mul__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__reversed__', '__rmul__', '__setattr__', '__setitem__', '__sizeof__', '__str__', '__subclasshook__', 'append', 'clear', 'copy', 'count', 'extend', 'index', 'insert', 'pop', 'remove', 'reverse', 'sort']
['__class__', '__del__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__lt__', '__name__', '__ne__', '__new__', '__next__', '__qualname__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'close', 'gi_code', 'gi_frame', 'gi_running', 'gi_suspended', 'gi_yieldfrom', 'send', 'throw']
索引访问
0
生成器表达式结果不支持索引访问
类表推导式结果多次迭代
0
1
2
3
4
0
1
2
3
4
生成器表达式结果多次迭代
0
1
2
3
4
生成器表达式由于其具有的这种惰性
,所以它适合处理大数据流,比如大文件流的场景,占用内存资源会远小于列表推导式.
异步生成器(generator)
官方术语:https://docs.python.org/3/glossary.html#term-asynchronous-generator
其实就是generator的协程版本,yield 要放置于协程中,相应的同步函数变成异步版本比如await anext()
和 await obj.asent
异常变成StopAsyncIteration
.其中yield from
委托生成器语法不能在异步生成器中使用
.
async def async_generator():async def inner():x = yield 1print(f'inner first x is {x}')x = yield 2print(f'inner second x is {x}')async def generate():async for item in inner():yield itemgen = generate()# <class 'async_generator'>print(type(gen))print(await anext(gen))# 这里asend value无效print(await gen.asend(10))try:print(await anext(gen))except StopAsyncIteration as e:print('exception happend')if __name__ == '__main__':asyncio.run(async_generator())
输出结果:
1
inner first x is None
2
inner second x is None
exception happend
上下文管理器(context manager)
with 和上下文管理器,with的官方术语文档:https://docs.python.org/3.11/reference/compound_stmts.html#with
上下文管理器与with contextmanage as target
语句结合使用,上下文呢管理器需要实现两个方法,第一:__enter__
方法,返回对象赋值给with语句中的target
变量;第二个方法:__exit__
在with语句块代码运行完毕后运行此方法,执行上下文相关资源的释放,或者异常处理.
上下文管理器的特点:
__exit__(self,et,ev,tb)
三个参数分别是exception type, exception 以及 traceback.如果with代码块中有异常那么这三个参数一定有值
,如果没有异常三个参数为None.一场情况下如果__exit__
返回True
异常不会往上抛出
如果不返回True比如返回False,或者没有return那么异常会向上抛出,需要在with代码快外加上try exception 捕获异常;如果是没有异常发生,返回任何值或者不返回都不会有任何影响.- 如果异常发生在
__enter__
函数内,则__exit__
函数不会执行,必须在with块捕获异常. - 如果with as target赋值的时候发生异常,比如
__enter__
只返回一个值而as target子句分配变量数量不对,比如这样with xx as (a,b,c)
,那么这个异常算with代码块内异常,__exit__
函数会运行,且入参为异常相关参数.
简单的例子,with 块中的异常不向上抛出:
class MyContext:def __init__(self):passdef __enter__(self):return 'ok'def __exit__(self, et, ev, tb):# 标准处理,根据参数et是否为空判断是否with块发生异常if et:print(f'exception: {et} happended')print(ev)print(tb)# 异常不抛出return Trueelse:print('no exception')def test_context_1():with MyContext() as t:print(t)1/0print('finished')if __name__ == '__main__':test_context_1()
输出如下,异常不会抛出.
exception: <class 'ZeroDivisionError'> happended
division by zero
<traceback object at 0x758a52c5d440>
finished
__enter__
异常,__exit__
不会运行,举例如下:
class MyContext:def __init__(self):passdef __enter__(self):# __enter__ 直接抛出异常raise ValueError('value error')def __exit__(self, et, ev, tb):# 标准处理,根据参数et是否为空判断是否with块发生异常if et:print(f'exception: {et} happended')print(ev)print(tb)else:print('no exception')def test_context_2():try:with MyContext() as t:print(t)except Exception as e:print(str(e))print('finished')if __name__ == '__main__':test_context_2()
输出,结果看不会运行__exit__
函数.
value error
finished
as target子句异常算作with 块异常__exit__
会执行.
class MyContext:def __init__(self):passdef __enter__(self):return 'ok'def __exit__(self, et, ev, tb):# 标准处理,根据参数et是否为空判断是否with块发生异常if et:print(f'exception: {et} happended')print(ev)print(tb)else:print('no exception')def test_context_3():try:with MyContext() as (t,a,b):print(t)except Exception as e:print(str(e))print('finished')if __name__ == '__main__':test_context_3()
输出结果可以验证as
子句分配变量数量不匹配发生的异常算作with块
内异常,最终会执行__exit__
函数.
exception: <class 'ValueError'> happended
not enough values to unpack (expected 3, got 2)
<traceback object at 0x7e53be47c280>
not enough values to unpack (expected 3, got 2)
finished
异步上下文管理器
异步上下文管理器官方术语:https://docs.python.org/3.11/reference/datamodel.html#asynchronous-context-managers
其实说白了就是用__aenter__
和__aexit__
两个协程
替换同步版本的__enter__
和__exit__
函数.异步上下文管理器自然是和async with
语句一起使用.举个例子:
class AsyncMyContext:def __init__(self):passasync def __aenter__(self):await asyncio.sleep(1)return 'ok'async def __aexit__(self, et, ev, tb):if et:print(f'exception {et} happend')print(et)print(ev)print(tb)else:print('no exception')async def test_async_context():try:async with AsyncMyContext() as t:print(t)1/0except Exception as e:print(str(e))print('finished')if __name__ == '__main__':asyncio.run(test_async_context())
输出结果
exception <class 'ZeroDivisionError'> happend
<class 'ZeroDivisionError'>
division by zero
<traceback object at 0x7f90a9193f80>
division by zero
finished
contextlib 上下文管理包
python提供了强大的contextlib来帮助开发人员实现上下文管理器相关的操作.
contexlib官方doc: https://docs.python.org/3.11/library/contextlib.html
总结一下此包的常用功能
contextmanager
和asynccontextmanager
装饰器+生成器简化上下文管理器的定义.
举个例子,同步的上下文管理器实现如下,基本上都是这样一个代码模板,处理好exception和with代码块运行后的资源清理
def ctx_test_1():@contextmanagerdef inner_ctx():# yield之前的语句相当于标准上下文管理器类的__enter__print('open resource')resource = 'resource'try:# yield object 用于 with 的 as target子句将object赋值给targetyield resourceexcept Exception as e:# 相当于标准上下文管理器的__exit__处理exception的逻辑# 这里可以选择不抛出异常,也可以选择抛出,最佳做法是抛出异常print('exception: ' + str(e))raisefinally:# 资源清理,无论with块中是否发生异常都会运行的代码# 所以放置在finally代码块print('resource release')# with 块也要加异常捕获 因为生成器中异常基本都会往上面抛try:with inner_ctx() as g:print('with block ctx info: ' + g)1/0except Exception as e:print('block exception: ' + str(e))print('all finished')if __name__ == '__main__':ctx_test_1()
输出结果
open resource
with block ctx info: resource
exception: division by zero
resource release
block exception: division by zero
all finished
异步版本和同步版本大同小异,只是将普通函数换成协程,代码如下:
async def async_ctx_test_1():@asynccontextmanagerasync def inner_ctx():# yield之前的语句相当于标准上下文管理器类的__enter__print('open resource')resource = 'resource'try:# yield object 用于 with 的 as target子句 将resource赋值给 targetyield resourceexcept Exception as e:# 相当于标准上下文管理器的__exit__处理exception的逻辑# 这里可以选择不抛出异常,也可以选择抛出,最佳做法是抛出异常print('exception: ' + str(e))raisefinally:# 资源清理,无论with块中是否发生异常都会运行的代码# 所以放置在finally代码块print('resource release')# with 块也要加异常捕获 因为生成器中异常基本都会往上面抛try:async with inner_ctx() as g:print('with block ctx info: ' + g)1/0except Exception as e:print('block exception: ' + str(e))print('all finished')if __name__ == '__main__':asyncio.run(async_ctx_test_1())
输出结果:
open resource-local
with block ctx info: resource
exception: division by zero
resource release
block exception: division by zero
all finished
ExitStack 和 AsyncExitStack 同步退出栈和异步退出栈
首先ExitStack本身也是一个上下文管理器,再编程上实现一个上下文管理器简化多个上下文管理器和相应的清理函数组合的场景.举个例子,当前有两个文件,从第一个文件获取的是第二个文件的元数据(比如文件路径),然后根据这个文件路径打开第二个文件读取内容.不用existstack可能是这样的写法
def normal_handler():base_path = Path(__file__).parentwith open(base_path.joinpath('test_file_1.txt'), 'r') as f1:file_path = f1.read().strip()with open(base_path.joinpath(f'{file_path}'), 'r') as f2:content = f2.read().strip()print(content)if __name__ == '__main__':normal_handler()
如果使用existstack则代码如下:
def use_stack():base_path = Path(__file__).parent# ExitStack.__enter__ 返回的是对象本身,和绝大多数上下文管理器__enter__行为一直with ExitStack() as stack:# enter_context 接收一个contextmanager, 调用cm.__enter__,同时把cm.__exit__方法添加到exitstack内部的callback stack中f1 = stack.enter_context(open(base_path.joinpath('test_file_1.txt'), 'r'))file_path = f1.read().strip()f2 = stack.enter_context(open(base_path.joinpath(f'{file_path}'), 'r'))content = f2.read().strip()print(content)if __name__ == '__main__':use_stack()
和不使用ExitStack是同样的输出结果,使用stack就减少了with语句的使用,ExitStack内部有一个callback stack, 当调用enter_context
方法的时候每一个上下文管理器的__exit__
方法都会都会被注册到这个内置callback stack上,当ExitStack 上下文退出的时候,callback stack上注册的这些__exit__
函数就会以注册时候相反的顺序被调用
(堆栈特性,LIFO),这个行为就和多个嵌套with语句退出with代码块时的__exit__
函数调度逻辑一致.举个例子:
def use_stack_2():def inner_test():print('inner test runs')@contextmanagerdef inner(name: str):try:yield namefinally:print(f'{name} exit')with ExitStack() as stack:res = stack.enter_context(inner('a'))print(f'context manager: {res} registered')res = stack.enter_context(inner('b'))print(f'context manager: {res} registered')# 内置的callback stack 上还可以注册任意的callbackstack.callback(inner_test)# exitstack 退出时,callback stack 上执行函数顺序 inner_test, cm b __exit__, cm a __exit__if __name__ == '__main__':use_stack_2()
输出:
context manager: a registered
context manager: b registered
inner test runs
b exit
a exit
exception处理问题:正常的异常处理顺序也是栈顶的__exit__
到栈底的__exit__
顺序,与上下文管理器注册相反的顺序,如果中间某一个__exit__
将exception处理了不往上抛除,那么整个exitstack代码块执行完毕后也不会抛出异常,举个例子,正常的异常在每一个上下文管理器中都应该抛出
def use_stack_3():@contextmanagerdef inner(n: str):try:yield nexcept:print(f'{n} exception')raisefinally:print(f'{n} exit')try:with ExitStack() as stack:stack.enter_context(inner('a'))stack.enter_context(inner('b'))1/0except Exception as e:print(str(e))if __name__ == '__main__':use_stack_3()
输出:
b exception
b exit
a exception
a exit
division by zero
如果上层的__exit__
处理掉异常不抛出:
def use_stack_4():@contextmanagerdef inner(n: str):try:yield nexcept:print(f'{n} exception happend')finally:print(f'{n} exit')with ExitStack() as stack:stack.enter_context(inner('a'))stack.enter_context(inner('b'))1/0if __name__ == '__main__':use_stack_4()
异常在栈顶的__exit__
就处理了,其前面的__exit__
都不会捕获到异常,ExitStack代码块也是正常退出,不会有异常
输出结果:
b exception happend
b exit
a exit
AsyncExitStack则是ExitStack异步版本,只不过里面部分方法由普通方法变成了协程,举个例子:
async def use_stack_5():async def inner_callback():print('inner callback runs')@asynccontextmanagerasync def inner(n: str):try:yield nexcept:print(f'{n} exception happend')raisefinally:print(f'{n} exit')async with AsyncExitStack() as stack:await stack.enter_async_context(inner('a'))await stack.enter_async_context(inner('b'))if __name__ == '__main__':asyncio.run(use_stack_5())
输出结果:
b exit
a exit
至此,异步编程的基础回顾完毕.
awaitables可等待对象之coroutines,Tasks和Futures
awaitables可等待对象(https://docs.python.org/3.11/library/asyncio-task.html#task-object)指的是await
关键字后可接的对象,一共是三种:coroutines, Tasks,和Futures.其中coroutine前面已经讲过,这里主要是讲Tasks和Futures.创建Task有两种方法,
第一种asyncio.create_task
(https://docs.python.org/3.11/library/asyncio-task.html#asyncio.Task),
传递一个协程对象,构建一个Task对象,举个例子
async def test_task():async def inner():return 'ok'task = asyncio.create_task(inner())res = await taskprint(res)print(task.result())if __name__ == '__main__':asyncio.run(test_task())
task的重要方法:
如下列表罗列了task的状态相关的重要方法:
method | description |
---|---|
done | task是否完成,task抛出异常,或者正常运行完毕,或者是被取消都表示task已经完成,此函数都返回True,其它情况返回False |
cancel | 取消task,需要注意调用此函数只是发送一个取消task的请求,并代表task会被立即取消,这个时候调用cancelled依然可能返回False |
cancelled | task是否被取消,返回True代表task被取消.只有task被取消(显示调用cancel方法),此函数才返回True,其它都是False |
result | 获取task运行结果,如果task没有运行完毕抛出InvalidStateError异常,如果被取消抛出CancelledError异常,如果运行过程中代码有异常,那么抛出代码运行时异常 |
exception | task运行过程中的异常,如果task没有运行完毕抛出InvalidStateError异常,如果被取消抛出CancelledError异常,如果运行完毕代码无异常返回None,有异常返回此代码运行时异常 |
从上面表格上可以看出只有done
和cancelled
之类的状态方法不会抛出异常,而result
和exception
等方法如果状态不对就可能抛出异常,所以当我们正常获取一个task运行结果的时候需要有比较严谨的状态判断逻辑,举个例子
async def task_func():async def inner():await asyncio.sleep(1)return 'ok'task = asyncio.create_task(inner())await asyncio.sleep(2)if task.done():if not task.cancelled():if task.exception():print(task.exception())else:print(task.result())if __name__ == '__main__':asyncio.run(task_func())
从这里可见我们要编写稳健的代码获取一个task的运行结果或者是异常需要多层if...else...
asyncio.TaskGroup 一组task的上下文管理器,当需要多个task放置于event-loop运行的时候,可以使用TaskGroup更加方便,当TaskGroup上下文退出的时候,每一个task都会执行await
等待task运行完毕.
async def task_group_1():async def inner_1():return 'hello'async def inner_2():print('aaa')async with asyncio.TaskGroup() as g:t1 = g.create_task(inner_1())t2 = g.create_task(inner_2())print(t1.result())print(t2.result())if __name__ == '__main__':asyncio.run(task_group_1())
输出结果:
aaa
hello
None
TaskGroup异常处理细节:
async def task_group_2():async def inner_1():await asyncio.sleep(3)return 1async def inner_2():await asyncio.sleep(1)1/0try:async with asyncio.TaskGroup() as g:t1 = g.create_task(inner_1())t2 = g.create_task(inner_2())except Exception as e:print(e)# 所有task都是doneprint(f't1 done: {t1.done()}')print(f't2 done: {t2.done()}')# t1 本身还运行完毕,它是被cancelled 所以cancelled是Trueprint(f't1 cancelled: {t1.cancelled()}')# t2 是导致异常的任务 cancelled是Falseprint(f't2 cancelled: {t2.cancelled()}')# 正常获取结果或者获取exception的代码如下if t1.cancelled():print(f't1 is cancelled')else:if t1.exception():print(f't1 exception: {t1.exception()}')else:print(f't1 result: {t1.result()}')if t2.cancelled():print(f't2 is cancelled')else:if t2.exception():print(f't2 exception: {t2.exception()}')else:print(f't2 result: {t2.result()}')if __name__ == '__main__':asyncio.run(task_group_2())
运行结果:
t1 done: True
t2 done: True
t1 cancelled: True
t2 cancelled: False
t1 is cancelled
t2 exception: division by zero
首先异常会向上抛出,所以如果coroutine中没有捕获异常就需要在async with
块之外捕获异常,其次异常发生时候,所有的task的状态都是done
.因此只需要判断任务是否是因为被cancel导致的done,如果不是证明任务要么正常运行完毕,要么是代码运行时抛异常,总之如果要写出稳健代码还是需要多层if...else...
最佳替代方案asyncio.gather
可以传递coroutine或者Future(Tasj)对象. gather可以实现,并发运行传递的可等待对象,直到运行结束或者抛出异常且返回结果中为每一个可等待对象运行结果或者异常.gather和taskgroup最本质区别是,gather一定能保证可正常运行的coroutine或者task运行完毕,不会因为其中一个task异常而取消其他task
,例子如下:
async def test_gather():async def inner_1():await asyncio.sleep(1)1/0async def inner_2():await asyncio.sleep(2)return 2async def inner_3():await asyncio.sleep(3)return 3# return_exceptions 一定为True 不然gather这一行代码依旧遇到异常就抛出res = await asyncio.gather(inner_1(), inner_2(), inner_3(), return_exceptions=True)print(res)if __name__ == '__main__':asyncio.run(test_gather())
运行结果:
[ZeroDivisionError('division by zero'), 2, 3]
可见结果里面即包含第一个协程的异常,有包含其他能正常运行协程的结果.
future:https://docs.python.org/3.11/library/asyncio-future.html#asyncio.Future
future本身是low-level
的可等待对象.多半情况下我们使用的是Task
对象,因为Task
本身继承自Future
,如下面源码所示.
class Task(Future[_T_co]): # type: ignore[type-var] # pyright: ignore[reportInvalidTypeArguments]if sys.version_info >= (3, 12):def __init__()......
样例代码:
async def test_future():async def inner():return 'ok't1 = asyncio.create_task(inner())# task类型是 asyncio.Taskprint(type(t1) == asyncio.Task)# future对象只能通过low-level的loop对象创建loop = asyncio.get_running_loop()fut = loop.create_future()# future类型是 asyncio.Futureprint(type(fut) == asyncio.Future)async def set_fut(fut, delay, value):await asyncio.sleep(delay)fut.set_result(value)loop.create_task(set_fut(fut, 2, 1))res = await futprint(res)if __name__ == '__main__':asyncio.run(test_future())
运行结果:
True
True
1
实际开发场景是asyncio.Task
使用更多.
asyncio 中的其他顶层方法
asyncio.timeout
返回一个异步上下文管理器,控制代码块内等待时长,超过等待时长限制触发TimeoutError.传递参数为整数等待时间以秒为单位,样例代码:
async def test_timeout_1():loop = asyncio.get_running_loop()print(loop.time())async def inner():await asyncio.sleep(2)return 1try:async with asyncio.timeout(2):await inner()await inner()except asyncio.TimeoutError as e:print(f'exception: {str(e)}')if __name__ == '__main__':asyncio.run(test_timeout_1())
这里第二个协程运行时就超时会抛出异常.这里可以在context manager上重新reschedule超时时间,代码如下:
async def test_timeout_2():loop = asyncio.get_running_loop()print(loop.time())async def inner():await asyncio.sleep(2)return 1try:async with asyncio.timeout(3) as cm:await inner()# reschedule 超时时间 当前+3s 正好可以运行完毕下面的协程cm.reschedule(loop.time() + 3)await inner()except asyncio.TimeoutError as e:print(f'exception: {str(e)}')print('finished')if __name__ == '__main__':asyncio.run(test_timeout_2())
asyncio.timeout_at
类似于timeout,只是这里指定在哪个时间点超时,具体代码如下:
async def test_timeout_3():loop = asyncio.get_running_loop()async def inner():await asyncio.sleep(2)return 1async with asyncio.timeout_at(loop.time() + 3):res = await inner()print(res)if __name__ == '__main__':asyncio.run(test_timeout_3())
asyncio.wait_for 和 asyncio.wait
asyncio.wait_for(aw, timeout)
设置协程运行的超时时间,如果超时时间为None,则会阻塞当前协程一直等待结果回来,超过超时时间则抛出TimeoutError
异常,这是非常重要的限制协程运行时间的手段:
async def test_wait_for():async def inner():await asyncio.sleep(10)return 1try:# 超时时间是秒为单位,浮点数res = await asyncio.wait_for(inner(), 1)print(res)except asyncio.TimeoutError as e:print(f'exception happend')if __name__ == '__main__':asyncio.run(test_wait_for())
asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
传递一组Future对象(一般是Task)
,设置超时时间和return条件,返回已经完成的(done)任务和未完成的(pending)任务的集合.这里timeout不会抛出异常
,这是这个点没运行完毕的任务都会放入未完成的任务集合里面.
async def test_wait():async def inner_1():await asyncio.sleep(4)return 1async def inner_2():await asyncio.sleep(6)return 2t1 = asyncio.create_task(inner_1())t2 = asyncio.create_task(inner_2())# 当有一个task完成就返回结果,最多等待5秒done, pending = await asyncio.wait([t1, t2], timeout=5, return_when=asyncio.FIRST_COMPLETED)for item in done:print(item.done())for item in pending:print(item.done())if __name__ == '__main__':asyncio.run(test_wait())
asyncio.as_completed
asyncio.as_completed(aws, *, timeout=None)¶
从可等待对象列表中按照执行快慢获取可等代对象的执行结果
,设置timeout时,当超过timeout时间时,没有全部执行完毕则抛出TimeoutError,举个例子.
async def test_completed():async def inner(n: int):await asyncio.sleep(n)return nfor coro in asyncio.as_completed([inner(3), inner(2), inner(1)]):res = await coroprint(res)if __name__ == '__main__':asyncio.run(test_completed())
输出
1
2
3
可见哪个协程先执行完毕则先返回结果
asyncio.to_thread 以及 asyncio.run_coroutine_threadsafe
协程和线程结合.
asyncio.to_thread(func, /, *args, **kwargs)
开启子线程去运行会阻塞event-loop线程的函数,且可以通过await返回这个函数在子线程中返回的结果,举个例子:
async def to_thread():def inner():print(f'seperate thread id: {threading.get_ident()}')time.sleep(2)return 1print(f'main thread id: {threading.get_ident()}')res = await asyncio.gather(asyncio.to_thread(inner))print(res)if __name__ == '__main__':asyncio.run(to_thread())
输出结果:
main thread id: 128122817214272
seperate thread id: 128122666092096
[1]
这个功能是在遇到某些function一定会阻塞线程的时候把它放置于其他线程不会阻塞当前event-loop线程.
asyncio.run_coroutine_threadsafe(coro, loop)
同样是多线程环境下的协程调度,将指定协程放在指定event-loop所在的线程去运行.
举个例子:
async def thread_safe():loop = asyncio.get_running_loop()print(f'main thread id: {threading.get_ident()}')async def inner():print(f'inner current thread: {threading.get_ident()}')return 1def handle_loop(loop):print(f'handle_loop current thread: {threading.get_ident()}')future = asyncio.run_coroutine_threadsafe(inner(), loop)print(future.result())thread = threading.Thread(target=handle_loop, args=(loop,), daemon=True)thread.start()await asyncio.sleep(5)if __name__ == '__main__':asyncio.run(thread_safe())
运行结果:
main thread id: 126657164425024
handle_loop current thread: 126657008502336
inner current thread: 126657164425024
1
可见子线程中将协程对象丢给主线程的event-loop,且在主线程内运行,可以看到main thread和 inner 协程运行的线程是同一个.
asyncio.current_task 和 asyncio.all_tasks
asyncio.current_task(loop=None)
获取指定event-loop下正在运行的task,如果传递loop为None,则是当前线程运行的loop(asyncio.get_running_loop)
asyncio.all_tasks(loop=None)
获取指定event-loop下所有的task,如果传递loop为None,则是当前线程运行的loop(asyncio.get_running_loop)
代码如下:
async def current_all():curr_task = asyncio.current_task()# 当前主协程运行print(curr_task.get_name())async def inner():# inner 协程 对应的taskcurr_task = asyncio.current_task()print(curr_task.get_name())await asyncio.sleep(10)return 1asyncio.create_task(inner(), name='inner')await asyncio.sleep(2)tasks = asyncio.all_tasks()# 总共两个taskprint(len(tasks))if __name__ == '__main__':asyncio.run(current_all())
输出结果:
Task-1
inner
2
asyncio network io(streams api)
streams相关api, high-level的基于async/await原语的异步TCP网络连接,使用同步的语法实现异步编程的网络数据传递.
首先这一部分需要自己使用streams api 构建一个 TCP server.
#!/usr/bin/env python
#-*- coding:utf-8 -*-
'''
基于asyncio的tcp server
'''import asyncioasync def handle_echo(reader, writer):data = await reader.read(100)message = data.decode()addr = writer.get_extra_info('peername')print(f"Received {message!r} from {addr!r}")print(f"Send: {message!r}")writer.write(data)await writer.drain()print("Close the connection")writer.close()await writer.wait_closed()async def main():server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)print(f'Serving on {addrs}')async with server:await server.serve_forever()if __name__ == '__main__':asyncio.run(main)
运行此socket server.
asyncio.open_connection 建立的是一个异步的TCP连接,利用其StreamReader和StreamWriter对象完成一次完整的数据发送和数据接收过程,客户端代码如下:
async def tcp_connection(message):# asyncio.StreamReader 和 asyncio.StreamWriterreader, writer = await asyncio.open_connection('127.0.0.1', 8888)print(f'Send: {message!r}')writer.write(message.encode())await writer.drain()data = await reader.read(100)print(f'Received: {data.decode()!r}')print('Close the connection')writer.close()await writer.wait_closed()if __name__ == '__main__':asyncio.run(tcp_connection('hello world'))
asyncio synchronization
异步编程中的同步原语.官方链接:https://docs.python.org/3.11/library/asyncio-sync.html#asyncio-sync
此同步机制类似于多线程编程中的同步机制,但是它也有两个不同点,第一,asyncio中的同步原语是协程
同步,保证共享对象的安全性,但是它不是线程安全的.其次同步机制本身不支持超时参数(比如限制获取锁的等待时间),需要借助于asyncio.wait_for
来实现超时限制.
asyncio.Lock
异步编程中的锁.官方文档提到2种使用异步锁的语法:
Lock本身也是异步上下文管理器,因此可以使用async with
语法实现上锁.
lock = asyncio.Lock()# ... later
async with lock:# access shared state
相应代码:
async def lock_1():lock = asyncio.Lock()try:async with lock:# 已经上锁print(f'lock block and lock state: {lock.locked()}')# 触发异常1/0except Exception as e:print(f'{e!r}')# 出lock上下文会自动释放锁,这里锁状态是已经释放了print(f'exception and lock state: {lock.locked()}')if __name__ == '__main__':asyncio.run(lock_1())
运行结果:
lock block and lock state: True
ZeroDivisionError('division by zero')
exception and lock state: False
以及第二种常见的语法
lock = asyncio.Lock()# ... later
await lock.acquire()
try:# access shared state
finally:lock.release()
相应代码:
async def lock_2():lock = asyncio.Lock()# 给获取锁操作加上超时限制try:await asyncio.wait_for(lock.acquire(), 10)except asyncio.TimeoutError as e:print(f'{e!r}')returntry:print(f'logic block lock state: {lock.locked()}')# 触发异常1/0except Exception as e:print(f'{e!r}')print(f'exception and lock state: {lock.locked()}')finally:# 需要手动释放锁lock.release()print(f'finally lock state: {lock.locked()}')if __name__ == '__main__':asyncio.run(lock_2())
运行结果:
logic block lock state: True
ZeroDivisionError('division by zero')
exception and lock state: True
finally lock state: False
两种语法比较:第一种是最简单的异步锁实现,退出lock上下文自动释放锁
,开发者只需要关注lock块内触发异常后的异常捕获,但是第一种方法没法设置获取锁的超时时间
,容易导致死锁.第二种是更灵活的实现,需要手动去释放锁
,但是可以结合asyncio.wait_for
给获取锁过程加上超时限制.
真实案例,比如多个协程操作同一个对象属性,不加锁代码如下:
async def lock_3():data = {'count': 0}async def inner():count = data['count']count += 1await asyncio.sleep(0)data['count'] = countcoros = [inner() for _ in range(10)]await asyncio.gather(*coros, return_exceptions=True)print(data['count'])if __name__ == '__main__':asyncio.run(lock_3())
输出结果:
1
加上异步锁:
async def lock_3():data = {'count': 0}lock = asyncio.Lock()async def inner():async with lock:count = data['count']count += 1await asyncio.sleep(0)data['count'] = countcoros = [inner() for _ in range(10)]await asyncio.gather(*coros, return_exceptions=True)print(data['count'])if __name__ == '__main__':asyncio.run(lock_3())
运行结果:
10
异步缩应该是最常见的异步编程中的同步机制.
asyncio.Event
event用于在多个异步任务Task中通知事件已经发生,其内部维护维护一个flag,标识事件是否发生,set()
方法是将flag设置为True表示事件发生,clear()
方法将flag设置为False,而event.wait()
则是阻塞直到flag为True
,举个例子如下:
async def event():event = asyncio.Event()async def inner():print('inner runs')# 等待setawait event.wait()print('inner finished')t = asyncio.create_task(inner())await asyncio.sleep(0)print('set event flag to True')event.set()await tif __name__ == '__main__':asyncio.run(event())
输出结果:
inner runs
set event flag to True
inner finished
如果我们去掉event.set()
这一行关键代码模拟event内置flag一直不能变成True:
async def event():event = asyncio.Event()async def inner():print('inner runs')# 等待setawait event.wait()print('inner finished')t = asyncio.create_task(inner())await asyncio.sleep(0)print('set event flag to True')#event.set()await tif __name__ == '__main__':asyncio.run(event())
程序会一直无法结束,协程一直阻塞在await event.wait()
处,这个时候就需要加入超时限制防止协程无法结束,最终代码如下:
async def event():event = asyncio.Event()async def inner():print('inner runs')# 等待setawait asyncio.wait_for(event.wait(), 5)print('inner finished')t = asyncio.create_task(inner())await asyncio.sleep(0)print('set event flag to True')#event.set()try:await texcept Exception as e:print(f'{e!r}')if __name__ == '__main__':asyncio.run(event())
输出:
inner runs
set event flag to True
TimeoutError()
asyncio.Condition
Condition是实现协程之间条件同步的机制.它结合了Event和Lock的同步机制.不仅用于控制协程中共享资源的访问,也可以等待某个状态发生后继续运行.由于它是Event和Lock结合,其中Lock相关的三个方法acquire
,locked
,release
用法相似,但是另外三个方法就非常重要notify(n=1)
,notify_all()
,wait()
.
这三个方法需要在已经获取锁
的情况下调用,不然抛出RuntimeError
.其中
notify(n=1)
通知n(n默认为1)个等待的任务可以继续运行.
notify_all()
通知所有等待的任务可以继续运行.
wait``释放当前锁
等待被通知(notify(n=1) or notify_all())当被唤醒的时候,重新获取锁
继续运行代码.
举一个publisher/subscriber的场景来详细讲解condition怎么使用.publisher会以2倍速度产生消息放置于一个固定size的buffer中,其中subscriber负责消费buffer中的消息,当buffer中消息满了的时候,publisher处于等待状态,直到subscriber消费了消息,通知publisher继续发送消息.代码如下
async def condition_pub_sub():# 创建可以将已经存在的lock传入,如果lock参数为空,内部会自己创建lockbuffer = []MAX_SIZE = 20condi = asyncio.Condition()async def publisher():counter = 0while True:async with condi:while len(buffer) >= MAX_SIZE:print('buffer is full')await condi.wait()msg = f'msg: {counter}'buffer.append(msg)counter += 1condi.notify_all()await asyncio.sleep(0.5)async def subscriber():while True:async with condi:while not buffer:print('buffer is empty')await condi.wait()# 首部弹出msg = buffer.pop(0)print(f'{msg} consumed')condi.notify_all()await asyncio.sleep(1)await asyncio.gather(publisher(), subscriber(), return_exceptions=True)if __name__ == '__main__':asyncio.run(condition_pub_sub())
asyncio.Semaphore
信号量asyncio.Semaphore(value=1)
可以理解为带内置计数器的Lock,计数器初始值创建时指定,不指定则默认为1.其方法和Lock中的方法名称以及调用方式一致,但是内部行为不同.acquire
方法是将信号量内置计数器减少1,但是如果当前信号量内置计数器已经为0了,那么这个方法就会阻塞等待其他其他地方调用release
而release
方法则是将内置计数器加1,注意信号量的release方法调用次数是无限制的,它可以比acquire
方法调用次数更多,举个例子.
async def semaphore_1():sem = asyncio.Semaphore(1)# 两次额外release增加了内置的计数器初始值sem.release()sem.release()#[unlocked, value:3]print(f'{sem!r}')async def inner(n):async with sem:print(f'runs {n}')coros = [inner(i) for i in range(2)]await asyncio.gather(*coros, return_exceptions=True)print('finished')if __name__ == '__main__':asyncio.run(semaphore_1())
运行结果:
runs 0
runs 1
finished
asyncio.BoundedSemaphore
边界信号量,是对普通信号量的补充,主要就是解决release
多次调用问题,如果release
导致内置计数器超过初始值则抛出 ValueError
异常,其它方法和常规的semaphore一致,
async def bounded_semaphore():sem = asyncio.BoundedSemaphore(1)try:sem.release()except Exception as e:print(f'{e!r}')if __name__ == '__main__':asyncio.run(bounded_semaphore())
输出
ValueError('BoundedSemaphore released too many times')
asyncio.Barrier
屏障asyncio.Barrier(parties)
允许task阻塞直到partities
个task都wait
在这个屏障上,所有协程才继续运行.
async def barrier_1():bar = asyncio.Barrier(2)async def inner(n):print(f'inner: {n} begin')await bar.wait()print(f'inner: {n} finished')t1 = asyncio.create_task(inner(0))await asyncio.sleep(2)print('main runs')t2 = asyncio.create_task(inner(1))await asyncio.sleep(2)if __name__ == '__main__':asyncio.run(barrier_1())
运行结果:
inner: 0 begin
main runs
inner: 1 begin
inner: 1 finished
inner: 0 finished
asyncio 中的 queue
异步编程中的队列,和常规队列操作相似,但是只是适用于异步编程中,队列本身是线程不安全的.内置queue有三种FIFO Queue
,Priority Queue
以及LIFO Queue
.这里主要讲FIFO Queue
其它两种Queue操作比较类似.
异步Queue中的协程依然不带超时限制,所以需要使用asyncio.wait_for
来实现超时限制.
async def fifo_queue():# maxsize默认为0,queue无size限制.maxsize <=0 都是创建的没有长度限制的queuequeue = asyncio.Queue(maxsize=5)# put是一个协程,如果queue满了则会阻塞,直到队列中值被消费await queue.put(1)# put_nowait是一个普通方法, 但是queue满了会触发 QueueFull 异常queue.put_nowait(2)# get 也是协程,如果queue是空的则会阻塞,直到queue中有值print(await queue.get())print(queue.get_nowait())try:await asyncio.wait_for(queue.get(), 10)except Exception as e:print(f'{e!r}')if __name__ == '__main__':asyncio.run(fifo_queue())
输出:
1
2
TimeoutError()
异步queue中有两个方法比较特别coroutine join()
和task_done()
.
join
阻塞当前协程,知道queue中元素被接收以及处理
task_done
表示当前queue弹出元素被处理,需要配合get
方法使用,get
之后调用task_done
才表示当前元素被处理.
如下面代码所示,分配三个task消费queue中元素,主协程等待元素消费完才继续执行.
async def queue_join():async def inner(name, queue):while True:item = await queue.get()await asyncio.sleep(1)queue.task_done()print(f'task: {name} finished received: {item}')queue = asyncio.Queue(10)for i in range(10):queue.put_nowait(f'item-{i}')# 创建tasktasks = []for i in range(3):tasks.append(asyncio.create_task(inner(f'task-{i}', queue)))print('start run tasks')start = time.perf_counter()await queue.join()duration = time.perf_counter() - startprint(f'cost: {duration:.4f} s')# 取消所有taskfor task in tasks:task.cancel()await asyncio.gather(*tasks, return_exceptions=True)print('all finished')if __name__ == '__main__':asyncio.run(queue_join())
输出
start run tasks
task: task-0 finished received: item-0
task: task-1 finished received: item-1
task: task-2 finished received: item-2
task: task-0 finished received: item-3
task: task-1 finished received: item-4
task: task-2 finished received: item-5
task: task-0 finished received: item-6
task: task-1 finished received: item-7
task: task-2 finished received: item-8
task: task-0 finished received: item-9
cost: 4.0043 s
all finished
asyncio 中的 subprocess
asyncio中的子进程管理.先看基本用法:
async def subprocess_1():async def run(cmd):# 类型 asyncio.subprocess.Processproc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)stdout, stderr = await proc.communicate()print(f'[{cmd!r} exited with {proc.returncode}]')# bytes类型print(type(stdout))print(type(stderr))if stdout:print(f'[stdout]\n{stdout.decode()}')if stderr:print(f'[stderr]\n{stderr.decode()}')coros = [run('ls /zzz'), run('sleep 1; echo "hello"')]await asyncio.gather(*coros, return_exceptions=True)print('finished')if __name__ == '__main__':asyncio.run(subprocess_1())
首先是创建子进程,两个方法create_subprocess_shell
和create_subprocess_exec
这里讲解一下三个输入输出相关的参数stdin
,stdout
,stderr
和相应入参asyncio.subprocess.PIPE
,asyncio.subprocess.STDOUT
以及asyncio.subprocess.DEVNULL
之间关系.
stdin
,stdout
以及stderr
控制子进程输入输出的重定向行为.列表如下
参数 | 取值说明 |
---|---|
stdin | None :默认值,子进程不接受来自于父进程的输入,会让创建后的process.stdin为None,所以无法从主进程输入内容到子进程asyncio.subprocess.PIPE :子进程可接受父进程传递的值作为输入,创建后的process.stdin为一个StreamWriter,可以用于输入内容给子进程asyncio.subprocess.DEVNULL :同None,创建的process.stdin也是None,无法实现输入内容到子进程 |
stdout | None : 默认值,子进程输出直接到显示到控制台,会让创建后的process.stdout为None.asyncio.subprocess.PIPE :子进程的输出重定向到process.stdout这个StreamReader.可以通过这个对象在父进程读取子进程的标准输出.asyncio.subprocess.DEVNULL :同None,创建进程的process.stdout为None,同时子进程的输出也不会显示到控制台,可以看成输出重定向到了os.devnull |
stderr | None :默认值,子进程的错误直接显示到控制台,会让创建后的process.stderr为None.asyncio.subprocess.PIPE :子进程的错误内容重定向到process.stderr这个StreamReader.可以通过这个对象在父进程读取子进程的错误信息.asyncio.subprocess.STDOUT :子进程的错误内容重定向到process.stdout,process.stderr将为None,所有内容从process.stdout获取.asyncio.subprocess.DEVNULL :同None,process.stderr为None,且控制台也不会打印子进程错误内容,相当于错误重定向到了os.devnull |
创建子进程的两个方法里面更推荐使用create_subprocess_exec
方法,它适用范围更广,而create_subprocess_shell
方法只能接收一个命令行字符串,因此后面的例子都使用create_subprocess_exec
方法创建子进程.
子进程两个重要协程wait
和comunicate
,其中wait
方法是阻塞直到子进程运行完毕获取子进程的returncode
,而communicate(input=None)
是使用更多的便捷方法,input
是父进程往子进程写入内容的bytes数据,返回结果是 (stdout_data, stderr_data)
表示从子进程stdout
和stderr
读取的bytes数据.
如下面的例子,在当前解释器运行python代码,子进程接收父进程输入,同时代码运行会有异常,异常被重定向到stdout:
async def subprocess_2():proc = await asyncio.create_subprocess_exec(sys.executable, "-c", "print(input());1/0;",stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT)proc.stdin.write(b'hello\n')await proc.stdin.drain()proc.stdin.close()# 因为 stderr=asyncio.subprocess.STDOUT错误被重定向到stdout, 所以proc.stderr是None# 只能通过proc.stdout获取所有输出内容print(proc.stderr is None)raw = await proc.stdout.read()print(raw.decode())if __name__ == '__main__':asyncio.run(subprocess_2())
输出
True
hello
Traceback (most recent call last):File "<string>", line 1, in <module>
ZeroDivisionError: division by zero
更标准的实现,父进程输入内容到子进程,子进程的输出以及错误都被父进程捕获,重定向到不同的StreamReader.所以创建子进程时stdout
,stderr
最好都选择asyncio.subprocess.PIPE
,stdin
取值根据是否要通过标准输入传递数据给子进程,如果不需要这个地方就是默认值None
,如果需要选择asyncio.subprocess.PIPE
,以上是最标准做法:
async def subprocess_3():proc = await asyncio.create_subprocess_exec(sys.executable, "-c", "print(input());1/0;",stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)out, err = await proc.communicate(b'hello/n')if out:print(f'out is: {out.decode()}')if err:print(f'err is: {err.decode()}')# 推荐最后再waitawait proc.wait()print(f'subprocess return code: {proc.returncode}')if __name__ == '__main__':asyncio.run(subprocess_3())
输出
out is: hello/nerr is: Traceback (most recent call last):File "<string>", line 1, in <module>
ZeroDivisionError: division by zerosubprocess return code: 1
asyncio 中的 event loop 以及 low-level api
event loop是asyncio 中的low-level api, 实际我们开发过程中,绝大多数情况下使用的都是上面提到的high level api, 这里只讲部分使用频率较高
的event-loop相关的api,即便是使用high-level api场景也不能忽视的loop相关方法.
创建以及获取event loop
- asyncio.get_running_loop: 获取当前线程绑定的
且正在运行的
事件循环 - asyncio.get_event_lopp: 获取当前线程绑定的事件循环,如果没有则
创建新的事件循环
且使用set_event_loop(loop)
绑定到该线程.此方法不要在协程的异步上下文使用
,协程里面使用get_running_loop
获取事件循环. - asyncio.new_event_loop:创建新的事件循环,
不绑定当前线程
. - asyncio.set_event_loop: 将创建的事件循环绑定到当前线程.基本上是和
new_event_loop
配合使用.
event loop 其他方法以及使用场景
run_in_executor
此方法可以把阻塞线程的操作放到线程池中执行,防止阻塞时间循环,如下面例子,协程中想要获取用户命令行输入但是不能阻塞事件循环.
如下面代码实现ainput(async input)
#!/usr/bin/env python
#-*- coding:utf-8 -*-import asyncio# async input
async def ainput(prompt: str):# 获取当前事件循环loop = asyncio.get_running_loop()# 在默认的线程池中运行同步阻塞的input方法.return await loop.run_in_executor(None, input, prompt)async def block_input():async def inner2():print('inner 2 runs')await asyncio.gather(ainput('your input: \n'), inner2(), return_exceptions=True)print('finished')if __name__ == '__main__':asyncio.run(block_input())
输出结果:
inner 2 runs
your input:
123
finished
可见inner2
协程运行了,事件循环并未被阻塞.当然也可以用high-level api实现,比如asyncio.to_thread
等价实现代码:
async def block_input():async def inner2():print('inner 2 runs')await asyncio.gather(asyncio.to_thread(input, 'your input: \n'), inner2(), return_exceptions=True)print('finished')if __name__ == '__main__':asyncio.run(block_input())
输出:
inner 2 runs
your input:
abc
finished
总结
至此,python asyncio和异步编程的知识点总结完毕.