目录
序言
protobuf处理引擎的注册
具体protobuf协议的注册
以ResourceTracker协议讲解服务端启动和注册过程
总结
序言
Hadoop RPC基于即远程过程调用,远程过程调用主要包括两个部分,网络协议和数据格式。Hadoop根据数据格式,有三种不同的RPC实现:
RPC_BUILTIN
RPC_WRITABLE
RPC_PROTOCOL_BUFFER
MAX_INDEX RPC_PROTOCOL_BUFFERvalue
value
val
value val
RPC_BUILTIN是内置的用于测试的RPC调用,RPC_WRITABLE是传统的基于Writable序列化方式的RPC协议,写过MapReduce程序的都知道Writable数据格式。最后,RPC_PROTOCOL_BUFFER是基于google protobuf协议的RPC,这是目前最流行的RPC协议。由于Yarn是在Hadoop 2.0才引入的组件,因此,Yarn内部的远程过程调用全部使用了RPC_PROTOCOL_BUFFER来作为自己的RPC。从上层设计到底层设计,我们一起来看一下基于protobuf的RPC的实现方法。
protobuf处理引擎的注册
Hadoop中,每种RPC调用的处理类叫做一个Engine。基于RPC_PROTOCOL_BUFFER的RPC的engine叫做ProtobufRpcEngine,即ProtobufRpcEngine是专门用来处理基于google的protobuf消息格式的RPC协议。
根据hadoop的协议管理类ipc.Server的设计规定,所有的Engine在开始工作前,都必须向ipc.Server注册自己。看ProtobufRpcEngine的类初始化代码:
RPC_PROTOCOL_BUFFER
注册的目的,是将自己纳入中央处理器ipc.Server的调度管理之下,这样,在收到某个消息,根据消息的RpcKind就可以知道这个消息的数据格式,因此ipc.Server会将消息直接交给对应的已经注册为该RpcKind的引擎进行处理。
先看ipc.Server.registerProtocolEngine()方法:
* Register a RPC kind and the class to deserialize the rpc request. *
* Called by static initializers of rpcKind Engines
* the rpc request. 所有的rpcRequestWrapperClass必须是一个Writable
*
* ProtobufRpcEngine .registerProtocolEngine(
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
new Server.ProtoBufRpcInvoker())
rpcKind
rpcRequestWrapperClass
rpcInvoker
old
old
rpcKindMaprpcKind rpcRequestWrapperClass rpcInvoker
rpcKindMaprpcKind old
rpcKind
LOG rpcKind
rpcRequestWrapperClass
rpcInvoker
可见注册的过程其实是将每一个Engine的Invoker和RPCKind对应起来。Invoker,其实就是真正处理这个请求的处理者,每一个Engine都必须包含这样一个Invoker,这样,ipc.Server
具体protobuf协议的注册
拿到请求的RPCKind,实际上是直接交给对应Engine的Invoker进行处理。那Engine拿到了请求以后,是如何知道这个请求具体该怎么去运行和处理呢?这就涉及到不同的protobuf协议定义。
比如,NodeManager和ResourceManager进行通信的基于protobuf的协议,叫做ResourceTracker协议,按照protobuf规范,具体协议接口定义在一个proto文件里面,比如:
ResourceTracker.proto
rpc RegisterNodeManagerRequestProto returns RegisterNodeManagerResponseProto
rpc NodeHeartbeatRequestProto returns NodeHeartbeatResponseProto
还有ApplicationMaster用来和ResourceManager进行通信的定义在applicatioinmaster_protocol.proto的协议文件,不再贴出。
每一个协议,都服务器端都有一个sevice与之对应,这个service在启动的时候,会向ipc.Server注册自己,这个注册是指某个具体的protobuf协议的注册,要与上面讲的engine的注册区分开。具体protobuf协议全部是hadoop根据应用场景自己定义的,比如NM和RM之间的协议,AM和NM之间的协议,AM和RM之间的等等,非常多,而engine目前只有三个而已。Engine的注册和具体某个协议的这注册,原理都相同,都是告知ipc.Server,我是负责处理哪些消息的,这些消息交给我,唯一的不同就是层次不同。
那么看一下具体的某个protobuf协议的注册是如何进行的。还是以NM和RM之间的通信协议ResourceTracker为例子:
conf
rpc conf
server
rpc resourceTrackerAddress
conf
YarnRPC.create(conf);创建了一个HadoopYarnProtoRPC对象,然后其getServer()方法将协议的名字、协议的服务端实现类this传入,最终,代码往下跟踪,还是让对应的Engine来自己负责创建Server,看ProtobufRpcEngine.Server类构造函数:
protocolClass protocolImpl conf bindAddress port numHandlers
numReaders queueSizePerHandler verbose
secretManager
portRangeConfig
bindAddress port numHandlers
numReaders queueSizePerHandler conf protocolImpl
secretManager portRangeConfig
verbose verbose
RPC_PROTOCOL_BUFFER protocolClass
protocolImpl
该方法最后,是通过registerProtocolAndImpl将创建完成的protocolImpl(实际运行时是一个com.google.protobuf.BlockingService,protoc自动编译生成,专门负责反向代理这个协议在服务器端的执行)向ipc.Server进行注册。
rpcKind protocolClass protocolImpl
protocolName RPCprotocolClass
version
version RPCprotocolClass
ex
LOG protocolClass
rpcKind protocolName version
protocolClass protocolImpl
LOG rpcKind protocolName version
protocolImpl
protocolClass
注册的核心过程,其实就是将自己保存在一个以rpcKind、协议名称(如ResourceeTracker)以及版本号为key、协议的具体代理(一个BlockingService)为value的map中。
以ResourceTracker协议讲解服务端启动和注册过程
那么,Engine注册完成,具体协议对应的服务也完成了注册和启动。这时候可以开始通信了。当一条消息过来,比如,一个Yarn节点启动,需要对应的NodeManager遵循ResourceTracker协议,通过registerNodeManager()方法向远程的ResourceTrackerService发起调用,过程是什么呢?
在我的Hadoop RPC Server架构和原理 这篇博客里面介绍了,ipc.Server收到远程客户端请求以后,实际上最后是让Handler来进行消息的派发,将消息交给对应的engine进行处理,ipc.Server.Handler.run()方法:
callconnectionuserdoAs callrpcKind callconnectionprotocolName
callrpcRequest calltimestamp
以ProtobufRpcEngine为例,ipc.Server的实现类RPC.Server,查看它对ipc.Server.call()方法的实现:
rpcKind protocol
rpcRequest receiveTime
rpcKind protocol rpcRequest
receiveTime
对于ProtobufRpcEngine,getRpcInvoker(rpcKind)获取的就是ProtobufRpcEngine.Server.ProtoBufRpcInvoker,来看这个内部类的call()方法:
* This is a server side method, which is invoked over RPC. On success * the return response has protobuf response payload. On failure, the
* exception name and the stack trace are return in the resposne.
*
* In this method there three types of exceptions possible and they are
* returned in response as follows.
* In that this method returns in response the exception thrown by the
server protocol
writableRequest receiveTime
request writableRequest
rpcRequest requestrequestHeader
methodName rpcRequest
protoName rpcRequest
clientVersion rpcRequest
serververbose
LOG protocol methodName
protocolImpl server protoName
clientVersion
service protocolImplprotocolImpl
methodDescriptor service
methodName
methodDescriptor
prototype servicemethodDescriptor
param prototype
requesttheRequestRead
result
startTime
qTime startTime receiveTime
exception
serverrpcDetailedMetricsprotocolImplprotocolClass
result servicemethodDescriptor param
e
processingTime startTime
LOG
msg methodName qTime
processingTime
exception
msg exception
LOGmsg
detailedMetricsName exception
methodName
exception
serverrpcMetricsqTime
serverrpcMetricsprocessingTime
serverrpcDetailedMetricsdetailedMetricsName
processingTime
result
方法中会首先从请求头中获取方法名称、协议名称和版本号,协议名称比如ResourceTracker。
在从请求头拿到了具体的协议名称以后,就可以提取出来这个协议在服务器端的代理实现,一个BlockingService。之所以能够准确拿出对应的代理实现,是因为前面讲到,服务器端该协议的对应的Service如ResourceTrackerService已经在启动的时候向ipc.Server 注册了代理类BlockingService负责处理这个ResourceTracker协议的请求。
前面说过,这个对象是protoc对这个ResourceTracker.proto文件编译的时候自动生成的,用来负责代理对应这个协议各个方法在服务器端的执行。比如,它像一个工厂一样,把我们的方法名称组装成com.google.protobuf.Descriptors.MethodDescriptor,把我们的参数组装成com.google.protobuf.Message,然后调用自己的callBlockingMethod,来执行这个方法。
总结
综上所述,我们可以看到,Hadoop RPC服务器端整个代码实现非常复杂,但是这种设计的复杂性带来了结构和流程上的清晰以及组件责任的明确。
核心内容包括:
两个级别的注册:
Engine注册:目的是为了告知ipc.Server自己所负责的RpcKind,这样,ipc.Server在收到消息以后会根据RpcKind将消息交付给对应的Engine进行处理;
具体协议注册:具体协议如ResourceTracker协议向ipc.Server 注册的目的,就是告知ipc.Server自己所负责处理的协议,这样,ipc.Server在收到某个消息以后,会根据消息中的协议名称将消息正确地交付给对应的处理类进行处理。
比如:NodeManager根据ResourceTracker协议向ResourceManager发送了一个registerNodeManager请求,请求参数包含了自己合格节点的ip、资源等信息,服务器端收到了请求,提取出来了rpcKind 、协议名称、方法名称和请求参数,只要Engine的注册和协议注册两步已经完成,就能够找到对应的服务器端对 协议处理类。处理类完成服务器端方法调用,比如,收到registerNodeManager()请求,ResourceManager将对应的node添加到自己所管理的节点列表中去,完成以后,就可以给发起请求的客户端返回结果了。
还没有评论,来说两句吧...