1. 从顺序执行到并行执行
前几天去外地参加朋友婚礼,作为一个外地人我是不认路的,但我被安排了去接其他客人,于是我不得不依赖导航.我需要打开导航,听着它的指挥开车.
这就是一个典型的并行执行的过程,我要同时开车,并且同时监听着手机上的导航的指挥.
人们往往是同时做几件,比如边看电视边吃饭,边听音乐边工作,边打电话边开车(千万不要这么做).并且很多时候我们不得不同时做几件事,而一件事是另一件事的依赖.
人可以并行的执行任务(事实上人脑就是并行处理事件的)但电脑'不行',单核电脑比较耿直只会按固定好的顺序执行任务.前文也已经介绍过了如何组织单线程过程.
但好在电脑的运转速度远比人的反应速度快,因此我们可以耍点小花招让多个任务看起来是一起执行的.
拿之前看导航开车的例子来说,实际上我开车这个事件可以看作一个循环,每个循环中我有两个动作
- 我的耳朵在监听着手机(使用声音的音色语调等特征识别),当它有指示的时候我会按照指示执行
- 没有指示就根据路况开一段
当然了这个事件看起来作为并发的例子更加合适,但道理是一样的.
注意:本文使用jupyter notebook在ipykernel 5.0+下编写.由于ipykernel有autoawait机制可以直接await协程,在.py
文件中await
关键字只能在协程函数中使用,并且启动项目需要显式的声明一个时间循环后调用loop.run_until_complete(cor)
或者隐式的使用标准库中的方法asyncio.run(cor)
启动入口协程.
一个典型的基于协程的函数可以这样写:
import asyncio async def main(): print('Hello ...') await asyncio.sleep(1) print('... World!') asyncio.run(main())
或者
import asyncio async def main(): print('Hello ...') await asyncio.sleep(1) print('... World!') loop = asyncio.get_event_loop() try: loop.run_until_compelte(main()) finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close()
1.1. 阻塞与非阻塞
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.
阻塞调用是指调用结果返回之前,当前线程会被挂起.调用线程只有在得到结果之后才会返回.
非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程.
如果开车的时候我监听导航是阻塞的,那就意味着我的关注点转移到了导航上,必须要有它的指导我才会有动作,这么开车早就出事故了.
推广到我们的程序,也就是说我们的流程需要可以被保存状态,将线程的控制权转移到其他流程中.同时也要可以下次再被转移回来接着上次的继续运行.
1.2. 同步与异步
同步与异步同步和异步关注的是消息通信机制(synchronous communication/ asynchronous communication).
所谓同步,就是在发出一个
*调用*
时,在没有得到结果之前,该*调用*
就不返回。但是一旦调用返回,就得到返回值了。换句话说,就是由*调用者*
主动等待这个*调用*
的结果。而异步则是相反,
*调用*
在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在*调用*
发出后,*被调用者*
通过状态、通知来通知调用者,或通过回调函数处理这个调用。
开车的时候导航就是异步的,当打开导航后就会有个反馈--地图上我们的位置会被标记出来.而实际的导航信息都是由导航自己语音通知我们的.
有了上面的概念,我们就可以来看看python中官方的单线程并行解决方案了
1.3. 协程及其语法
协程是一种单线程通过调度器或者事件循环从而实现的并行的解决方案.它是由用户控制的"并行",因此只要代码一样(没有使用random)协程的运行顺序就是一样的.实际上完全可以等价的用回调函数实现.
协程是实现高并发的方案中开销最小的方案.在io密集型任务中往往是最高效的方案.python3.5以后协程语法已经基本定型.在随后的3.6,3.7中接口逐步完善,协程语法变得更加优雅好用.
python的协程模型可以分为如下几个部分:
coroutine function
:协程函数,创建协程的工厂.用async关键字定义的函数,调用它就会创建一个协程对象coroutine
协程对象:协程对象无法自己执行,需要将其注册到事件循环中转变为一个Task对象才会被执行.协程对象一定满足Awaitable
协议Future
期程对象:一个低级别对象用来链接底层回调式代码和高层异步/等待式代码,其概念类似JS中的Promise.通常在应用时不会用到,但在写模块时会用到,期程一定满足Awaitable
协议Task
任务对象:被注册到事件循环的协程对象就是一个任务对象.任务对象一定满足Awaitable
协议,Task
是Future
的子类,因此任务对象也是一个期程对象event_loop
调度器/事件循环(:用于调度协程运行的顺序,调度器用于调度协程而事件循环则是一种特殊的调度器--程序开启一个无限的循环,程序员会把一些函数注册到事件循环上.当满足事件发生的时候,调用相应的协程函数.我们应该尽量的使用asyncio
的高级api,隐式的执行协程.
协程语法可以说是函数的一个超集,它的特征是使用async def
来定义,并且可以在其内部使用await
关键字等待另一个协程完成.协程对象的抽象基类为collections.abc.Coroutine
,实现send(value)
,throw(type, exc, tb)
,close()
和__await__()
接口.
可以看出协程与生成器接口相似,就是多了个__await__()
少了迭代器相关的__next__()和__iter__()
事实上,在3.7版本之前,协程都是使用生成器来实现的.
协程对象内部需要实现Awaitable
协议,也就是要实现__await__
接口,这个接口必须返回一个迭代器,带有这一接口的对象我们称之为Future-like
对象,有它的就可以被程序用await
关键字挂起等待,Future-like
类的抽象基类为collections.abc.Awaitable
1.3.1. await语法
await就是用来挂起等待任务结束的关键字它只能在协程中使用.它的后面必须是一个Awaitable
的对象
有效用法:
表达式 | 被解析为 |
---|---|
if await fut: pass | if (await fut): pass |
if await fut + 1: pass | if (await fut) + 1: pass |
pair = await fut, 'spam' | pair = (await fut), 'spam' |
with await fut, open(): pass | with (await fut), open(): pass |
await foo()['spam'].baz()() | await ( foo()['spam'].baz()() ) |
return await coro() | return ( await coro() ) |
res = await coro() ** 2 | res = (await coro()) ** 2 |
func(a1=await coro(), a2=0) | func(a1=(await coro()), a2=0) |
await foo() + await bar() | (await foo()) + (await bar()) |
-await foo() | -(await foo()) |
无效用法:
表达式 | 应该写为 |
---|---|
await await coro() | await (await coro()) |
await -coro() | await (-coro()) |
一般来说await会挂起直到它下面的一串Future-like
对象都运行结束才会继续向下.
1.3.2. 异步迭代器和async for
异步迭代器可以在它的iter实现里挂起、调用异步代码,也可以在它的__next__
方法里挂起、调用异步代码.要支持异步迭代,需要:
- 对象必须实现一个
__aiter__
接口,返回一个异步迭代器对象,这个异步迭代器对象在每次迭代时会返回一个Future-like
对象 - 一个异步迭代器必须实现一个
__anext__
方法,在每次迭代时返回一个Future-like
对象 - 要停止迭代,
__anext__
必须抛出一个StopAsyncIteration
异常。
python的buildin方法中有aiter()
和anext()
可以直接调用异步迭代器的对应接口实现.
例子:
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): async for i in Ticker(1,5): print(i)
await main()
0 1 2 3 4
1.3.3. 异步列表解析(3.6)
列表解析中可以使用await
来等待Future-like
对象的结果,如:
result = [await fun() for fun in funcs if await condition()]
在列表中允许使用async for
来做迭代,它的形式如下:
[i async for i in Ticker(1,5) if i % 2]
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): result = [i async for i in Ticker(1,5) if i % 2] print(result) await main()
[1, 3]
1.3.4. 异步迭代器工具
github上有一个异步迭代器工具aitertools,它的主要作用就是转换同步迭代器和对一步迭代器进行组合,主要的接口有:
aiter(iter)
将一个同步的可迭代对象转化为异步可迭代对象alist(aiter)
将一个异步可迭代对象转化为listatuple(aiter)
将一个异步可迭代对象转化为tuplecount(start=0, step=1)
生成一个从start开始每次步进step的异步计数器cycle(aiter)
将一个异步可迭代对象转化为一个以他为基础的循环(obj, times=None)
将一个对象转化为一个以他为基础的重复异步可迭代对象accumulate(iterable, func=operator.add)
对一个异步可迭代对象进行卷积chain(*iterables)
将几个可迭代对象串联compress(data, selectors)
并行处理两个可迭代的对象;如果selectors中的元素是真值,产出data中对应的元素dropwhile(predicate, iterable)
处理iterable,跳过predicate的计算结果为真值的元素,然后产出剩下的各个元素(不再进一步检查)filterfalse(predicate, iterable)
与filter函数的作用类似,不过predicate的逻辑是相反的--predicate返回假值时产出对应的元素groupby(iterable, key=None)
产出由两个元素组成的元素,形式为(key,group)其中key是分组标准,group是生成器,用于产出分组里的元素islice(iterable, *args)
产出it的切片,作用类似于s[:stop]或s[start:stop:step],不过it可以是任何可迭代的对象,而且这个函数实现的是惰性操作starmap(func, iterable)
把it中的各个元素传给func,产出结果;输入的可迭代对象应该产出可迭代的元素iit,然后以func(*iit)
这种形式调用functakewhile(predicate, iterable)
predicate返回真值时产出对应的元素,然后立即停止,不再继续检查tee(iterable, n=2)
产出一个由n个生成器组成的元组,每个生成器用于单独产出输入的可迭代对象中的元素zip_longest(*iterables, fillvalue=None)
并行从输入的各个可迭代对象中获取元素,产出由N个元素组成的元组,等到最长的可迭代对象到头后才停止,空缺的值使用fillvalue填充product(*iterables, repeat=1)
把前两个元素传给func,然后把计算结果和第三个元素传给func,以此类推,返回最后的结果;如果提供了initial,把它当作第一个元素传入
1.3.5. 异步上下文管理器和async with
异步上下文管理器类似普通的上下文管理器,可以让程序在进入上下文和离开上下文之间挂起状态,调用异步代码.
异步上下文管理器需要实现两个接口
__aenter__
处理进入上下文时的操作,如果有返回值,则可以使用as
标定上下文中的变量名__aexit__
处理离开上下文时的操作,和__exit__
的参数一样,它的参数必须是self
,exc_type
,exc
,tb
,分别代表对象自身对象,exception_type , exception_value , 和 traceback,如果正常退出,exc_type
,exc
,tb
将会是 None.
__aenter__
和__aexit__
,它们必须返回一个Future-like
对象
和普通的with语句一样,可以在单个async with语句里指定多个上下文管理器.
异步上下文管理器的一个示例:
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i class AsyncContextTicker: def __init__(self,delay, to): self.data = Ticker(delay, to) async def __aenter__(self): print('entering context') await asyncio.sleep(1) return self.data async def __aexit__(self, exc_type, exc, tb): await asyncio.sleep(1) print('exit context') async def main(): async with AsyncContextTicker(1,5) as ticker: async for i in ticker: print(i) await main()
entering context 0 1 2 3 4 exit context
1.3.6. contextlib快速创建异步上下文管理器[3.7]
就像同步接口中的上下文管理器with一样,在python3.7中contextlib.asynccontextmanager
装饰器现在可以快速创建异步上下文管理器了.
其使用方式也一致,使用yield
区分上下文的创建和回收两段.上面的例子可以修改为:
import contextlib @contextlib.asynccontextmanager async def AsyncContextTicker2(delay, to): data = Ticker(delay, to) print('entering context') await asyncio.sleep(1) try: yield data finally: await asyncio.sleep(1) print('exit context') async def main(): async with AsyncContextTicker2(1,5) as ticker: async for i in ticker: print(i) await main()
entering context 0 1 2 3 4 exit context
1.3.7. 异步生成器[3.6]
带yield
关键字的函数是生成器,带yield
关键字的协程就是异步生成器,从效果上看异步生成器效果和异步迭代器效果差不多,它需要实现协议:
- PyAsyncGenASend :
__anext__
和asend()
接口 ,对应一般生成器中的__next__
和send()
,用于在异步生成器间交互信息 - PyAsyncGenAThrow :
athrow()
andaclose()
接口,对应一般生成器的throw()
和close()
,用于关闭异步生成器或者抛出错误 StopAsyncIteration
用于标注结束
import asyncio async def main(): print('Hello ...') await asyncio.sleep(1) print('... World!') loop = asyncio.get_event_loop() try: loop.run_until_compelte(main()) finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close()
1
0 1 2 3 4
1.3.8. 关于yield from
因为异步步生成器本质上是异步迭代器的子类,我们可以利用这一点使用async for
语句代替yield from
的语义.
import asyncio async def main(): print('Hello ...') await asyncio.sleep(1) print('... World!') loop = asyncio.get_event_loop() try: loop.run_until_compelte(main()) finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close()
3
0 1 2 3 4
1.3.9. 协程的状态
协程可以有4种状态,可以是用python的反射模块inspect.getcoroutinestate(coroutine)
来查看
- CORO_CREATED: 等待被使用
- CORO_RUNNING: 目前执行中
- CORO_SUSPENDED: 目前在
await
处暂停等待信号中 - CORO_CLOSED: 执行结束
1.3.10. 实用例子
协程有三种不同的代码编写风格:
拉取式
典型的异步生成器和异步迭代器使用场景
推送式
通过将数据推送给协程让协程一步一步的计算返回数据
任务式
根据状态来排定运行顺序
推送式
我们用一个计算移动平均值的异步生成器来看看协程是如何工作的.
import asyncio async def main(): print('Hello ...') await asyncio.sleep(1) print('... World!') loop = asyncio.get_event_loop() try: loop.run_until_compelte(main()) finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close()
5
import asyncio async def main(): print('Hello ...') await asyncio.sleep(1) print('... World!') loop = asyncio.get_event_loop() try: loop.run_until_compelte(main()) finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close()
6
任务式
一个简单的离散事件仿真类--出租车队运营仿真
import asyncio async def main(): print('Hello ...') await asyncio.sleep(1) print('... World!') loop = asyncio.get_event_loop() try: loop.run_until_compelte(main()) finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close()
7
import asyncio async def main(): print('Hello ...') await asyncio.sleep(1) print('... World!') loop = asyncio.get_event_loop() try: loop.run_until_compelte(main()) finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close()
8
import asyncio async def main(): print('Hello ...') await asyncio.sleep(1) print('... World!') loop = asyncio.get_event_loop() try: loop.run_until_compelte(main()) finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close()
9
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): async for i in Ticker(1,5): print(i)
0
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): async for i in Ticker(1,5): print(i)
1
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): async for i in Ticker(1,5): print(i)
2
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): async for i in Ticker(1,5): print(i)
3
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): async for i in Ticker(1,5): print(i)
4
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): async for i in Ticker(1,5): print(i)
5
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): async for i in Ticker(1,5): print(i)
6
1.4. 期程
Task是Future的子类所以也可以说Task也是期程.
asyncio模块的将协程注册到时间需要先将其包装为期程,也就是Future或者Task.
Task类用来管理协同程序运行的状态,是Future的子类,Future的接口如下:
cancel()
取消期程对象并安排回调.如果期程对象已经完成或取消,返回False.否则,将期程对象的状态更改为取消,调度回调并返回True.
cancelled()
如果期程对象被取消,返回True
done()
如果期程对象完成,返回True.完成意味着结果/异常可用,或者期程对象被取消
result()
返回期程对象代表的结果.如果期程对象取消,则会引发CancelledError
.如果期程对象的结果尚不可用,则会引发InvalidStateError
.如果期程对象已经完成并且设置了异常,则会引发异常.
exception()
返回在期程对象设置的异常.异常(如果没有设置异常,则为None
)仅在期程对象完成时才会返回.如果期程对象取消,则会引发CancelledError
.如果期程对象尚未完成,则会引发InvalidStateError
.
add_done_callback(fn)
添加一个回调,以便在期程对象完成时运行.使用单个参数(未来对象)调用回调.如果在调用此函数时已经完成了未来,则使用call_soon()
调度回调.
通常需要结合functools.partial
使用
fut.add_done_callback(functools.partial(print, "Future:", flush=True))
会在回调时执行
print("Future:", fut, flush=True)
remove_done_callback(fn)
从'完成调用'列表中删除回调的所有实例.返回删除的回调数.
set_result(result)
标记期程对象的状态为done
并设定其结果.如果在调用此方法时期程对象已经完成,则引发InvalidStateError
set_exception(exception)
标记期程对象的状态为done
并设定一个异常.如果在调用此方法时期程对象已经完成,则引发InvalidStateError
Task作为Future的子类,额外的方法有:
classmethod all_tasks(loop=None)
返回一组事件循环的所有任务对象.默认情况下,返回当前事件循环的所有任务.
classmethod current_task(loop=None)
返回事件循环正在执行的任务对象,默认为当前的事件循环.在任务的上下文中调用时返回None
.
cancel()
请求此任务自行取消.这将安排一个CancelledError
通过事件循环在下一个循环中被引入到包装的协同程序中,然后,协调程序有机会使用try / except / finally
清理甚至拒绝该请求.与Future.cancel()
不同,这不保证任务将被取消.
异常可能会被捕获并被执行,延迟取消任务或者完全阻止取消.该任务也可能返回值或引发不同的异常.在调用此方法之后,cancelled()
将不会返回True
(除非该任务已被取消).当包装的协同程序以CancelledError
异常终止(即使未调用cancel()
时,任务将被标记为已取消.
get_stack(*, limit=None)
返回此任务的协程的堆栈帧列表
print_stack(*, limit=None, file=None)
打印此任务的协程的堆栈或追溯.对于由get_stack()
检索到的帧,它会产生与追溯模块类似的输出.limit
参数传递给get_stack()
.文件参数是写入输出的I/O流;默认情况下,输出将写入sys.stderr.
1.4.1. 创建期程
创建Task可以使用asyncio的高级api:
task = asyncio.ensure_future(coro)
task = asyncio.create_task(coro)
[3.7]
也可以使用事件循环的低级api:
task = loop.create_task(coro)
loop.set_task_factory(factory)
设置一个由AbstractEventLoop.create_task()
使用的工厂函数.- 如果工厂函数为None,则将设置默认任务工厂
- 如果工厂函数是可调用的,它应该有一个函数签名匹配(loop,coro),其中循环将是对活动事件循环的引用,coro将是一个协程对象.工厂函数必须返回一个
asyncio.Future
兼容的对象。
loop.get_task_factory()
获取任务工厂函数,如果默认工厂函数正在使用,则为None
创建期程则必须使用事件循环loop
,接口为:
loop.create_future()
1.4.2. 管理任务
asyncio提供了高级接口可以直接获取当前运行的事件循环中所有的任务asyncio.all_tasks()
和当前在执行的任务asyncio.current_task()
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): async for i in Ticker(1,5): print(i)
7
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): async for i in Ticker(1,5): print(i)
8
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): async for i in Ticker(1,5): print(i)
9
await main()
0
await main()
1
await main()
2
await main()
3
await main()
4
1.4.3. 多任务并行
我们可以使用asyncio.gather(*aws, loop=None, return_exceptions=False)
来并行的执行多个协程,如果aws
中的某个可等待对象为协程,它将自动作为一个任务加入日程.
如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表.结果值的顺序与aws
中可等待对象的顺序一致.
如果return_exceptions
为False
(默认),所引发的首个异常会立即传播给等待gather()
的任务.aws
序列中的其他可等待对象不会被取消,并将继续运行.
如果return_exceptions
为 True,异常会和成功的结果一样处理,并聚合至结果列表.
如果gather()
被取消,所有被提交(尚未完成)的可等待对象也会被取消.
如果aws
序列中的任一Task或Future对象被取消,它将被当作引发了CancelledError
一样处理--在此情况下gather()
调用不会被取消.这是为了防止一个已提交的Task/Future
被取消导致其他Tasks/Future
也被取消.
await main()
5
await main()
6
1.4.4. 等待任务和过期
asyncio提供了等待任务完成和超时过期的接口
asyncio.wait(List[aws], *, loop=None, timeout=None, return_when=ALL_COMPLETED)
等待一段时间让协程执行,在时间到后返回任务状态的list
await main()
7
await main()
8
asyncio.as_completed(List[aws], *, loop=None, timeout=None)
返回一个Future
对象的迭代器.返回的每个Future
对象代表来自剩余可等待对象集合的最早结果.如果在所有Future
对象完成前发生超时则将引发asyncio.TimeoutError
await main()
9
0 1 2 3 4
0
asyncio.wait_for
等待一段时间让协程执行,如果时间到了但还没有完成就会对任务执行取消操作,之后抛出asyncio.TimeoutError
0 1 2 3 4
1
0 1 2 3 4
2
1.4.5. 取消任务和防止任务被取消
一个正常的任务取消如下例:
0 1 2 3 4
3
await main()
0 1 2 3 4
5
在外部使用task.cancel()
取消任务会在协程内部触发一个asyncio.CancelledError
.我们也会有时候希望任务无法被取消,这时候可以使用asyncio.shield(aws)
,它只是将内部的任务包装了一下,取消这个包装不会取消内部的任务本身
0 1 2 3 4
6
await main()
0 1 2 3 4
8
1.4.6. 监控常驻协程
我们的协程往往是一些不会结束的常驻协程,这时候我们就需要监控他们防止异常退出.
通常我们使用task.add_done_callback(callback)
接口来监控和自动处理.注意callback
会有一个参数,这个参数就是这个task自身.
0 1 2 3 4
9
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): result = [i async for i in Ticker(1,5) if i % 2] print(result) await main()
0
1.5. 事件循环与低级api
事件循环是一个无限的的循环,用来监控触发事件.一般我们用loop = asyncio.new_event_loop()
来创建一个事件循环的实例,然后将其使用asyncio.set_event_loop(loop)
来将循环实例定义为当前的事件循环.如果程序并不需要考虑使用多个循环的话我们也可以直接使用asyncio.get_event_loop()
或者asyncio.get_running_loop()
[3.7]来获取当前事件循环的实例
事实上python原生的事件循环并不高效,uvloop是一个高效的事件循环,它使用cython编写,并使用libuv,就是node.js用的那个高性能事件驱动的程序库.我们在生产环境可以使用它来运行协程.(windows下无法使用)
python的协程运转需要显式的指定循环.asyncio则提供了如'中央处理设备'一般的功能,它支持如下操作:
- 产生,设置和管理事件循环
- 异步时间管理
- 将回调函数注册到事件循环
- 管理协程的执行,包括取消,延迟,调用等
- 将耗时函数调用委托给一个线程池
- 协程错误处理
- 创建可用于多种类型的通信的服务端和客户端的Transports
- 启动进程以及相关的和外部通信程序的Transports
后两个操作在网络部分再讨论,本篇只讨论前面的功能
1.5.1. 产生,设置和管理事件循环
上面已经介绍了如何产生事件循环,以下是关于设置管理事件循环的接口,这些接口的实例为loop
:
loop.run_forever()
运行直到
stop()
被调用.如果在调用run_forever()
之前调用stop()
,则以超时为0轮询I/O选择器一次,运行所有响应I/O事件(以及已经安排的回调)的回调,然后退出如果在运行
run_forever()
时调用stop()
,则会运行当前批次的回调,然后退出.请注意,在这种情况下,回调计划的回调将不会运行;他们会在下一次run_forever()
被调用时运行.
loop.run_until_complete(future)
跑到期程完成.如果参数是一个
coroutine
对象,那么它被wrap_future()
包装起来成为一个期程.返回期程的结果或者抛出异常.loop.is_running()
返回时间循环的状态
loop.stop()
停止事件循环
is_closed()
如果事件循环被关闭,则返回True.
close()
关闭事件循环.循环不能再次运行,待处理的回调将丢失.这将清除队列并关闭执行程序且不等待执行程序完成.这一过程不可逆转,要再次使用必须重新创建一个时间循环并设置为当前事件循环
coroutine shutdown_asyncgens()
[3.6]安排所有当前打开的异步生成器对象,以
aclose()
调用.调用此方法后,事件循环将在每次迭代新的异步生成器时发出警告.应该用于可靠地完成所有调度的异步生成器.
1.5.2. 异步时间管理
asyncio.sleep(nbr)
这是一个异步的延迟工具,必须在协程中使用
await
调用loop.time()
根据事件循环的内部时钟,将当前时间作为浮点值返回,返回的是时间戳
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): result = [i async for i in Ticker(1,5) if i % 2] print(result) await main()
1
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): result = [i async for i in Ticker(1,5) if i % 2] print(result) await main()
2
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): result = [i async for i in Ticker(1,5) if i % 2] print(result) await main()
3
1.5.3. 将回调函数注册到事件循环
它的工作机制类似于先进先出队列,所以如果一些回调需要一段时间来处理任务,其它的回调就会相应的延迟,直到先前的回调结束
回调函数处理的接口同样是loop
,他们有:
call_soon(callback, *args)
基本的回调注册,行为如前面介绍类似先进先出队列
call_later(delay, callback, *args)
在一定延迟后执行回调
call_at(when, callback, *args)
使用int或者float代表时间戳,在该时间执行回调函数
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): result = [i async for i in Ticker(1,5) if i % 2] print(result) await main()
4
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): result = [i async for i in Ticker(1,5) if i % 2] print(result) await main()
5
call_soon_threadsafe(callback, *args)
call_soon(callback, *args)
的线程安全版本
很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中.一个简单的方式就是使用多线程.当前线程创建一个事件循环,然后在新建一个线程,在新线程中启动事件循环.当前线程不会被block.
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): result = [i async for i in Ticker(1,5) if i % 2] print(result) await main()
6
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): result = [i async for i in Ticker(1,5) if i % 2] print(result) await main()
7
启动上述代码之后,当前线程不会被block,新线程中会按照顺序执行call_soon_threadsafe
方法注册的more_work
方法,后者因为time.sleep
操作是同步阻塞的,因此运行完毕more_work需要大致6 + 3
1.5.4. 管理协程的执行,包括取消,延迟,调用等
事件循环实际上上面只能注册期程,而asyncio的很多接口可以直接使用协程,其原因是这些接口会自动将协程包装为Task.
loop.run_until_complete()
是最简单的将协程注册进事件循环中并运行的方法.
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): result = [i async for i in Ticker(1,5) if i % 2] print(result) await main()
8
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def main(): result = [i async for i in Ticker(1,5) if i % 2] print(result) await main()
9
asyncio.run_coroutine_threadsafe(coro, loop)
线程安全的执行协程,可以看做是loop.run_until_complete()
的线程安全版本.
[1, 3]
0
[1, 3]
1
上述的例子,主线程中创建一个new_loop,然后在另外的子线程中开启一个无限事件循环.主线程通过run_coroutine_threadsafe
新注册协程对象.这样就能在子线程中进行事件循环的并发操作,同时主线程又不会被block.一共执行的时间大概在6s左右.
ensure_future
是asyncio
封装好的创建Task
的函数,它还支持一些参数,甚至指定loop.
1.5.5. 将耗时函数调用委托给一个线程池/进程池执行器
- coroutine
run_in_executor(executor, func, *args)
安排在指定的执行器中调用func.
执行器参数应该是Executor
实例.如果执行程序为无,则使用默认执行程序.
通常我们用functools.partial
来处理要执行的函数
- set_default_executor(executor)
设置run_in_executor()
使用的默认执行程序。
所谓执行器executor
是指concurrent.futures
模块下的ThreadPoolExecutor
或者ProcessPoolExecutor
的实例,在目前python标准api几乎只支持同步方法的情况下,ThreadPoolExecutor
可以作为临时方案使用解io密集型问题,而对于计算密集型任务,更加适合使用ProcessPoolExecutor
.
注意3.7的官方文档上说即将在3.8中取消set_default_executor(executor)
对ProcessPoolExecutor
的支持.但似乎run_in_executor
并不会受影响
[1, 3]
2
await main()
6
[1, 3]
4
await main()
[1, 3]
6
1.5.6. 事件循环中的错误处理
我们可以设置事件循环中的错误处理回调函数,这样一旦事件循环出错就会执行这个回调函数.我们也可以手动使用call_exception_handler()
接口执行设置的异常处理
set_exception_handler(handler)
将处理程序设置为新的事件循环异常处理程序.如果处理程序为None,则将设置默认的异常处理程序.如果处理程序是可调用对象,它应该具有匹配的签名(循环,上下文),其中循环将是对活动事件循环的引用,上下文将是一个dict
对象(有关上下文的详细信息,请参阅call_exception_handler()
文档)
get_exception_handler()
返回异常处理程序,如果使用默认处理程序,则返回
None
.
default_exception_handler(context)
默认异常处理程序.当异常发生时调用,并且没有设置异常处理程序,并且可以由想要推迟到默认行为的自定义异常处理程序调用.
context
参数与call_exception_handler()
中的含义相同.
call_exception_handler(context)
调用当前的事件循环异常处理程序.上下文是一个包含以下键的
dict
对象(新键可以稍后介绍):- ‘message’: Error message;
- ‘exception’ (optional): Exception object;
- ‘future’ (optional): asyncio.Future instance;
- ‘handle’ (optional): asyncio.Handle instance;
- ‘protocol’ (optional): Protocol instance;
- ‘transport’ (optional): Transport instance;
- ‘socket’ (optional): socket.socket instance.
[1, 3]
7
[1, 3]
8
1.6. 使用Unix信号管理事件循环
loop.add_signal_handler(signum, callback, *args)
可以为标准库signal
中定义的unix信号设置回调函数,一旦捕捉到对应的信号,就会执行注册了的回调函数,当然了也可以使用loop.remove_signal_handler(sig)
移除对信号的监控.signal
模块定义的unix信号可以在本章结语部分找到.下面的例子我们监听由signal.alarm()
发起的定时闹钟信号signal.SIGALRM
,并在收到信号后执行回调函数.
[1, 3]
9
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i class AsyncContextTicker: def __init__(self,delay, to): self.data = Ticker(delay, to) async def __aenter__(self): print('entering context') await asyncio.sleep(1) return self.data async def __aexit__(self, exc_type, exc, tb): await asyncio.sleep(1) print('exit context') async def main(): async with AsyncContextTicker(1,5) as ticker: async for i in ticker: print(i) await main()
0
1.6.1. 例子: 生产者消费者模型
以下是一个生产者消费者模式的例子
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i class AsyncContextTicker: def __init__(self,delay, to): self.data = Ticker(delay, to) async def __aenter__(self): print('entering context') await asyncio.sleep(1) return self.data async def __aexit__(self, exc_type, exc, tb): await asyncio.sleep(1) print('exit context') async def main(): async with AsyncContextTicker(1,5) as ticker: async for i in ticker: print(i) await main()
1
import asyncio class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i class AsyncContextTicker: def __init__(self,delay, to): self.data = Ticker(delay, to) async def __aenter__(self): print('entering context') await asyncio.sleep(1) return self.data async def __aexit__(self, exc_type, exc, tb): await asyncio.sleep(1) print('exit context') async def main(): async with AsyncContextTicker(1,5) as ticker: async for i in ticker: print(i) await main()
2
还没有评论,来说两句吧...