
本文共 5513 字,大约阅读时间需要 18 分钟。
文章目录
首先,RpcEnv要先创建自己的Rpc环境和EndPoint:startRpcEnvAndEndpoint
一、RPC通信环境
1.创建Netty的RPC通信环境:
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
create最终调用的是new NettyRpcEnvFactory().create(config)的create方法。在NettyRpcEnvFactory类中做了一件很重要的事----->new NettyRpcEnv。
①Dispatcher转发器
NettyRpcEnv是一个class类,new的时候除了方法不执行,其余的都执行。执行的就有:
//dispatcher 这个对象中有消息队列和消息的循环获取转发private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
Dispatcher是转发器,拥有消息队列和消息的循环获取、转发。receivers就是 < EndpointData > 的消息队列:
private val receivers = new LinkedBlockingQueue[EndpointData]
Dispatcher在被new的时候,线程池threadpool也会被new出来。在线程池中,几个线程都在做一件事:
for (i <- 0 until numThreads) { pool.execute(new MessageLoop)}
MessageLoop继承了Runnable接口,里面开启了一个while(true)死循环,无休止的receivers.take()(注:receivers是一个队列:new LinkedBlockingQueue[EndpointData],队列中放的就是EndpointData-通信邮箱的元数据)。如果一直拿不到,就放进去一个空EndpointData对象:
private val PoisonPill = new EndpointData(null, null, null)
反之如果能take出来,就去处理:
//调用process 方法处理消息data.inbox.process(Dispatcher.this)
在process中进行模式匹配,看什么类型的消息,就做什么样的处理
②new TransportContext
这里会传参:new NettyRpcHandler。这里很重要,连对象带参数,待会启动RPC都会用到2.启动Netty RPC
通过nettyEnv.startServer来启动Netty RPC:
server = transportContext.createServer(bindAddress, port, bootstraps)
这回绑定地址、端口。实际走的是TransportContext的createServer方法:
public TransportServer createServer( String host, int port, Listbootstraps) { //这个rpcHandler就是TransportContext传参的new NettyRpcHandler return new TransportServer(this, host, port, rpcHandler, bootstraps); }
看吧,这个rpcHandler就是当时new TransportContext时传的参数:new NettyRpcHandler。new TransportServer会走init方法,这是用来初始化网络通信管道的:
bootstrap.childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { RpcHandler rpcHandler = appRpcHandler; for (TransportServerBootstrap bootstrap : bootstraps) { rpcHandler = bootstrap.doBootstrap(ch, rpcHandler); } //初始化网络通信管道 context.initializePipeline(ch, rpcHandler); } });
这里很重要,它会创建处理消息的channelHandler(调createChannelHandler()方法),它同时包含了request和response两种Handler,并将其包装成TransportChannelHandler对象:
return new TransportChannelHandler(client, responseHandler, requestHandler, conf.connectionTimeoutMs(), closeIdleConnections);
TransportChannelHandler的channelRead方法负责处理消息,它会判断当前消息类型是请求的还是回应的,继而调用各自不同的handle方法:
@Override public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception { //判断当前消息是请求的消息还是回应的消息 if (request instanceof RequestMessage) { requestHandler.handle((RequestMessage) request); } else if (request instanceof ResponseMessage) { responseHandler.handle((ResponseMessage) request); } else { ctx.fireChannelRead(request); } }
以request为例,因为是RPC类型,所以走processRpcRequest方法,用的是NettyRpcHandler的receive的处理逻辑
//dispatcher负责发送远程的消息,都最终调到postMessage 方法 dispatcher.postRemoteMessage(messageToDispatch, callback) //无论是请求消息还是回应消息,都最终会执行到这个postMessage postMessage(message.receiver.name, rpcMessage, (e) => callback.onFailure(e))
传递的参数是接收者的name、message
//获取要处理消息的通信邮箱名称 val data: EndpointData = endpoints.get(endpointName) if (stopped) { Some(new RpcEnvStoppedException()) } else if (data == null) { Some(new SparkException(s"Could not find $endpointName.")) } else { //将消息放入通信端的消息队列中 data.inbox.post(message) //添加到消息队列中 receivers.offer(data) None }
这个endpoints就是个Map[String,EndPointData],只要注册了,就会在这个Map里面。现在我要根据name来取出EndpointData。这个message要被放在Inbox里面(对状态进行判断、处理。暂停、不暂停)。然后就把这个EndpointData丢进receivers消息队列中去。那里有一个死循环正等着它呢。
二、注册EndPoint
rpcEnv.setupEndpoint的作用就是注册EndPoint,它会调用转发器的registerRpcEndpoint方法来注册EndPoint。这里它的第一个任务就是向Map中塞数据:
endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef))
这个Map就是刚才提到的Map[String,EndPointData]。如果没放进去,就抛异常。接着根据name获取刚刚封装的EndPointData,将其塞进receivers消息队列中去:
receivers.offer(data)
值得注意的是,当new EndpointData时,也会一起new Inbox。他就会往messages中放入一个消息对象:
inbox.synchronized { messages.add(OnStart) }
这样就能take出“onstart”,就对应执行Master的onStart方法,Master就启动起来了
总结
创建RPC通信环境
1.创建Netty的RPC通信环境:new NettyRpcEnv
①创建转发器Dispatcher
- receivers消息队列< EndPointData >
- threadpool内会跑一个MessageLoop任务,开1个死循环。拿不到就放一个空EndPointData对象,继续死循环。如果能take到,就用EndPointData的Inbox的process方法来处理消息:模式匹配,根据类型来做不同处理
②创建TransportContext
它会创建NettyPpcHandler(参数:Dispatcher),并以参数的形式,一路携带这个对象。至此,RPC环境搭建完成
2.启动RPC服务:nettyEnv.startServer
TransportContext负责创建TransportServer(绑定地址、端口),且有能力初始化网络通信管道TransportChannelHandler:
- TransportRequestHandler
- TransportResponseHandler,NettyPpcHandler也会作为参数一路跟随至此
channelRead负责处理接收到的消息:判断是request还是response的,来各自receive处理。最终调用的是Dispatcher的postxxx方法来处理:
这有1个Map[String,EndPointData],从Map中取出EndPointData,将消息塞给它,再把EndPointData放到receivers消息队列中去
注册EndPoint
先创建出EndPoint(这里指的是Master),再通过转发器Dispatcher的注册方法来注册:
- 将Master包装成EndPointData对象,塞进Map中
- 在创建EndPointData对象时,也会创建Inbox对象,它会将一条OnStart塞进Messages
这样EndPointData就“充满灵魂、有血有肉”了,现在把这个完整的EndPointData对象塞进消息队列receivers中,这样线程池中跑的Runnable中的死循环就能take到OnStart,Master就启动起来了
感悟
通过Master的启动,可以看出这种通信机制设计的非常巧妙。在设计思想上,简直与Android的线程间通信的Handler机制的设计如出一辙:封装Message作为消息对象、MessageQueue作为消息队列、Looper.loop开启死循环接收消息、postxxx方法发送消息等等…
基于如此的设计,Master的启动流程,基本上和Android的根Activity的启动流程所差无几,更方便理解了。感兴趣的可以自行了解Handler机制的设计原理与root Activity的启动流程,这里就不过多赘述了
发表评论
最新留言
关于作者
