1. GRpc接口服务
GRpc正如其名,是一种RPC.它实际上和RESTful接口在功能上是相近的,本质都是一种请求响应模式的服务.只是作为一个RPC,GRpc一般描述动作而非资源,并且它可以返回的不光是一个数据,而是一组流数据.
GRpc是一种跨语言的Rpc,它建立在http2上使用protobuf作为结构化数据的序列化工具.
它有4种形式:
- 请求-响应
- 请求-流响应
- 流请求-响应
- 流请求-流响应
其基本使用方式是:
- 服务端与客户端开发者协商创建一个protobuf文件用于定义rpc的形式和方法名以及不同方法传输数据的schema
- 编译protobuf文件至服务端客户端的实现语言
- 服务端实现protobuf文件中定义的方法
- 客户端调用protobuf文件中定义的方法
在python中我们使用protobuf和grpcio来编译protobuf文件.
1.1. 请求-响应
这个例子C0我们来实现一个简单的服务--输入一个数,输出这个数的平方
1.1.1. 创建一个protobuf文件
创建protobuf文件的语法可以看protobuf的语法指南
我们将函数命名为Square,每次传传入的数据是一个double型的数,传回的也是一个double型的数.
syntax = "proto3"; package squarerpc_service; service SquareService { rpc square (Message) returns (Message){} } message Message { double message = 1; }
1.1.2. 将这个proto文件编译为python模块
要将proto文件编译为python模块我们需要工具protoc和grpcio-tools
安装好这两个后我们可以使用如下命令将目标protobuf文件编译为
python -m grpc_tools.protoc -I=$proto_dir \ --python_out=$target_dir \ --grpc_python_out=$target_dir \ $proto_file
1.1.3. 服务端实现定义的方法
python的grpc服务端是使用线程实现的,这也就意味着它无法承受高并发.但这通常不是rpc关注的问题,rpc一般都是要通过起多个实例做负载均衡的,同时这也要求了我们的rpc要做到无状态.
- server.py
#!/usr/bin/env python import time from concurrent import futures import grpc from data_pb2_grpc import SquareServiceServicer, add_SquareServiceServicer_to_server from data_pb2 import Message HOST = "0.0.0.0" PORT = 5000 ONE_DAY_IN_SECONDS = 60 * 60 * 24 class SquareServic(SquareServiceServicer): def square(self, request, context): return Message(message=request.message**2) def main(): grpcServer = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) add_SquareServiceServicer_to_server(SquareServic(), grpcServer) print(f'"msg":"grpc start @ grpc://{HOST}:{PORT}"') grpcServer.add_insecure_port(f"{HOST}:{PORT}") grpcServer.start() try: while True: time.sleep(ONE_DAY_IN_SECONDS) except KeyboardInterrupt: grpcServer.stop(0) except Exception as e: grpcServer.stop(0) raise if __name__ == "__main__": main()
grpc服务端写法步骤:
- 继承我们定义的
service
名字加Servicer
的抽象类,并实现其中的方法,姑且叫他RPC
类 - 使用
grpc.server(executor[,maximum_concurrent_rpcs])
创建一个服务器,executor必须使用标准库的futures.ThreadPoolExecutor
,通过它来确定最大使用多少个worker.maximum_concurrent_rpcs
则是用于设置最大同时处理多少个请求,当请求超过maximum_concurrent_rpcs
的数值,name后来的请求就会被拒绝 - 使用
add_[我们定义的service名字]Servicer_to_server(RPC类的实例,grpcServer)
grpcServer.add_insecure_port(f"{HOST}:{PORT}")
绑定ip和端口grpcServer.start()
启动服务,需要注意的是grpcServer.start()
是启动新线程实现的,需要阻塞主线程以防止退出,因此需要有一个死循环在主线程.
借助多进程提高cpu利用率
python写rpc往往是处理计算密集型任务,但GIL让rpc无法高效的利用cpu.我们来重新实现下上面的服务,让计算的部分由多进程实现
Executor = futures.ProcessPoolExecutor(max_workers=3) class SquareServic(SquareServiceServicer): def square(self, request, context): f = Executor.submit(square,request.message) futures.as_completed(f) result = f.result() return Message(message=result) ... def main(): try: while True: time.sleep(ONE_DAY_IN_SECONDS) except KeyboardInterrupt: grpcServer.stop(0) Executor.shutdown() except Exception as e: grpcServer.stop(0) Executor.shutdown() raise
1.1.4. 客户端实现方式
python作为客户端同样比较常见.毕竟更多的时候我们是要调用别人写的服务
客户端需要做的是
- 连接上服务器
- 构造一个Stub的实例
- 调用stub实例上的对应方法并获得结果
同步客户端
官方默认使用的是同步写法,比较直观
- cli_sync.py
#!/usr/bin/env python import grpc from data_pb2_grpc import SquareServiceStub from data_pb2 import Message url = "localhost:5000" channel = grpc.insecure_channel(url) client = SquareServiceStub(channel=channel) result = client.square(Message(message=12.3)) print(result)
异步客户端
grpc的客户端有两种请求有.future()
方法可以返回grpc.Future
,它的接口和asyncio.Future
十分类似,但却不满足awaitable
协议.感谢aiogrpc为我们做了一个包装器,用它我们就可以使用协程语法了
- cli_async.py
#!/usr/bin/env python import asyncio from aiogrpc import insecure_channel from data_pb2_grpc import SquareServiceStub from data_pb2 import Message url = "localhost:5000" async def query(): async with insecure_channel(url) as conn: client = SquareServiceStub(channel=conn) result = await client.square(Message(message=12.3)) print(result) def main(): loop = asyncio.get_event_loop() loop.run_until_complete(query()) if __name__ == "__main__": main()
1.1.5. 添加ssl支持
由于python常用于做原型开发,所以很多时候它需要独立完成部署而不能借助其他工具,那ssl支持就是一个必须要考虑的问题了
grpc原生支持ssl只需要:
服务端修改
PythongrpcServer.add_insecure_port
改为
Pythonwith open('crt/example.key', 'rb') as f: private_key = f.read() with open('crt/example.crt', 'rb') as f: certificate_chain = f.read() server_credentials = grpc.ssl_server_credentials( ((private_key, certificate_chain,),)) grpcServer.add_secure_port(f"{HOST}:{PORT}", server_credentials)
客户端修改
Pythonconn = grpc.insecure_channel(url)
改为
Pythonwith open('crt/example.crt', 'rb') as f: trusted_certs = f.read() credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs) channel = grpc.secure_channel(url, credentials)
1.2. 请求-流响应
这种需求比较常见,有点类似python中的range函数,它生成的是一个流而非一个数组,它会一次一条的按顺序将数据发送回请求的客户端.
这个例子C1实现了给出一个正整数,它会返回从0开始到它为止的每个整数的平方.
1.2.1. 修改protobuf文件
python -m grpc_tools.protoc -I=$proto_dir \ --python_out=$target_dir \ --grpc_python_out=$target_dir \ $proto_file
0
1.2.2. 修改服务端实现
服务一端的流是一年yield
关键字推送
python -m grpc_tools.protoc -I=$proto_dir \ --python_out=$target_dir \ --grpc_python_out=$target_dir \ $proto_file
1
1.2.3. 修改客户端实现
我们在客户端可以直接用for循环读取返回的流
同步客户端
python -m grpc_tools.protoc -I=$proto_dir \ --python_out=$target_dir \ --grpc_python_out=$target_dir \ $proto_file
2
异步客户端
python -m grpc_tools.protoc -I=$proto_dir \ --python_out=$target_dir \ --grpc_python_out=$target_dir \ $proto_file
3
1.3. 流请求-响应
这种需求不是很多见,可能用的比较多的是收集一串数据后统一进行处理吧,流只是可以确保是同一个客户端发过来的而已.
这个例子C2实现了传过来一串数,之后返回他们的平方和
1.3.1. 修改protobuf文件
python -m grpc_tools.protoc -I=$proto_dir \ --python_out=$target_dir \ --grpc_python_out=$target_dir \ $proto_file
4
1.3.2. 修改服务端实现
以流为请求的服务端第二个参数为一个iterator
,因此我们可以使用for循环来获取其中的内容
python -m grpc_tools.protoc -I=$proto_dir \ --python_out=$target_dir \ --grpc_python_out=$target_dir \ $proto_file
5
1.3.3. 修改客户端实现
我们在客户端可以直接用for循环读取返回的流
同步客户端
同步客户端通过将一个iterator
作为参数来调用以流为请求的服务端
python -m grpc_tools.protoc -I=$proto_dir \ --python_out=$target_dir \ --grpc_python_out=$target_dir \ $proto_file
6
异步客户端
异步客户端则接收一个异步iterator作为参数,我们可以使用aitertools
来生成或者处理异步迭代器,如果请求流比较复杂,我们也可以创建一个异步生成器,异步语法可以看我的这篇文章
python -m grpc_tools.protoc -I=$proto_dir \ --python_out=$target_dir \ --grpc_python_out=$target_dir \ $proto_file
7
1.4. 流请求-流响应
将上面两种方式结合起来,就是我们的第四种方式,请求为一个流,响应也是流.这两个流可以是相互交叉的也可以是请求完后再返回一个流.他们在写pb文件时是相同的写法
python -m grpc_tools.protoc -I=$proto_dir \ --python_out=$target_dir \ --grpc_python_out=$target_dir \ $proto_file
8
1.4.1. 请求流完成后返回流
这个例子C3实现了传过来一串数,之后以流的形式返回这组数每个的平方.
修改服务端实现
服务端获得的请求是一个iterator
,而返回的流则是通过yield语法推出的.
python -m grpc_tools.protoc -I=$proto_dir \ --python_out=$target_dir \ --grpc_python_out=$target_dir \ $proto_file
9
修改同步客户端
同步客户端请求的参数是一个iterator
,返回的也是一个iterator
#!/usr/bin/env python import time from concurrent import futures import grpc from data_pb2_grpc import SquareServiceServicer, add_SquareServiceServicer_to_server from data_pb2 import Message HOST = "0.0.0.0" PORT = 5000 ONE_DAY_IN_SECONDS = 60 * 60 * 24 class SquareServic(SquareServiceServicer): def square(self, request, context): return Message(message=request.message**2) def main(): grpcServer = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) add_SquareServiceServicer_to_server(SquareServic(), grpcServer) print(f'"msg":"grpc start @ grpc://{HOST}:{PORT}"') grpcServer.add_insecure_port(f"{HOST}:{PORT}") grpcServer.start() try: while True: time.sleep(ONE_DAY_IN_SECONDS) except KeyboardInterrupt: grpcServer.stop(0) except Exception as e: grpcServer.stop(0) raise if __name__ == "__main__": main()
0
修改异步客户端
异步客户端请求的参数是一个async iterator
,返回的也是一个async iterator
#!/usr/bin/env python import time from concurrent import futures import grpc from data_pb2_grpc import SquareServiceServicer, add_SquareServiceServicer_to_server from data_pb2 import Message HOST = "0.0.0.0" PORT = 5000 ONE_DAY_IN_SECONDS = 60 * 60 * 24 class SquareServic(SquareServiceServicer): def square(self, request, context): return Message(message=request.message**2) def main(): grpcServer = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) add_SquareServiceServicer_to_server(SquareServic(), grpcServer) print(f'"msg":"grpc start @ grpc://{HOST}:{PORT}"') grpcServer.add_insecure_port(f"{HOST}:{PORT}") grpcServer.start() try: while True: time.sleep(ONE_DAY_IN_SECONDS) except KeyboardInterrupt: grpcServer.stop(0) except Exception as e: grpcServer.stop(0) raise if __name__ == "__main__": main()
1
1.4.2. 请求流中返回流
这个例子C4实现了传过来一串数,过程中每传来一个数就返回它的平方
修改服务端实现
这种其实只需要修改服务端即可,在每获得一个数据后就yield出去结果就行了
#!/usr/bin/env python import time from concurrent import futures import grpc from data_pb2_grpc import SquareServiceServicer, add_SquareServiceServicer_to_server from data_pb2 import Message HOST = "0.0.0.0" PORT = 5000 ONE_DAY_IN_SECONDS = 60 * 60 * 24 class SquareServic(SquareServiceServicer): def square(self, request, context): return Message(message=request.message**2) def main(): grpcServer = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) add_SquareServiceServicer_to_server(SquareServic(), grpcServer) print(f'"msg":"grpc start @ grpc://{HOST}:{PORT}"') grpcServer.add_insecure_port(f"{HOST}:{PORT}") grpcServer.start() try: while True: time.sleep(ONE_DAY_IN_SECONDS) except KeyboardInterrupt: grpcServer.stop(0) except Exception as e: grpcServer.stop(0) raise if __name__ == "__main__": main()
2
1.5. 总结
python的GRpc接口充分利用了python语法中的iterator协议,因此无论是服务端客户端都可以写出相当简短的服务.调用起来也最像本地的模块.
还没有评论,来说两句吧...