[ ]
ZhouSa.com-宙飒天下网 1. 并发模型
python提供的并发工具相对比较底层,达不到模型的层次,所谓的并发模型是对
1.1. 并发不是并行
并发和并行常常被混淆,他们虽然有交集但其实指的是完全不同的两个事儿.
并发程序含有多个逻辑上的独立执行块,它们可以独立地并行执行,也可以串行执行
并行程序解决问题的速度往往比串行程序快得多,因为其可以同时执行整个任务的多个部分。并行程序可能有多个独立执行块,也可能仅有一个
我们还可以从另一种角度来看待并发和并行之间的差异:
- 并发是问题域中的概念——程序需要被设计成能够处理多个同时(或者几乎同时)发生的事件
- 而并行则是方法域中的概念——通过将问题中的多个部分并行执行,来加速解决问题。
引用Rob Pike的经典描述:
并发是同一时间应对(dealing with)多件事情的能力;并行是同一时间动手做(doing)多件事情的能力。
本文基于协程讲解并发模型.
1.2. Actor模型
import asyncio from asyncio import Queue class ActorExit(Exception): pass class Actor: def __init__(self,loop=None): self.inbox = Queue(loop=loop) self.running = False def send_nowait(self, msg): ''' Send a message to the actor ''' self.inbox.put_nowait(msg) async def send(self, msg): ''' Send a message to the actor ''' await self.inbox.put(msg) async def close(self): await self.send(ActorExit) def close_nowait(self): self.send_nowait(ActorExit) async def handle_timeout(self): pass async def receive(self, message): """ Define in your subclass. """ raise NotImplemented() async def run(self): self.running = True while self.running: try: message = await self.inbox.get() except asyncio.TimeoutError: await self.handle_timeout() else: if message is ActorExit: print("actor closed") asyncio.Task.current_task().cancel() #raise ActorExit() else: await self.receive(message) class Pinger(Actor): async def receive(self, message): print(message) await pong.send('ping') await asyncio.sleep(0.5) class Ponger(Actor): async def receive(self, message): print(message) await ping.send('pong') await asyncio.sleep(0.5) async def sleep10(): await asyncio.sleep(3) await ping.close() await pong.close() loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) ping = Pinger(loop=loop) pong = Ponger(loop=loop) ping.send_nowait('start') loop.run_until_complete(asyncio.wait((ping.run(), pong.run(),sleep10()))) loop.close()
start ping pong ping pong ping pong ping pong ping pong ping pong actor closed ping actor closed pong
1.3. 实现消息的发布订阅
要实现发布/订阅
的消息通信模式,你通常要引入一个单独的"交换机"或"网关"对象作为所有消息的中介.也就是说不直接将消息从一个任务发送到另一个,而是将其发送给一个频道,然后由频道将它发送给一个或多个被关联任务.下面是一个非常简单的频道实现例子:
from collections import defaultdict from contextlib import contextmanager class Exchange: def __init__(self): self._subscribers = set() def attach(self, task): self._subscribers.add(task) def detach(self, task): self._subscribers.remove(task) @contextmanager def subscribe(self, *tasks): for task in tasks: self.attach(task) try: yield self finally: for task in tasks: self.detach(task) async def send(self, msg): for subscriber in self._subscribers: await subscriber.send(msg) _exchanges = defaultdict(Exchange) def get_exchange(name): return _exchanges[name]
一个频道就是一个普通对象,负责维护一个活跃的订阅者集合,并为绑定、解绑和发送消息提供相应的方法。 每个交换机通过一个名称定位,get_exchange() 通过给定一个名称返回相应的 Exchange 实例。
class DisplayMessages: def __init__(self): self.count = 0 async def send(self, msg): self.count += 1 print('msg[{}]: {!r}'.format(self.count, msg)) # Return the Exchange instance associated with a given name def get_exchange(name): return _exchanges[name] # Example of using the subscribe() method async def main(): exc = get_exchange('name') task_a, task_b = DisplayMessages(),DisplayMessages() with exc.subscribe(task_a, task_b) as ex: await ex.send('msg1') await ex.send('msg2') loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(main()) loop.close()
msg[1]: 'msg1' msg[1]: 'msg1' msg[2]: 'msg2' msg[2]: 'msg2'
from itertools import count
c = count()
next(c)
0
next(c)
start ping pong ping pong ping pong ping pong ping pong ping pong actor closed ping actor closed pong
0
还没有评论,来说两句吧...