[ ]
ZhouSa.com-宙飒天下网 1. 并发模型
1.1. 并发不是并行
- 并发是问题域中的概念——程序需要被设计成能够处理多个同时(或者几乎同时)发生的事件
- 而并行则是方法域中的概念——通过将问题中的多个部分并行执行,来加速解决问题。
引用Rob Pike的经典描述:
并发是同一时间应对(dealing with)多件事情的能力;并行是同一时间动手做(doing)多件事情的能力。
1.2. Actor模型
import asyncio from asyncio import Queue class ActorExit(Exception): pass class Actor: def __init__(self,loop=None): self.inbox = Queue(loop=loop) self.running = False def send_nowait(self, msg): ''' Send a message to the actor ''' self.inbox.put_nowait(msg) async def send(self, msg): ''' Send a message to the actor ''' await self.inbox.put(msg) async def close(self): await self.send(ActorExit) def close_nowait(self): self.send_nowait(ActorExit) async def handle_timeout(self): pass async def receive(self, message): """ Define in your subclass. """ raise NotImplemented() async def run(self): self.running = True while self.running: try: message = await self.inbox.get() except asyncio.TimeoutError: await self.handle_timeout() else: if message is ActorExit: print("actor closed") asyncio.Task.current_task().cancel() #raise ActorExit() else: await self.receive(message) class Pinger(Actor): async def receive(self, message): print(message) await pong.send('ping') await asyncio.sleep(0.5) class Ponger(Actor): async def receive(self, message): print(message) await ping.send('pong') await asyncio.sleep(0.5) async def sleep10(): await asyncio.sleep(3) await ping.close() await pong.close() loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) ping = Pinger(loop=loop) pong = Ponger(loop=loop) ping.send_nowait('start') loop.run_until_complete(asyncio.wait((ping.run(), pong.run(),sleep10()))) loop.close()
start ping pong ping pong ping pong ping pong ping pong ping pong actor closed ping actor closed pong
1.3. 实现消息的发布订阅
from collections import defaultdict from contextlib import contextmanager class Exchange: def __init__(self): self._subscribers = set() def attach(self, task): self._subscribers.add(task) def detach(self, task): self._subscribers.remove(task) @contextmanager def subscribe(self, *tasks): for task in tasks: self.attach(task) try: yield self finally: for task in tasks: self.detach(task) async def send(self, msg): for subscriber in self._subscribers: await subscriber.send(msg) _exchanges = defaultdict(Exchange) def get_exchange(name): return _exchanges[name]
一个频道就是一个普通对象,负责维护一个活跃的订阅者集合,并为绑定、解绑和发送消息提供相应的方法。 每个交换机通过一个名称定位,get_exchange() 通过给定一个名称返回相应的 Exchange 实例。
class DisplayMessages: def __init__(self): self.count = 0 async def send(self, msg): self.count += 1 print('msg[{}]: {!r}'.format(self.count, msg)) # Return the Exchange instance associated with a given name def get_exchange(name): return _exchanges[name] # Example of using the subscribe() method async def main(): exc = get_exchange('name') task_a, task_b = DisplayMessages(),DisplayMessages() with exc.subscribe(task_a, task_b) as ex: await ex.send('msg1') await ex.send('msg2') loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(main()) loop.close()
msg[1]: 'msg1' msg[1]: 'msg1' msg[2]: 'msg2' msg[2]: 'msg2'
from itertools import count
c = count()
start ping pong ping pong ping pong ping pong ping pong ping pong actor closed ping actor closed pong