1.Netty
Netty是由JBOSS提供的一个java开源框架。在吸收了FTP,SMTP,HTTP,各种二进制,文本协议等多种协议的实现经验,并经过设计相当精心的项目后,Netty最终成功地找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。Netty是一个基于NIO的客户,服务器端编程框架,使用Netty可以确保你快速和简单的开发出一个网络应用。
建议参考《Netty权威指南》或《Netty In Action》,后者已有中文翻译版本在此
2.依赖配置
本例使用netty 4.0.23.Final实现网络通信
io.netty
netty-all
4.0.23.Final
compile
使用maven直接在pom.xml中添加上面的依赖即可
3.ChannelHandler设计
本例情景相对简单,在客户端和服务端均只有3个ChannelHandler。两者均有一个NettyKryoEncoder和一个NettyKryoDecoder。其中
NettyKryoEncoder用于将发送的对象序列化为字节序列,是一个
ChannelInboundHandler。而
NettyKryoDecoder用于从接收到的字节序列还原出对象,是一个
ChannelOutboundHandler。
客户端还有一个RpcClientDispatchHandler,用于接收到服务端返回的对象时(实际应用场景中只可能为RpcResponse对象)进行事件的分发。与之类似,服务端设有一个RpcServerDispatchHandler,用于接收到客户端发送的对象时
(实际应用场景中只可能为RpcRequest对象)进行事件分发。
(1).NettyKryoEncoder
ctx msg out
msg out
直接调用上一节实现的KryoSerializer进行序列化
(2).NettyKryoDecoder
ctx in
frame ctx in
frame
frame
ctx buffer index length
bufferindex length
使用LengthFieldBasedFrameDecoder进行解码,解码依赖于字节序列的头部的4个字节中存放的长度信息(KryoSerializer.serialize()方法会在头部写入长度信息,详见上一节)。获取到长度信息后使用父类的decode()方法截取出实际的字节序列,直接调用
KryoSerializer的deserialize()方法反序列化还原出对象。
(3).RpcClientDispatchHandler
rpcClientResponseHandler
rpcClientChannelInactiveListener
rpcClientResponseHandler
rpcClientChannelInactiveListener
rpcClientResponseHandler rpcClientResponseHandler
rpcClientChannelInactiveListener rpcClientChannelInactiveListener
ctx msg
rpcResponse msg
rpcClientResponseHandlerrpcResponse
ctx cause
ctx
rpcClientChannelInactiveListener
rpcClientChannelInactiveListener
channelRead在收到某个对象(由
NettyKryoDecoder解码还原得到的对象)后,强转为RpcResponse并调用RpcClientResponseHandler的addResponse()方法添加收到的
RpcResponse。
RpcClientResponseHandler需要在创建RpcClientDispatchHandler时从外部传入,每个RpcClient应该有且只有一个
RpcClientResponseHandler,用于收到服务端的调用结果
RpcResponse时进行相应的处理。RpcClientChannelInactiveListener用于在channel变为InActive状态时(一般为由于意外情况与服务端的连接中断了),调用其中的回调方法,用于实现短线自动重连功能。
(4).RpcServerDispatchHandler
rpcServerRequestHandler
rpcServerRequestHandler
rpcServerRequestHandler rpcServerRequestHandler
ctx msg
rpcRequest msg
rpcRequestWrapper rpcRequest ctx
rpcServerRequestHandlerrpcRequestWrapper
ctx cause
与RpcClientDispatchHandler类似,将收到的RpcRequest送进RpcServerRequestHandler进行处理。不同的是此处将
RpcRequest和Channel包装到了一起生成了RpcRequestWrapper,
RpcServerRequestHandler读取
RpcRequestWrapper完成其中的请求的调用后,通过
RpcRequestWrapper中的Channel将结果返回回去,调用结果必然是从收到请求的Channel返回的。
4.客户端的启动配置
前文中实现的RPC客户端的连接部分仅为一行打印输出的模拟,现替换为实际连接的代码逻辑(仅给出改变的部分)
bootstrap
eventLoopGroup
bootstrapeventLoopGroup
ch
ch
rpcClientResponseHandler rpcClientChannelInactiveListener
bootstrapALLOCATOR DEFAULT
bootstrapTCP_NODELAY
bootstrapSO_KEEPALIVE
channel
channel
e
e
host port
future bootstraphost port
future
host port
future
host port
exception
host port
e
e
connect方法中完成了对bootstrap的配置,tryConnect方法尝试连接到服务端并返回连接后的Channel,若连接失败返回null。
connect()方法会一直尝试连接,失败后等待10秒重新尝试连接直到成功。可以看到在RpcClientDispatchHandler中注册了rpcClientChannelInactiveListener,这个回调接口用于实现连接断开后自动重连。
rpcClientChannelInactiveListener channel
channel
channel
与第一次连接类似,
一直尝试连接,失败后等待10秒重新尝试连接直到成功。
5.服务端的启动配置
ctx msg out
msg out
0
ctx msg out
msg out
1
6.客户端对返回结果的处理
在多线程并发使用同一个RpcClient(无论是使用同步方式下的动态代理对象还是异步方式下的RpcClientAsyncProxy)时,由于网络情况和服务端对请求的处理时间等等的不确定,后调用的请求先返回结果是完全有可能的,因此必须使用一个唯一的id对该次调用请求进行标识。由于某次调用请求的结果只会通过发送该请求的Channel返回,服务端并不需要通过id区分不同的客户端的请求,所以此id无需全局唯一,只需要在客户端唯一即可,使用AtomicInteger进行自增即可实现。
(1).RpcClientResponseHandler
ctx msg out
msg out
2
RpcClientResponseHandler内部维护了一个Map用于管理某次调用的id和该次调用返回的RpcFuture(本例中同步调用是通过异步调用实现的,因此每次调用必然会有RpcFuture产生,区别只在于
RpcFuture是否直接返回给用户)。在RpcClientResponseHandler创建时开启了一个线程池并根据参数threads创建了指定数量的
RpcClientResponseHandleRunnable并开始执行,处理的具体实现在RpcClientResponseHandleRunnable中。
(2).RpcClientResponseHandleRunnable
ctx msg out
msg out
3
从响应队列responseQueue中不断取出RpcResponse,根据RpcResponse的id(同一次调用中
RpcResponse的id和RpcRequest相同)取出该次调用的RpcFuture。判断返回状态,若是成功则对
RpcFuture调用setResult()方法,否则调用setThrowable()方法。
(3).RpcClient中调用id的生成
id使用AtomicInteger生成,每次调用自增。RpcClient中的call()方法修改为(之前未实现调用请求的发送,仅为打印输出的模拟)
ctx msg out
msg out
4
ctx msg out
msg out
5
RpcRequest中。产生一个RpcFuture连同id注册到rpcClientResponseHandler中,通过channel将
RpcRequest发送至服务端,最后将RpcFuture返回。至于是在RpcFuture上面阻塞直到结果返回(同步调用方式)还是直接将RpcFuture返回给用户则因调用方式而异,而对于返回结果的注册和处理都是一样的。
7.服务端对调用请求的处理
(1).RpcServerRequestHandler
ctx msg out
msg out
6
已在第二节给出过,仅作了少许改动(addRequest接受RpcRequestWrapper而不是RpcRequest)
(2).RpcServerRequestHandleRunnable
实际调用被请求方法,相比第二节的代码做了一些改动,不再使用JDK自带的反射而使用Reflectasm
ctx msg out
msg out
7
在pom.xml的<
dependencies
中添加相关依赖,此处使用的是最新版的1.11.0
ctx msg out
msg out
8
ctx msg out
msg out
9
MethodAccess(每个工作线程都需要获取一次),之后一直调用持有的
methodAccess对象即可,能够一定程度上提升性能。
在调用方法前后调用了Hook中的回调方法,对调用过程中可能抛出的异常进行了捕获并包装进RpcResponse传回客户端。
8.测试
(1).TestClientBuildAndCall
ctx in
frame ctx in
frame
frame
ctx buffer index length
bufferindex length
0
ctx in
frame ctx in
frame
frame
ctx buffer index length
bufferindex length
1
(2).TestServerBuildAndStart
ctx in
frame ctx in
frame
frame
ctx buffer index length
bufferindex length
2
启动RPC服务端
先运行TestServerBuildAndStart.main()再运行TestClientBuildAndCall.main()
输出结果: (1). TestServerBuildAndStart
ctx in
frame ctx in
frame
frame
ctx buffer index length
bufferindex length
3
(2).
TestClientBuildAndCall
ctx in
frame ctx in
frame
frame
ctx buffer index length
bufferindex length
4
ctx in
frame ctx in
frame
frame
ctx buffer index length
bufferindex length
5
还没有评论,来说两句吧...