在后续一段时间里, 我会写一系列文章来讲述如何实现一个RPC框架(我已经实现了一个示例框架, 代码在我的github上)。 这是系列第四篇文章, 主要讲述了客户端和服务器之间的网络通信问题。
模型定义
我们需要自己来定义RPC通信所传递的内容的模型, 也就是RPCRequest和RPCResponse。
requestId
interfaceName
methodName
parameterTypes
parameters
requestId
exception
result
exception
这里唯一需要说明一下的是requestId, 你可能会疑惑为什么我们需要这个东西。
原因是,发送请求的顺序和收到返回的顺序可能是不一致的, 因此我们需要有一个标识符来表明某一个返回所对应的请求是什么。 具体怎么利用这个字段, 本文后续会揭晓。
选择NIO还是IO?
NIO和IO的选择要视具体情况而定。对于我们的RPC框架来说, 一个服务可能与多个服务保持连接, 且每次通信只发送少量信息,那么在这种情况下,NIO可能更适合一些。
我选择使用Netty来简化具体的实现, 自然地,我们就引入了Channel, Handler这些相关的概念。如果对Netty没有任何了解, 建议先去简单了解下相关内容再回过头看这篇文章。
如何复用Channel
既然使用了NIO, 我们自然希望服务和服务之间是使用长连接进行通信, 而不是每个请求都重新创建一个channel。
那么我们怎么去复用channel呢? 既然我们已经通过前文的服务发现获取到了service地址,并且与其建立了channel, 那么我们自然就可以建立一个service地址与channel之间的映射关系, 每次拿到地址之后先判断有没有对应channel, 如果有的话就复用。这种映射关系我建立了ChannelManager去管理:
* Singleton
channelManager
channelManager
channelManager
channelManager
channelManager
channels
inetSocketAddress
channel channelsinetSocketAddress
channel
group
bootstrap
bootstrapgroup
SO_KEEPALIVE
channel bootstrapinetSocketAddress inetSocketAddress
inetSocketAddress channel
channel
future
inetSocketAddress
e
log inetSocketAddress
channel
inetSocketAddress channel
channelsinetSocketAddress channel
inetSocketAddress
channelsinetSocketAddress
有几个地方需要解释一下:
这里用单例的目的是, 所有的proxybean都使用同一个ChannelManager。
创建Channel的过程很简单,就是最普通的Netty客户端创建channel的方法。
在channel被关闭(比如服务器端宕机了)后,需要从map中删除对应的channel
RPCChannelInitializer是整个过程的核心所在, 用于处理请求和返回的编解码、 收到返回之后的回调等。 下文详细说这个。
编解码
上文的RPCChannelInitializer代码如下:
@
ch throws
pipeline ch
pipeline
pipeline
pipeline
这里的Encoder和Decoder都很简单, 继承了Netty中的codec,做一些简单的byte数组和Object对象之间的转换工作:
genericClass
serializer
ctx in out
in
in
dataLength in
in dataLength
in
data dataLength
indata
outserializerdata genericClass
genericClass
serializer
ctx in out
genericClassin
data serializerin
outdatalength
outdata
这里我选择使用Protobuf序列化协议来做这件事(具体的ProtobufSerializer的实现因为篇幅原因就不贴在这里了, 需要的话请看项目的github)。 总的来说, 这一块还是很简单很好理解的。
发送请求与处理返回内容
请求的发送很简单, 直接用channel.writeAndFlush(request) 就行了。
问题是, 发送之后, 怎么获取这个请求的返回呢?这里,我引入了RPCResponseFuture和ResponseFutureManager来解决这个问题。
RPCResponseFuture实现了Future接口,所包含的值就是RPCResponse, 每个RPCResponseFuture都与一个requestId相关联, 除此之外, 还利用了CountDownLatch来做get方法的阻塞处理:
requestId
response
latch
requestId
response
latch
@
throws
latch
e
loge
response
既然每个请求都会产生一个ResponseFuture, 那么自然要有一个Manager来管理这些future:
* Singleton
rpcFutureManager
rpcFutureManager
rpcFutureManager
rpcFutureManager
rpcFutureManager
rpcFutureMap
rpcResponseFuture
rpcFutureMaprpcResponseFuture rpcResponseFuture
response
rpcFutureMapresponseresponse
ResponseFutureManager很好看懂, 就是提供了注册future、完成future的接口。
现在我们再回过头看RPCChannelInitializer中的RPCResponseHandler就很好理解了: 拿到返回值, 把对应的ResponseFuture标记成done就可以了!
* 处理收到返回后的回调 @
ctx response throws
log response
response
@
ctx cause throws
log cause
前文的FactoryBean的逻辑填充
到这里,我们已经实现了客户端的网络通信, 现在只需要把它加到前文的FactoryBean的doInvoke方法就好了!
proxy method args throws targetServiceName type
request
targetServiceName
method
method
args
method
serviceAddress targetServiceName
channel serviceAddress
channel
serviceAddress
response channel request
response
response
response
response
targetServiceName
serviceAddress
serviceDiscovery
serviceAddress serviceDiscoverytargetServiceName
log serviceAddress targetServiceName
serviceAddress
array serviceAddress
host array
int port array
host port
log channel request
latch
rpcResponseFuture request
rpcResponseFuture
channelrequest future
log
latch
latch
e
loge
rpcResponseFuture
e
log e
就这样, 一个简单的RPC客户端就实现了。 完整代码请看我的github。
还没有评论,来说两句吧...