基于google protobuf的RPC engine,必须在服务器端和客户端都完成了初始化之后,才能开始通信。在《Hadoop 基于protobuf 的RPC的服务器端实现原理》这篇博文中,我介绍了RPC 的服务器端实现,那么,客户端是如何基于预先定义的protobuf协议,来与远程的基于相同的protobuf协议的服务端进行通信的呢?
比如,NodeManger与远程的ResourceManager进行RPC通信,它们的通信基于ResourceTracker这个RPC协议,协议定义在ResourceTracker.proto文件中:
协议中定义了两个通信接口,registerNodeManager和nodeHeartbeat。registerNodeManager负责在节点启的NodeManager启动的时候向ResourceManger注册自己,而nodeHeartbeat是通过定时心跳的方式,不断向ResourceManager报告自己的存在,并将自己的状态汇报给ResourceManager。
在Yarn的基于Master/Slave的设计模式中,register思想是最核心的 设计思想。NodeManager被ResourceManager管理,那么NodeManager在启动的时候必须向ResourceManager注册,RPCEngine想要生效,Engine在初始化的时候也必须向ResourceManager注册。我认为这种设计思想的根本目的,是将主动权交给用户(Slave)而不是管理者(Master),这样,将Master从繁杂的管理工作中解脱出来,Master不需要关心、不需要轮训NodeManager什么时候来,什么时候启动,而是让NodeManager在启动的时候主动告知即可。
那么,这个客户端协议是怎么进行初始化并向远程的ResourceTracker发送消息的呢?既然NodeManager是该协议的客户端,我们从NodeManager代码进入,来看看初始化以及初始化以后基于协议进行通信的客户端过程。
Yarn代码设计的另外一个重要特点就是功能服务化,无论是NodeManager、ResourceManager还是MapReduce的ApplicationMaster(MRAppMaster),都抽象为服务,服务之间功能独立,服务的运行 被抽象为初始化、启动、运行和停止等基本过程,让整个代码逻辑非常清晰、封装性变得非常好。
在NodeManager的serviceInit()方法中,我们看到:
创建了一个运行时类型为NodeStatusUpdaterImpl的状态更新器,其实,这个NodeStatusUpdater也是一个service,启动ResourceTracker协议的客户端,就是在NodeStatusUpdaterImpl.serviceStart()中进行:
跟踪代码,到ServerRMProxy.createRMProxy():
在这里我们开始接触到proxy, 如果大家对IPC(Inter-Process Communication,进程间通信)或者RMI(Remote Method Invocation,远程方法调用)不是很熟悉,也许对proxy的理解产生偏差。在这里,proxy指的就是调用者,即客户端。由于在进程间通信或者远程方法调用的时候,调用者只需要调用方法,不需要关心方法是在本地还是远程执行,因此存在一个代理者(即proxy,在java RMI中,也叫做stub程序),来负责将本地客户端的调用通过TCP等网络协议在远程服务器端进行调用,然后取回调用结果提供给调用者。这就是代理的含义。
是否开启HA模式与本文讨论的话题无关,因此我们选取开启HA模式的分支。继续跟踪代码,看看基于ResourceTracker协议的RPC 客户端是怎么创建的。跟踪·RMProxy.createRMFailoverProxyProvider()方法:
然后,进入ConfiguredRMFailoverProxyProvider.init()方法:
由此可见,ConfiguredRMFailoverProxyProvider对我们的通信协议进行了HA的封装,在init方法中,设置了它所代理的协议名称(ResourceTracker)和这个协议的代理对象RMProxy;在HA环境下,客户端只需要直接使用ConfiguredRMFailoverProxyProvider给我们提供的代理对象,而不需要关心这个代理对象到底是指向了哪一个ResourceManager,这就是ConfiguredRMFailoverProxyProvider的职责,负责隐藏HA环境下的FailOver细节。
再回到上面提到的代码片段RMProxy.createRMProxy:
RMFailoverProxyProvider<T> provider = instance.createRMFailoverProxyProvider(conf, protocol);负责创建和初始化ResourceTracker协议在HA环境下的代理ConfiguredRMFailoverProxyProvider,那么,ConfiguredRMFailoverProxyProvider是怎么创建真正的RPC客户端的呢?
我们继续跟踪下一行代码(T) RetryProxy.create(protocol, provider, retryPolicy);,此时,protocol是ResourceTracker.class,provider是ConfiguredRMFailoverProxyProvider:
可以看到,ConfiguredRMFailoverProxyProvider通过java动态代理来代理了ResourceTracker协议里面方法的执行。熟悉java动态代理的都会明白,每一个动态代理proxy都需要有继承 java.lang.reflect.InvocationHandler并实现其invoke()方法,用来代替被代理类的执行,这里,这个InvocationHandler就是RetryInvocationHandler。ConfiguredRMFailoverProxyProvider底层真正的RPC(已经说过,ConfiguredRMFailoverProxyProvider就是对真正的RPC封装了一层HA特性),就是RetryInvocationHandler来实现的,其实是在RetryInvocationHandler的构造方法里面进行的:
接着往下跟踪:
创建远程ResourceManager服务器端的地址对象,即ResourceTracker协议的服务器端地址信息
显然,getRMAddress()方法就是通过读取配置文件来创建了一个InetSocketAddress对象,然后,真正底层创建proxy的时刻到来,来看RMProxy.getProxy():
YarnRPC是一个抽象类(Abstract Class),是Yarn对Hadoop RPC 的封装,基于历史原因和版本升级迭代,Hadoop RPC有基于多种序列化方式的RPC协议,但是由于Yarn是Hadoop 2.0之后才有的组件,是很新的component, 因此Yarn所有的RPC调用都是基于google protobuf序列化方式的RPC进行的实现。
我们一起来看YarnRPC的类图:
YarnRPC.create()
YarnRPC这个抽象类的实际实现类的名称是通过Yarn配置文件读取,默认是org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC这个类,
因此YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);实际上调用了HadoopYarnProtoRPC.getProxy()方法:
进入RpcClientFactoryPBImpl.getClient():
ResourceTrackerPBClientImpl就是对ResourceTracker协议的最下层代理,来看ResourceTrackerPBClientImpl的构造函数:
在这里,我们再次看到了hadoop中的注册思想。我们的ResourceTrackerPBClientImpl协议要想使用,必须向对应的RPC Engine注册自己。所有基于protobuf协议的RPC都必须向ProtobufRpcEngine进行注册,注册完成以后,创建底层的客户端代理。
终于,经过繁杂但是设计良好的Protobuf RPC的初始化,我们终于拿到了ResourceTracker协议的客户端实现类。此后,ResourceTracker协议的客户端,我们的 NodeManager,就可以根据协议的定义,来进行协议中的registerNodeManager()方法的调用。我们跟踪一下这个过程,试图搞清楚客户端在调用这个方法的时候,是如何不知不觉通过RPC变成了服务器端的调用的。
registerNodeManager()是由NodeManager发起的,NodeManager实际上是委托NodeStatusUpdaterImpl来与服务器端的ResourceManager进行沟通,看 NodeStatusUpdaterImpl.registerNodeManager:
这个proxy对象,是 ResourceTrackerPBClientImpl构造函数执行的时候创建的:
由于Yarn RPC使用Protobuf ,因此RPC.getProxy实际上调用的是ProtobufRpcEngine().getProxy()方法:
很明显,这里是通过java动态代理,来对ResourceTrackerPBClientImpl的方法进行代理执行。再次重复上面关于java 动态代理的解释,所有java动态代理都必须实现java.lang.reflect.InvocationHandler接口,实现其invoke()方法,用来代替被代理类的执行,对于ProtobufRpcEngine,这个InvocationHandler就是ProtobufRpcEngine.Invoker。我们看ProtobufRpcEngine.Invoker是怎么代理这个客户端的registerNodeManager()方法的执行的:
可以看到,ProtobufRpcEngine.Invokder.invoke()方法做的工作,就是提取客户端请求的方法以及方法的参数,将这些信息发送给远程服务器。远程服务器再通过解析,提取出方法和方法参数,在服务器端本地执行对应的代码,比如,服务器端从客户端请求中提取了方法名称为registerNodeManager()以及参数(包含了节点信息等等),会将节点信息进行注册和管理,然后返回注册成功信息。
在HA环境下,通过一层层代理封装,Yarn实现了HA环境下的ResourceManager协议客户端,ResourceTrackerPBClientImpl封装了该协议的客户端实现,属于下层代理,通过这个下层动态代理,将客户端对应方法的调用,转换成字节码信息发送给远端,而ConfiguredRMFailoverProxyProvider也是通过动态代理,在ResourceTrackerPBClientImpl的上层进行了封装,以实现High Availability特性。在HA环境下,NodeManager作为ResourceTracker客户端,从ConfiguredRMFailoverProxyProvider的上层代理往下调用,到达ResourceTrackerPBClientImpl下层代理,然后ResourceTrackerPBClientImpl通过动态代理,将请求信息发送到RPC Server,实现了该协议的一次调用。
还没有评论,来说两句吧...