前情提要:
这段代码中有两个主要流程,我们分别来说
2.1 Spark RPC 服务端 NettyRpcEnvFactory.create(config)
首先是下面这条代码的运行流程:
val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
其实就是通过 NettyRpcEnvFactory 创建出一个 RPC Environment ,其具体类是 NettyRpcEnv 。
我们再来看看创建过程中会发生什么。
object NettyRpcEnvFactory extends RpcEnvFactory { ...... def create(config: RpcEnvConfig): RpcEnv = { val conf = config.conf // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance val javaSerializerInstance = new JavaSerializer(conf).newInstance().asInstanceOf[JavaSerializerInstance] //根据配置以及地址,new 一个 NettyRpcEnv , val nettyEnv = new NettyRpcEnv(conf, javaSerializerInstance, config.bindAddress) //如果是服务端创建的,那么会启动服务。服务端和客户端都会通过这个方法创建一个 NettyRpcEnv ,但区别就在这里了。 if (!config.clientMode) { val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort => //启动服务的方法,下一步就是调用这个方法了 nettyEnv.startServer(config.bindAddress, actualPort) (nettyEnv, nettyEnv.address.port) } try { Utils.startServiceOnPort(config.port, startNettyRpcEnv, conf, config.name)._1 } catch { case NonFatal(e) => nettyEnv.shutdown() throw e } } nettyEnv } ...... }还没完,如果是服务端调用这段代码,那么主要的功能是创建 RPCEnv ,即 NettyRpcEnv(客户端在后面说) 。以及通过下面这行代码,
nettyEnv.startServer(config.bindAddress, actualPort)
去调用相应的方法启动服务端的服务。下面进入到这个方法中去看看。
class NettyRpcEnv( val conf: RpcConf, javaSerializerInstance: JavaSerializerInstance, host: String) extends RpcEnv(conf) { ...... def startServer(bindAddress: String, port: Int): Unit = { // here disable security val bootstraps: java.util.List[TransportServerBootstrap] = java.util.Collections.emptyList() //TransportContext 属于 spark.network 中的部分,负责 RPC 消息在网络中的传输 server = transportContext.createServer(bindAddress, port, bootstraps) //在每个 RpcEndpoint 注册的时候都会注册一个默认的 RpcEndpointVerifier,它的作用是客户端调用的时候先用它来询问 Endpoint 是否存在。 dispatcher.registerRpcEndpoint( RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher)) } ...... }执行完毕之后这个 create 方法就结束。这个流程主要就是开启一些服务,然后返回一个新的 NettyRpcEnv 。
2.2 Spark RPC 服务端 rpcEnv.setupEndpoint("hello-service", helloEndpoint)
这条代码会去调用 NettyRpcEnv 中相应的方法
class NettyRpcEnv( val conf: RpcConf, javaSerializerInstance: JavaSerializerInstance, host: String) extends RpcEnv(conf) { ...... override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { dispatcher.registerRpcEndpoint(name, endpoint) } ...... }我们看到,这个方法主要是调用 dispatcher 进行注册的。dispatcher 的功能上一节已经说了,
Dispatcher 的主要作用是保存注册的Rp
