在后续一段时间里, 我会写一系列文章来讲述如何实现一个RPC框架(我已经实现了一个示例框架, 代码在我的github上)。 这是系列第四篇文章, 主要讲述了客户端和服务器之间的网络通信问题。
模型定义
我们需要自己来定义RPC通信所传递的内容的模型, 也就是RPCRequest和RPCResponse。
requestId interfaceName methodName parameterTypes parameters requestId exception result exception
这里唯一需要说明一下的是requestId, 你可能会疑惑为什么我们需要这个东西。
原因是,发送请求的顺序和收到返回的顺序可能是不一致的, 因此我们需要有一个标识符来表明某一个返回所对应的请求是什么。 具体怎么利用这个字段, 本文后续会揭晓。
选择NIO还是IO?
NIO和IO的选择要视具体情况而定。对于我们的RPC框架来说, 一个服务可能与多个服务保持连接, 且每次通信只发送少量信息,那么在这种情况下,NIO可能更适合一些。
我选择使用Netty来简化具体的实现, 自然地,我们就引入了Channel, Handler这些相关的概念。如果对Netty没有任何了解, 建议先去简单了解下相关内容再回过头看这篇文章。
如何复用Channel
既然使用了NIO, 我们自然希望服务和服务之间是使用长连接进行通信, 而不是每个请求都重新创建一个channel。
那么我们怎么去复用channel呢? 既然我们已经通过前文的服务发现获取到了service地址,并且与其建立了channel, 那么我们自然就可以建立一个service地址与channel之间的映射关系, 每次拿到地址之后先判断有没有对应channel, 如果有的话就复用。这种映射关系我建立了ChannelManager去管理:
* Singleton channelManager channelManager channelManager channelManager channelManager channels inetSocketAddress channel channelsinetSocketAddress channel group bootstrap bootstrapgroup SO_KEEPALIVE channel bootstrapinetSocketAddress inetSocketAddress inetSocketAddress channel channel future inetSocketAddress e log inetSocketAddress channel inetSocketAddress channel channelsinetSocketAddress channel inetSocketAddress channelsinetSocketAddress
有几个地方需要解释一下:
这里用单例的目的是, 所有的proxybean都使用同一个ChannelManager。
创建Channel的过程很简单,就是最普通的Netty客户端创建channel的方法。
在channel被关闭(比如服务器端宕机了)后,需要从map中删除对应的channel
RPCChannelInitializer是整个过程的核心所在, 用于处理请求和返回的编解码、 收到返回之后的回调等。 下文详细说这个。
编解码
上文的RPCChannelInitializer代码如下:
@ ch throws pipeline ch pipeline pipeline pipeline
这里的Encoder和Decoder都很简单, 继承了Netty中的codec,做一些简单的byte数组和Object对象之间的转换工作:
genericClass serializer ctx in out in in dataLength in in dataLength in data dataLength indata outserializerdata genericClass genericClass serializer ctx in out genericClassin data serializerin outdatalength outdata
这里我选择使用Protobuf序列化协议来做这件事(具体的ProtobufSerializer的实现因为篇幅原因就不贴在这里了, 需要的话请看项目的github)。 总的来说, 这一块还是很简单很好理解的。
发送请求与处理返回内容
请求的发送很简单, 直接用channel.writeAndFlush(request) 就行了。
问题是, 发送之后, 怎么获取这个请求的返回呢?这里,我引入了RPCResponseFuture和ResponseFutureManager来解决这个问题。
RPCResponseFuture实现了Future接口,所包含的值就是RPCResponse, 每个RPCResponseFuture都与一个requestId相关联, 除此之外, 还利用了CountDownLatch来做get方法的阻塞处理:
requestId response latch requestId response latch @ throws latch e loge response
既然每个请求都会产生一个ResponseFuture, 那么自然要有一个Manager来管理这些future:
* Singleton rpcFutureManager rpcFutureManager rpcFutureManager rpcFutureManager rpcFutureManager rpcFutureMap rpcResponseFuture rpcFutureMaprpcResponseFuture rpcResponseFuture response rpcFutureMapresponseresponse
ResponseFutureManager很好看懂, 就是提供了注册future、完成future的接口。
现在我们再回过头看RPCChannelInitializer中的RPCResponseHandler就很好理解了: 拿到返回值, 把对应的ResponseFuture标记成done就可以了!
* 处理收到返回后的回调 @ ctx response throws log response response @ ctx cause throws log cause
前文的FactoryBean的逻辑填充
到这里,我们已经实现了客户端的网络通信, 现在只需要把它加到前文的FactoryBean的doInvoke方法就好了!
proxy method args throws targetServiceName type request targetServiceName method method args method serviceAddress targetServiceName channel serviceAddress channel serviceAddress response channel request response response response response targetServiceName serviceAddress serviceDiscovery serviceAddress serviceDiscoverytargetServiceName log serviceAddress targetServiceName serviceAddress array serviceAddress host array int port array host port log channel request latch rpcResponseFuture request rpcResponseFuture channelrequest future log latch latch e loge rpcResponseFuture e log e
就这样, 一个简单的RPC客户端就实现了。 完整代码请看我的github。
还没有评论,来说两句吧...