Spark Master启动源码分析
发布日期:2021-05-06 23:31:04 浏览次数:25 分类:精选文章

本文共 5513 字,大约阅读时间需要 18 分钟。

文章目录

底层使用netty通信,先准备出RpcEnv环境来做通信架构,创建:收消息结构、处理消息结构。RpcEnv会处于一直启动的状态,不死不灭。
Master启动的时候,会把通信地址(也叫通信邮箱,EndPoint)注册到RpcEnv中。别人想跟Master通信,只需要发送到RpcEnv中,Master的EndPoint刚刚才向RpcEnv注册,于是RpcEnv接收到别人的通信请求,并立刻发送消息给Master。

首先,RpcEnv要先创建自己的Rpc环境和EndPoint:startRpcEnvAndEndpoint

一、RPC通信环境

1.创建Netty的RPC通信环境:

val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)

create最终调用的是new NettyRpcEnvFactory().create(config)的create方法。在NettyRpcEnvFactory类中做了一件很重要的事----->new NettyRpcEnv。

①Dispatcher转发器

NettyRpcEnv是一个class类,new的时候除了方法不执行,其余的都执行。执行的就有:

//dispatcher 这个对象中有消息队列和消息的循环获取转发private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

Dispatcher是转发器,拥有消息队列和消息的循环获取、转发。receivers就是 < EndpointData > 的消息队列:

private val receivers = new LinkedBlockingQueue[EndpointData]

Dispatcher在被new的时候,线程池threadpool也会被new出来。在线程池中,几个线程都在做一件事:

for (i <- 0 until numThreads) {     pool.execute(new MessageLoop)}

MessageLoop继承了Runnable接口,里面开启了一个while(true)死循环,无休止的receivers.take()(注:receivers是一个队列:new LinkedBlockingQueue[EndpointData],队列中放的就是EndpointData-通信邮箱的元数据)。如果一直拿不到,就放进去一个空EndpointData对象:

private val PoisonPill = new EndpointData(null, null, null)

反之如果能take出来,就去处理:

//调用process 方法处理消息data.inbox.process(Dispatcher.this)

在process中进行模式匹配,看什么类型的消息,就做什么样的处理

②new TransportContext

这里会传参:new NettyRpcHandler。这里很重要,连对象带参数,待会启动RPC都会用到

2.启动Netty RPC

通过nettyEnv.startServer来启动Netty RPC:

server = transportContext.createServer(bindAddress, port, bootstraps)

这回绑定地址、端口。实际走的是TransportContext的createServer方法:

public TransportServer createServer(      String host, int port, List
bootstraps) { //这个rpcHandler就是TransportContext传参的new NettyRpcHandler return new TransportServer(this, host, port, rpcHandler, bootstraps); }

看吧,这个rpcHandler就是当时new TransportContext时传的参数:new NettyRpcHandler。new TransportServer会走init方法,这是用来初始化网络通信管道的:

bootstrap.childHandler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) { RpcHandler rpcHandler = appRpcHandler; for (TransportServerBootstrap bootstrap : bootstraps) { rpcHandler = bootstrap.doBootstrap(ch, rpcHandler); } //初始化网络通信管道 context.initializePipeline(ch, rpcHandler); } });

这里很重要,它会创建处理消息的channelHandler(调createChannelHandler()方法),它同时包含了request和response两种Handler,并将其包装成TransportChannelHandler对象:

return new TransportChannelHandler(client, responseHandler, requestHandler,      conf.connectionTimeoutMs(), closeIdleConnections);

TransportChannelHandler的channelRead方法负责处理消息,它会判断当前消息类型是请求的还是回应的,继而调用各自不同的handle方法:

@Override  public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {       //判断当前消息是请求的消息还是回应的消息    if (request instanceof RequestMessage) {         requestHandler.handle((RequestMessage) request);    } else if (request instanceof ResponseMessage) {         responseHandler.handle((ResponseMessage) request);    } else {         ctx.fireChannelRead(request);    }  }

以request为例,因为是RPC类型,所以走processRpcRequest方法,用的是NettyRpcHandler的receive的处理逻辑

//dispatcher负责发送远程的消息,都最终调到postMessage 方法    dispatcher.postRemoteMessage(messageToDispatch, callback)	 //无论是请求消息还是回应消息,都最终会执行到这个postMessage    postMessage(message.receiver.name, rpcMessage, (e) => callback.onFailure(e))

传递的参数是接收者的name、message

//获取要处理消息的通信邮箱名称      val data: EndpointData = endpoints.get(endpointName)      if (stopped) {           Some(new RpcEnvStoppedException())      } else if (data == null) {           Some(new SparkException(s"Could not find $endpointName."))      } else {           //将消息放入通信端的消息队列中        data.inbox.post(message)        //添加到消息队列中        receivers.offer(data)        None      }

这个endpoints就是个Map[String,EndPointData],只要注册了,就会在这个Map里面。现在我要根据name来取出EndpointData。这个message要被放在Inbox里面(对状态进行判断、处理。暂停、不暂停)。然后就把这个EndpointData丢进receivers消息队列中去。那里有一个死循环正等着它呢。

二、注册EndPoint

rpcEnv.setupEndpoint的作用就是注册EndPoint,它会调用转发器的registerRpcEndpoint方法来注册EndPoint。这里它的第一个任务就是向Map中塞数据:

endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef))

这个Map就是刚才提到的Map[String,EndPointData]。如果没放进去,就抛异常。接着根据name获取刚刚封装的EndPointData,将其塞进receivers消息队列中去:

receivers.offer(data)

值得注意的是,当new EndpointData时,也会一起new Inbox。他就会往messages中放入一个消息对象:

inbox.synchronized {       messages.add(OnStart)  }

这样就能take出“onstart”,就对应执行Master的onStart方法,Master就启动起来了

总结

创建RPC通信环境

1.创建Netty的RPC通信环境:new NettyRpcEnv

①创建转发器Dispatcher

  • receivers消息队列< EndPointData >
  • threadpool内会跑一个MessageLoop任务,开1个死循环。拿不到就放一个空EndPointData对象,继续死循环。如果能take到,就用EndPointData的Inbox的process方法来处理消息:模式匹配,根据类型来做不同处理

②创建TransportContext

它会创建NettyPpcHandler(参数:Dispatcher),并以参数的形式,一路携带这个对象。至此,RPC环境搭建完成

2.启动RPC服务:nettyEnv.startServer

TransportContext负责创建TransportServer(绑定地址、端口),且有能力初始化网络通信管道TransportChannelHandler

  • TransportRequestHandler
  • TransportResponseHandler,NettyPpcHandler也会作为参数一路跟随至此

channelRead负责处理接收到的消息:判断是request还是response的,来各自receive处理。最终调用的是Dispatcher的postxxx方法来处理:

这有1个Map[String,EndPointData],从Map中取出EndPointData,将消息塞给它,再把EndPointData放到receivers消息队列中去

注册EndPoint

先创建出EndPoint(这里指的是Master),再通过转发器Dispatcher的注册方法来注册:

  • 将Master包装成EndPointData对象,塞进Map中
  • 在创建EndPointData对象时,也会创建Inbox对象,它会将一条OnStart塞进Messages

这样EndPointData就“充满灵魂、有血有肉”了,现在把这个完整的EndPointData对象塞进消息队列receivers中,这样线程池中跑的Runnable中的死循环就能take到OnStart,Master就启动起来了

感悟

通过Master的启动,可以看出这种通信机制设计的非常巧妙。在设计思想上,简直与Android的线程间通信的Handler机制的设计如出一辙:封装Message作为消息对象、MessageQueue作为消息队列、Looper.loop开启死循环接收消息、postxxx方法发送消息等等…

基于如此的设计,Master的启动流程,基本上和Android的根Activity的启动流程所差无几,更方便理解了。感兴趣的可以自行了解Handler机制的设计原理与root Activity的启动流程,这里就不过多赘述了

上一篇:Spark Worker启动源码
下一篇:Spark任务提交流程

发表评论

最新留言

路过,博主的博客真漂亮。。
[***.116.15.85]2025年03月27日 12时57分46秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章