1. 客户端中间人结构实现发布订阅模式
在很多时候我们希望一条消息发送给多个终端.在'并发模型'部分我们已经讲解过发布订阅模式了,当然当时它的作用是多个协程间进行通信.实际上这种模式在网络编程中更加常用,本文就讲解如何构建一个这种模式的网络结构.
1.1. 结构描述
中间人一般来说就是一个交换机,它起到信息中转的作用.而其他客户端都是平等的.
我们来以一个简单的协议大致的看个脉络:
- 使用长连接
- 使用客户端/中间人结构
客户端允许的行为:
- 可以请求创建一个频道
- 加入请求一个频道
- 退出请求一个频道
- 向频道中广播一条消息
使用json作为通信的标准格式,以并添加
##END##
作为结尾规定允许的数据对象:
错误:
{"TYPE":"E", "QTYPE":xxx,//请求类型 "CHANNEL":xxx,//频道名 "COMMAND":xxx,//指令 "ERROR_MESSAGE":xxxx }
创建或加入一个指定频道
- 请求:
{"TYPE":"Q", "COMMAND":"JC", "CHANNEL":"xxx"//频道名 }
- 成功响应:
{"TYPE":"R", "COMMAND":"JC", "CHANNEL":"xxx"//频道名 "CNUM":"xxx"//当前成员数 }
- 请求:
离开当前频道
- 请求:
{"TYPE":"Q", "COMMAND":"LC" "CHANNEL":"xxx"//频道名 }
- 成功响应:
{"TYPE":"R", "COMMAND":"LC", "CHANNEL":"xxx"//频道名 "CNUM":"xxx"//当前成员数 }
- 请求:
广播一条信息
- 请求
{"TYPE":"Q", "COMMAND":"PUB" "CHANNEL":xxxx "MSG":xxxx }
- 广播的信息
{"TYPE":"PUB", "CHANNEL":xxxx, "MSG":xxxx }
- 请求
- 为了可以单机就验证效果,我们规定广播也会广播给同频道的自己
1.2. 中间人实现
中间人实现实际只是根据请求来找到对应的操作,操作的对象也就是channel,我们使用一个单例的全局结构subscribers = {}
来维护不同的channel以及其中存放的writer引用.
而广播不过是每个频道中的writer对象都write一下信息而已.
%%writefile broker.py import json import asyncio subscribers = {} class Exchange: def error_hander(self, query, msg): print("error") answer = { "TYPE": "E", "QTYPE": query.get("TYPE"), "CHANNEL": query.get("CHANNEL"), "COMMAND": query.get("COMMAND"), "ERROR_MESSAGE": msg } self.client_writer.write(json.dumps(answer).encode() + b'##END##') async def handler(self, query): if query["COMMAND"] == "LC": channel = subscribers.get(query["CHANNEL"]) if channel and len(channel) > 0: chanlen = len(channel) channel.discard(self.client_writer) newchanlen = len(channel) if chanlen == newchanlen: self.error_hander(query, "leave channel fail") return False if newchanlen == 0: del subscribers[query["CHANNEL"]] answer = { "TYPE": "R", "COMMAND": "LC", "CHANNEL": query["CHANNEL"], "CNUM": newchanlen } self.client_writer.write( json.dumps(answer).encode() + b'##END##') return True else: self.error_hander(query, "channel is empty") return False elif query["COMMAND"] == "JC": channel = subscribers.get(query["CHANNEL"]) if channel: channel.add(self.client_writer) else: subscribers[query["CHANNEL"]] = set([self.client_writer]) answer = { "TYPE": "R", "COMMAND": "JC", "CHANNEL": query["CHANNEL"], "CNUM": len(subscribers[query["CHANNEL"]]) } self.client_writer.write(json.dumps(answer).encode() + b'##END##') return True elif query["COMMAND"] == "PUB": channel = subscribers.get(query["CHANNEL"]) if channel: answer = {"TYPE": "PUB", "CHANNEL": query["CHANNEL"], "MSG": query["MSG"] } for subscriber in channel: subscriber.write(json.dumps(answer).encode() + b'##END##') else: self.error_hander(query, "Unknow channel") return False else: self.error_hander(query, "Unknow COMMAND") return False async def client_connected_cb(self, client_reader, client_writer): self.client_reader = client_reader self.client_writer = client_writer while True: try: data = await client_reader.readuntil(separator=b'##END##') except asyncio.streams.IncompleteReadError as re: print("client closed") self.client_writer.close() for _,channel in subscribers.items(): channel.discard(self.client_writer) print('client clear') break except: raise else: query = json.loads(data[:-7].decode()) if query["TYPE"] == "Q": await self.handler(query) else: self.error_hander(query, "Unknow Type") self.client_writer.close() def run(self, host="127.0.0.1", port=5001, loop=None): coro = asyncio.start_server( self.client_connected_cb, host=host, port=port, loop=loop) server = loop.run_until_complete(coro) # Serve requests until Ctrl+C is pressed print('Serving on {}'.format(server.sockets[0].getsockname())) try: loop.run_forever() except KeyboardInterrupt: pass # Close the server server.close() loop.run_until_complete(loop.shutdown_asyncgens()) if __name__ == "__main__": app = Exchange() loop = asyncio.get_event_loop() app.run(loop=loop)
Overwriting broker.py
1.3. 客户端实现
对于客户端,相对来说就略微复杂一些了,因为除了连接外还有了一个channel对象,所以我们先要定义一个Channel类,按照传统,这个类需要有异步上下文管理器,其初始化参数为客户端对象和频道名字.然后在客户端初始化的时候使用python自带的偏函数为self.Channel
赋值为partial(Channel,client=self)
,这样在连接内就可以使用client.Channel(channel_name='xxx')
初始化一个channel对象了.
由于使用了异步上下文管理器,当退出频道代码段后也就会自动退出频道.
import json import asyncio from functools import partial class Channel: def __init__(self,client,channel_name): self.client=client self.channel_name= channel_name def publish(self,message): message_ = { "TYPE":"Q", "COMMAND":"PUB", "CHANNEL":self.channel_name, "MSG":message } data = json.dumps(message_).encode()+b'##END##' self.client.writer.write(data) print("published") return True async def listen(self): res = await self.client.reader.readuntil(separator=b'##END##') return res async def leave_channel(self): message = { "TYPE":"Q", "COMMAND":"LC", "CHANNEL":self.channel_name } data = json.dumps(message).encode()+b'##END##' self.client.writer.write(data) res = await self.client.reader.readuntil(separator=b'##END##') print(json.loads(res.decode()[:-7])) async def __aenter__(self): print('entering context') await self.client.join_channel(self.channel_name) return self async def __aexit__(self, exc_type, exc, tb): print('exit context') await self.leave_channel() class PubClient: def __init__(self,*,address='127.0.0.1',port=5001,loop=None): self.loop = loop self._address = address self._port = port self.Channel = partial(Channel,client=self) self.reader = None self.writer = None async def join_channel(self,channel_name): message = { "TYPE":"Q", "COMMAND":"JC", "CHANNEL":channel_name } data = json.dumps(message).encode()+b'##END##' self.writer.write(data) res = await self.reader.readuntil(separator=b'##END##') print(json.loads(res.decode()[:-7])) async def __aenter__(self): print('entering context') await self.connect() return self async def __aexit__(self, exc_type, exc, tb): print('exit context') self.close() async def connect(self): self.reader,self.writer = await asyncio.open_connection(host=self._address,port=self._port,loop=self.loop) def close(self): self.writer.close()
{"TYPE":"Q", "COMMAND":"JC", "CHANNEL":"xxx"//频道名 }
0
{"TYPE":"Q", "COMMAND":"JC", "CHANNEL":"xxx"//频道名 }
1
上面的例子依然没有做什么错误检测,不能用于实际项目,不过大体的流程也就是这样了.有兴趣的可以对其进行修正然后用了试试.
还没有评论,来说两句吧...