Spark RPC 通信源码分析
提示:以下内容基于 Spark 3.0.0。
# 1. Spark RPC 通信模型
# 1.1 通信模型概览
- Spark 中将需要通信的实体抽象为 RpcEndpoint,所有的通信实体都是 RpcEndpoint 的实现类。实体之间通信都需要依赖于 RPC 环境,RPC 环境的抽象是 RpcEnv 抽象类,它只有一个实现类 NettyRpcEnv。
- 由于互相通信的两个实体可能存在于同一个进程里,也可能是在同一个主机的不同进程里,也可能是在不同主机上。为了保证信息能够准确到达目标实体,需要借助分发层负责将 RPC 消息路由到适当的端点。Spark 中分发层的实现是 Dispatcher,它的功能类似于邮局,负责消息转发。
- 消息队列的目的是保证消息处理的有序性以及方便并发控制。发件箱和收件箱中分别维护了消息队列,消息队列通过 LinkedList 实现,比较简单。
- 传输层是真正负责消息在网络中的序列化以及传输的角色,Spark 自 2.0 版本开始由 Netty 负责传输层的实现。
# 1.2 关于通信实体
在 Spark RPC 环境中,将通信实体统一抽象为 RpcEndpoint,所有运行在 Spark RPC 框架上的通信实体都应该是 RpcEndpoint 的子类。比如 Master、Worker、HeartbeatReceiver 等。
RpcEndpoint 表示通信实体的抽象,只负责接收并处理消息。如果需要向某个 RpcEndpoint 发送消息,调用方就需要持有该 RpcEndpoint 的引用对象 RpcEndpointRef。RpcEndpoint 和 RpcEndpointRef 之间遵循一套消息投递规则,每个 RpcEndpoint 都会有一个对应的 RpcEndpointRef。可以把 RpcEndpoint 类比为一个具体的网站:如果某个人要访问该网站,就必须通过它的网址(即该网站的引用)去访问,而不是直接持有该网站真正的实体,也不需要关心网站背后的消息传输问题。
如果你熟悉消息队列的话,那么对消息投递规则一定不陌生:
- At Most Once:消息最多投递一次,可能会被丢失。成本最低而且性能最高。
- At Least Once:消息至少投递一次,消息可能会重复,但是不会丢失。相对 At Most Once 来说成本要高,因为要记录所有发送过的消息状态并重试。
- Exactly Once:精准一次,消息不丢不重复。成本最高而且性能最差,因为不但要记录发送过的消息状态,而且要精确地避免消息重复发送。
Spark RPC 通信同样遵循上面的消息投递规则,所以在消息的发送方 RpcEndpointRef 中提供了 send() 方法和一组 ask*() 方法。send() 用于发送那些不期望被回复的消息,而 ask*() 发送的消息需要被回复。
相应地,在 RpcEndpoint 中维护了 receive() 和 receiveAndReply() 方法。receive() 只负责接收消息,而 receiveAndReply() 处理完消息后还需要回复。
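为了直观理解这两组方法如何配合,下面给出一个极简的示意(Ping、Echo、EchoEndpoint 都是为说明而假设的名字;另外 Spark RPC 相关类大多是 private[spark] 的内部 API,这里仅作演示,并非可以在用户代码中直接使用的公共接口):

```scala
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}

// 两个示意用的消息
case class Ping(id: Int)
case class Echo(content: String)

// 一个极简的 RpcEndpoint:receive 处理无需回复的消息,receiveAndReply 处理需要回复的消息
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {

  override def receive: PartialFunction[Any, Unit] = {
    case Ping(id) => println(s"received ping $id, no reply needed")
  }

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    // 通过 RpcCallContext 把结果回复给调用方
    case Echo(content) => context.reply(s"echo: $content")
  }
}

// 调用方:持有 RpcEndpointRef 之后,用 send() 发送单向消息,用 ask*() 发送需要回复的消息
// 这里假设 rpcEnv 已经创建好
def demo(rpcEnv: RpcEnv): Unit = {
  val ref: RpcEndpointRef = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
  ref.send(Ping(1))                             // 不期望回复
  val reply = ref.askSync[String](Echo("hi"))   // 期望回复,同步等待结果
  println(reply)
}
```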
消息在 Spark 中的实现都是 case class,根据不同的需要定义对应的 case class,如:
case class RegisterWorker(id: String, host: String, port: Int, worker: RpcEndpointRef, cores: Int, memory: Int, workerWebUiUrl: String, masterAddress: RpcAddress, resources: Map[String, ResourceInformation] = Map.empty)
extends DeployMessage {
Utils.checkHost(host)
assert (port > 0)
}
case class RequestDriverStatus(driverId: String) extends DeployMessage
case object SendHeartbeat
在 RpcEndpoint 子类的 receive() 和 receiveAndReply() 方法中,对消息的处理是通过模式匹配实现的,匹配到不同的消息执行相应的逻辑,比如:
override def receive: PartialFunction[Any, Unit] = {
case CompleteRecovery => completeRecovery()
case RevokedLeadership =>
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl,
masterAddress, resources) =>
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
workerRef.send(MasterInStandby)
} else if (idToWorker.contains(id)) {
workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, true))
} else {
val workerResources = resources.map(r => r._1 -> WorkerResourceInfo(r._1, r._2.addresses))
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerWebUiUrl, workerResources)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, false))
schedule()
} else {
val workerAddress = worker.endpoint.address
logWarning("Worker registration failed. Attempted to re-register worker at same " +
"address: " + workerAddress)
workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ workerAddress))
}
}
}
# 1.3 关于消息
- 实体之间的通信内容是以 Message 的形式发送的。消息的实现都是一些样例类,前面已经说过。
- 客户端在发送消息之前会将所有的消息都包装为 RequestMessage 类型,在消息进入到传输层之前被封装为 OutboxMessage,当分发层接收到消息之后又会以 InboxMessage 的形式发送给目标端点。
- 客户端在发送 OutboxMessage 时会统一将其放入 OutBox 中,而当分发层接收到消息发送给目标端点时会统一将消息放入 Inbox 中。
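为了更直观地看到消息在各层之间的形态,下面按照 Spark 3.0 源码摘录这几种包装类型的精简定义(去掉了注释和部分成员,细节以 NettyRpcEnv.scala、Outbox.scala、Inbox.scala 中的源码为准):

```scala
// RequestMessage:客户端发送前的统一包装,记录发送方地址、目标端点引用和消息内容
private[netty] class RequestMessage(
    val senderAddress: RpcAddress,
    val receiver: NettyRpcEndpointRef,
    val content: Any)

// OutboxMessage:消息进入传输层之前的形态,由发件箱 Outbox 负责消费
private[netty] sealed trait OutboxMessage {
  def sendWith(client: TransportClient): Unit
  def onFailure(e: Throwable): Unit
}

// InboxMessage:分发层收到消息之后投递给目标端点的形态,由收件箱 Inbox 负责消费
private[netty] sealed trait InboxMessage

// 单向消息,对应 RpcEndpoint.receive
private[netty] case class OneWayMessage(
    senderAddress: RpcAddress,
    content: Any) extends InboxMessage

// 需要回复的消息,对应 RpcEndpoint.receiveAndReply
private[netty] case class RpcMessage(
    senderAddress: RpcAddress,
    content: Any,
    context: NettyRpcCallContext) extends InboxMessage
```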
# 2. 源码分析
# 2.1 RPC 通信源码流程
下图是对 RPC 通信涉及的核心源码的流程分析,原本应该放在分析完核心源码之后作总结用。但是源码内容比较长而且枯燥,所以提前放到这里。
# 2.1.1 消息发送流程
- 所有涉及到 RPC 通信的地方都依赖于 NettyRpcEnv 提供的基础能力。
- 要对目标 Endpoint 发送消息,首先需要持有 Endpoint 对应的引用 RpcEndpointRef,通过 RpcEndpointRef 发送消息。
- RpcEndpointRef 通过 send() 和 ask() 方法发送消息,但实际上这两个方法都是对 NettyRpcEnv 中 send() 和 ask() 方法的代理。
- NettyRpcEnv 在发送消息之前会根据通信地址进行判断。如果接收方的地址是本机地址(即发送方和接收方在同一个进程内),则直接调用本地 Dispatcher 中的 post*() 方法,否则才会走 RPC 调用发送消息。(提示:这里的通信地址在源码中的实现是 RpcAddress,其只有两个属性 host 和 port。)
- 如果是通过 RPC 调用发送消息,最终都会通过 NettyRpcEnv 的 postToOutbox() 方法将消息统一存储到 outboxes 中,outboxes 实际上就是一个 Map 结构。
private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
针对每个消息的接收方都有一个单独的 Outbox,和该接收方相关的消息都会统一交给对应的 Outbox 处理。Outbox 中的多个消息是由 messages 统一存储的。
private val messages = new java.util.LinkedList[OutboxMessage]
postToOutbox() 方法在将消息存储到对应的 Outbox 之后会随之调用该 Outbox 的 send() 方法。Outbox 内部会通过 while(true) 的方式对 messages 进行消费。截取部分源码:
while (true) {
  try {
    val _client = synchronized { client }
    if (_client != null) {
      // 连接可用时,通过 TransportClient 真正把消息发送出去
      message.sendWith(_client)
    } else {
      assert(stopped)
    }
  } catch {
    case NonFatal(e) =>
      handleNetworkFailure(e)
      return
  }
  synchronized {
    if (stopped) {
      return
    }
    // 取出下一条待发送的消息,队列取空则结束本轮消费
    message = messages.poll()
    if (message == null) {
      draining = false
      return
    }
  }
}
sendWith() 方法中最终调用的是 TransportClient 的 sendRpc() 方法,向远程 NettyRpcEnv 发送数据。TransportClient 是传输层的客户端实现,与之对应的还有服务端 TransportServer。本地 TransportClient 与远程 TransportServer 建立连接之后,消息会通过 Netty 管道进行传输。
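结合上面的描述,OutboxMessage 的 RPC 实现 RpcOutboxMessage 的 sendWith() 大致如下(简化示意,省略了超时、中止等细节,具体以 Outbox.scala 源码为准):

```scala
private[netty] case class RpcOutboxMessage(
    content: ByteBuffer,
    _onFailure: (Throwable) => Unit,
    _onSuccess: (TransportClient, ByteBuffer) => Unit)
  extends OutboxMessage with RpcResponseCallback {

  private var client: TransportClient = _
  private var requestId: Long = _

  // 真正的网络发送:把序列化好的消息交给传输层的 TransportClient,
  // 并把自身作为响应回调注册进去,等待远端 NettyRpcEnv 的回复
  override def sendWith(client: TransportClient): Unit = {
    this.client = client
    this.requestId = client.sendRpc(content, this)
  }

  override def onSuccess(response: ByteBuffer): Unit = _onSuccess(client, response)

  override def onFailure(e: Throwable): Unit = _onFailure(e)
}
```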
至此,消息发送的过程已经结束。接下来是接收的过程。
# 2.1.2 消息接收流程
某一个通信实体既可能是消息的发送方,也可能是消息的接收方。换句话说,既可能是 Client,也可能是 Server。所以在远程端点上会有一个和本地端点相同的 NettyRpcEnv。
- 远程端点上的 TransportServer 接收到来自 TransportClient 的消息之后会交给 NettyRpcHandler 的 receive() 方法。该方法会调用 Dispatcher 的 postRemoteMessage() 或 postOneWayMessage() 方法,将该消息交给 Dispatcher 处理。
- Dispatcher 有三个核心变量:
private val endpoints: ConcurrentMap[String, MessageLoop] = new ConcurrentHashMap[String, MessageLoop]
private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
private lazy val sharedLoop = new SharedMessageLoop(nettyEnv.conf, this, numUsableCores)
endpoints 中维护了 Endpoint 名称和 MessageLoop 的映射关系,endpointRefs 维护了 Endpoint 和 EndpointRef 的映射关系。从名字就可以看出,MessageLoop 是一个专门用于处理消息的循环;SharedMessageLoop 是 MessageLoop 的一个具体实现,其内部是一个共享线程池,表示多个 Endpoint 可以共用一个线程池来处理消息。Dispatcher 的 postRemoteMessage() 或 postOneWayMessage() 最终都是通过 postMessage() 方法将 Message 交给 MessageLoop 处理。
- SharedMessageLoop 中为每个 Endpoint 维护了一个 Inbox,用来存储收到的 Message。其定义如下:
private val endpoints = new ConcurrentHashMap[String, Inbox]()
- 而在 SharedMessageLoop 初始化的时候就会启动一个线程池用于消费消息。
override protected val threadpool: ThreadPoolExecutor = {
val numThreads = getNumOfThreads(conf)
val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
for (i <- 0 until numThreads) {
pool.execute(receiveLoopRunnable)
}
pool
}
protected val receiveLoopRunnable = new Runnable() {
override def run(): Unit = receiveLoop()
}
private def receiveLoop(): Unit = {
try {
while (true) {
try {
val inbox = active.take()
if (inbox == MessageLoop.PoisonPill) {
// Put PoisonPill back so that other threads can see it.
setActive(MessageLoop.PoisonPill)
return
}
inbox.process(dispatcher)
} catch {
case NonFatal(e) => logError(e.getMessage, e)
}
}
} catch {
case _: InterruptedException => // exit
case t: Throwable =>
try {
// Re-submit a receive task so that message delivery will still work if
// UncaughtExceptionHandler decides to not kill JVM.
threadpool.execute(receiveLoopRunnable)
} finally {
throw t
}
}
}
线程池中的循环内拿到 Inbox 之后会调用 Inbox 的 process() 方法。process() 中同样也是通过 while(true) 的方式获取 Message,然后通过模式匹配最终调用 Endpoint 的相应方法进行处理。
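Inbox.process() 的核心逻辑可以简化成下面的示意(省略了 numActiveThreads 计数、并发控制和异常保护等细节,具体以 Inbox.scala 源码为准):

```scala
def process(dispatcher: Dispatcher): Unit = {
  var message: InboxMessage = synchronized { messages.poll() }
  while (message != null) {
    message match {
      case RpcMessage(_sender, content, context) =>
        // 需要回复的消息:交给 receiveAndReply,通过 RpcCallContext 回复调用方
        endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content,
          _ => throw new SparkException(s"Unsupported message $message"))
      case OneWayMessage(_sender, content) =>
        // 单向消息:交给 receive,不需要回复
        endpoint.receive.applyOrElse[Any, Unit](content,
          _ => throw new SparkException(s"Unsupported message $message"))
      case OnStart => endpoint.onStart()   // 生命周期:开始处理消息之前
      case OnStop  => endpoint.onStop()    // 生命周期:端点停止时
      case _ =>                            // 连接建立/断开、网络错误等其它消息
    }
    // 取下一条消息,队列取空则结束本次 process
    message = synchronized { messages.poll() }
  }
}
```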
# 2.2 RpcEndpoint 源码
在 Spark RPC 环境中,将通信实体统一抽象为 RpcEndpoint,所有运行在 Spark RPC 框架上的通信实体都应该是 RpcEndpoint 的子类。RpcEndpoint 继承体系如下:
RpcEndpoint 的子类可以分为四种:
- 直接实现 RpcEndpoint;
- 通过 ThreadSafeRpcEndpoint 接口间接实现,比如 Master、Worker。从命名来看,ThreadSafeRpcEndpoint 对消息的处理是线程安全的,而且以串行的方式进行,即前一条消息处理完之后才能处理下一条消息。
- 通过 IsolatedRpcEndpoint 接口间接实现,比如 BlockManagerMasterEndpoint、DriverEndpoint(这两个接口的精简定义见本列表之后的示意)。
- 还有一些在上图中没有体现,是通过匿名内部类实现的。
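其中 ThreadSafeRpcEndpoint 和 IsolatedRpcEndpoint 本身的定义非常简单,下面是去掉注释后的节选:

```scala
// 标记 trait:实现它的端点由框架保证同一时刻只有一个线程在处理其消息,
// 因此前一条消息处理完之前不会处理下一条
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint

// 实现它的端点拥有专属的消息循环(DedicatedMessageLoop),
// threadCount() 决定为其分配多少个处理线程,默认为 1
private[spark] trait IsolatedRpcEndpoint extends RpcEndpoint {
  def threadCount(): Int = 1
}
```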
RpcEndpoint 源码如下:
private[spark] trait RpcEndpoint {
// 当前 RpcEndpoint 所属的RpcEnv
val rpcEnv: RpcEnv
// 当前 RpcEndpoint 对应的引用类 RpcEndpointRef
// 调用 onStart 时,self 将生效。当调用 onStop 时,self 将变为 null
// 因为在 onStart 之前,RpcEndpoint 还没有被注册,并且没有有效的 RpcEndpointRef。所以在调用 onStart 之前不要调用 self。
final def self: RpcEndpointRef = {
require(rpcEnv != null, "rpcEnv has not been initialized")
rpcEnv.endpointRef(this)
}
// 接收并处理来自 RpcEndpointRef.send 和 RpcCallContext.reply 的消息
// 处理之后不需要给客户端回复
def receive: PartialFunction[Any, Unit] = {
case _ => throw new SparkException(self + " does not implement 'receive'")
}
// 接收并处理来自 RpcEndpointRef.ask 的消息,需要给客户端回复
// 回复是通过 RpcCallContext 实现的
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
}
// 处理消息发生异常时调用
def onError(cause: Throwable): Unit = {
// By default, throw e and let RpcEnv handle it
throw cause
}
// 当客户端与当前节点建立连接之后调用
def onConnected(remoteAddress: RpcAddress): Unit = {
// By default, do nothing.
}
// 当客户端与当前节点断开连接之后调用
def onDisconnected(remoteAddress: RpcAddress): Unit = {
// By default, do nothing.
}
// 当客户端节点与当前节点之间的网络连接发生错误时调用
def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
// By default, do nothing.
}
// 在当前节点真正开始处理消息之前进行调用
def onStart(): Unit = {
// By default, do nothing.
}
// 在当前节点停止时调用,做一些收尾工作
def onStop(): Unit = {
// By default, do nothing.
}
// 停止当前 RpcEndpoint
final def stop(): Unit = {
val _self = self
if (_self != null) {
rpcEnv.stop(_self)
}
}
}
RpcEndpoint 中 receive() 和 receiveAndReply() 的默认实现只是抛出异常或回复失败,具体的消息处理逻辑由子类通过一系列模式匹配实现,前面已经说过。
RpcEndpoint 的生命周期是:
- 调用构造器;
- 调用 onStart() 方法;
- 调用 receive() 方法或者 receiveAndReply() 方法;
- 调用 onStop() 方法。
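以 1.2 节示意中假设的 EchoEndpoint 为例,生命周期各阶段分别由以下操作触发(示意代码):

```scala
val endpoint = new EchoEndpoint(rpcEnv)              // 1. 调用构造器
val ref = rpcEnv.setupEndpoint("echo", endpoint)     // 2. 注册时框架向 Inbox 投递 OnStart,触发 onStart()
ref.send(Ping(1))                                    // 3. 之后的消息进入 receive() / receiveAndReply()
rpcEnv.stop(ref)                                     // 4. 注销时框架向 Inbox 投递 OnStop,触发 onStop()
```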
# 2.3 RpcEndpointRef 源码
RpcEndpointRef 只有一个实现类 NettyRpcEndpointRef。
RpcEndpointRef 源码如下:
private[spark] abstract class RpcEndpointRef(conf: SparkConf) extends Serializable with Logging {
// RPC 请求最大重试次数,默认为 3,可以通过 spark.rpc.numRetries 配置
private[this] val maxRetries = RpcUtils.numRetries(conf)
// RPC 每次重试前需要等待的时间,默认为 3 秒,可以通过 spark.rpc.retry.wait 属性配置
private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
// RPC 执行 ask 的默认超时时间,默认为 120 秒,可以通过 spark.rpc.askTimeout 或 spark.network.timeout 配置
private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
// 当前 RpcEndpointRef 对应的 RpcEndpoint 地址,用 RpcAddress 包装
// 本质上就是 host 和 port
def address: RpcAddress
// 当前 RpcEndpointRef 对应的 RpcEndpoint 名称
def name: String
// 发送单向异步消息。即只负责发送,不关心对方是否收到,最多发送一次
def send(message: Any): Unit
// 向对应的 RpcEndpoint.receiveAndReply 发送消息,并期望在指定的超时时间内收到回复
// 只发送一次,不会重试(即最多投递一次);返回的 AbortableRpcFuture 可用于中止请求
def askAbortable[T: ClassTag](message: Any, timeout: RpcTimeout): AbortableRpcFuture[T] = {
throw new UnsupportedOperationException()
}
// 向对应的 RpcEndpoint.receiveAndReply 发送消息,并期望在指定的超时时间内收到回复
// 只发送一次,不会重试(即最多投递一次)
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
// 同上
def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)
// 向相应的 RpcEndpoint.receiveAndReply 发送消息,并在指定的超时时间内获取其结果,如果失败则抛出异常。
// 这是一个阻塞动作,可能会花费很多时间,所以不要在 RpcEndpoint 的消息循环中调用它。
// 返回的 T 为 RpcEndpoint 回复的消息
def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
val future = ask[T](message, timeout)
timeout.awaitResult(future)
}
}
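下面用一小段示意代码说明 ask 与 askSync 在使用上的区别(ref 为任意已经拿到的 RpcEndpointRef,消息内容和超时配置均为假设):

```scala
import scala.concurrent.Future
import scala.concurrent.duration._
import org.apache.spark.rpc.{RpcEndpointRef, RpcTimeout}

def askDemo(ref: RpcEndpointRef): Unit = {
  // ask:立即返回 Future,不阻塞当前线程
  val future: Future[String] = ref.ask[String]("hello", new RpcTimeout(5.seconds, "demo.timeout"))

  // askSync:内部就是 ask + awaitResult,会阻塞直到收到回复或超时抛异常,
  // 所以不要在 RpcEndpoint 的消息循环(receive/receiveAndReply)里调用它
  val reply: String = ref.askSync[String]("hello")
  println(reply)
}
```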
由于 RpcEndpointRef 的核心方法大多是抽象方法或空实现,而且其只有一个实现类 NettyRpcEndpointRef,所以接着看 NettyRpcEndpointRef 的源码:
private[netty] class NettyRpcEndpointRef(
@transient private val conf: SparkConf,
private val endpointAddress: RpcEndpointAddress,
@transient @volatile private var nettyEnv: NettyRpcEnv) extends RpcEndpointRef(conf) {
@transient @volatile var client: TransportClient = _
override def address: RpcAddress =
if (endpointAddress.rpcAddress != null) endpointAddress.rpcAddress else null
private def readObject(in: ObjectInputStream): Unit = {
in.defaultReadObject()
nettyEnv = NettyRpcEnv.currentEnv.value
client = NettyRpcEnv.currentClient.value
}
private def writeObject(out: ObjectOutputStream): Unit = {
out.defaultWriteObject()
}
override def name: String = endpointAddress.name
override def askAbortable[T: ClassTag](
message: Any, timeout: RpcTimeout): AbortableRpcFuture[T] = {
nettyEnv.askAbortable(new RequestMessage(nettyEnv.address, this, message), timeout)
}
override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
askAbortable(message, timeout).future
}
override def send(message: Any): Unit = {
require(message != null, "Message is null")
nettyEnv.send(new RequestMessage(nettyEnv.address, this, message))
}
override def toString: String = s"NettyRpcEndpointRef(${endpointAddress})"
final override def equals(that: Any): Boolean = that match {
case other: NettyRpcEndpointRef => endpointAddress == other.endpointAddress
case _ => false
}
final override def hashCode(): Int =
if (endpointAddress == null) 0 else endpointAddress.hashCode()
}
从上面的内容可以看到,NettyRpcEndpointRef 的 send()、ask() 和 askAbortable() 实际上是对 NettyRpcEnv 对应方法的代理,消息的发送能力实际上都是由 NettyRpcEnv 提供的。
# 2.4 NettyRpcEnv 源码
两个实体之间的通信都依赖于 RpcEnv 提供的基础能力。RpcEndpoint 需要用一个名称注册到 RpcEnv 中才能接收消息,之后 RpcEnv 会处理从 RpcEndpointRef 或其它远程节点发送过来的消息,并将它们投递到相应的 RpcEndpoint。对于 RpcEndpoint 处理消息时抛出的未捕获异常,RpcEnv 会通过 RpcCallContext.sendFailure 将异常发送回发送方;如果没有发送方或者异常无法序列化(NotSerializableException),则只记录日志。此外,RpcEnv 还提供了根据名称或 URI 获取 RpcEndpointRef 的方法。
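RpcEnv 抽象类对外暴露的核心能力大致如下(节选,省略了部分方法、参数默认值和注释,具体以 RpcEnv.scala 为准):

```scala
private[spark] abstract class RpcEnv(conf: SparkConf) {
  // 以指定名称注册一个 RpcEndpoint,返回其对应的 RpcEndpointRef
  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef

  // 根据地址和名称(或 URI)获取远程 RpcEndpoint 的引用
  def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef
  def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]

  // 根据 RpcEndpoint 实例反查其 RpcEndpointRef
  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef

  // 停止某个端点 / 关闭整个 RpcEnv / 阻塞等待 RpcEnv 退出
  def stop(endpoint: RpcEndpointRef): Unit
  def shutdown(): Unit
  def awaitTermination(): Unit
}
```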
由于 RpcEnv 只有一个实现类 NettyRpcEnv,所以直接看 NettyRpcEnv 源码。
private[netty] class NettyRpcEnv(
val conf: SparkConf,
javaSerializerInstance: JavaSerializerInstance,
host: String,
securityManager: SecurityManager,
numUsableCores: Int) extends RpcEnv(conf) with Logging {
val role = conf.get(EXECUTOR_ID).map { id =>
if (id == SparkContext.DRIVER_IDENTIFIER) "driver" else "executor"
}
private[netty] val transportConf = SparkTransportConf.fromSparkConf(
conf.clone.set(RPC_IO_NUM_CONNECTIONS_PER_PEER, 1),
"rpc",
conf.get(RPC_IO_THREADS).getOrElse(numUsableCores),
role)
// 分发器(消息分发层的实现)
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
private val streamManager = new NettyStreamManager(this)
private val transportContext = new TransportContext(transportConf,
new NettyRpcHandler(dispatcher, this, streamManager))
private def createClientBootstraps(): java.util.List[TransportClientBootstrap] = {
if (securityManager.isAuthenticationEnabled()) {
java.util.Arrays.asList(new AuthClientBootstrap(transportConf,
securityManager.getSaslUser(), securityManager))
} else {
java.util.Collections.emptyList[TransportClientBootstrap]
}
}
private val clientFactory = transportContext.createClientFactory(createClientBootstraps())
/**
* A separate client factory for file downloads. This avoids using the same RPC handler as
* the main RPC context, so that events caused by these clients are kept isolated from the
* main RPC traffic.
*
* It also allows for different configuration of certain properties, such as the number of
* connections per peer.
*/
@volatile private var fileDownloadFactory: TransportClientFactory = _
val timeoutScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("netty-rpc-env-timeout")
// Because TransportClientFactory.createClient is blocking, we need to run it in this thread pool
// to implement non-blocking send/ask.
// TODO: a non-blocking TransportClientFactory.createClient in future
private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
"netty-rpc-connection",
conf.get(RPC_CONNECT_THREADS))
@volatile private var server: TransportServer = _
private val stopped = new AtomicBoolean(false)
// 发件箱集合,维护了一组 RpcAddress 与 Outbox 的映射关系
private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
// 移除 Outbox
private[netty] def removeOutbox(address: RpcAddress): Unit = {
val outbox = outboxes.remove(address)
if (outbox != null) {
outbox.stop()
}
}
// 启动 Server 服务
def startServer(bindAddress: String, port: Int): Unit = {
val bootstraps: java.util.List[TransportServerBootstrap] =
if (securityManager.isAuthenticationEnabled()) {
java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
} else {
java.util.Collections.emptyList()
}
server = transportContext.createServer(bindAddress, port, bootstraps)
// RpcEndpointVerifier 用于校验指定名称的 RpcEndpoint 是否存在
dispatcher.registerRpcEndpoint(
RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
@Nullable
override lazy val address: RpcAddress = {
if (server != null) RpcAddress(host, server.getPort()) else null
}
// 注册 RpcEndpoint
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
dispatcher.registerRpcEndpoint(name, endpoint)
}
def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = {
val addr = RpcEndpointAddress(uri)
val endpointRef = new NettyRpcEndpointRef(conf, addr, this)
val verifier = new NettyRpcEndpointRef(
conf, RpcEndpointAddress(addr.rpcAddress, RpcEndpointVerifier.NAME), this)
verifier.ask[Boolean](RpcEndpointVerifier.CheckExistence(endpointRef.name)).flatMap { find =>
if (find) {
Future.successful(endpointRef)
} else {
Future.failed(new RpcEndpointNotFoundException(uri))
}
}(ThreadUtils.sameThread)
}
override def stop(endpointRef: RpcEndpointRef): Unit = {
require(endpointRef.isInstanceOf[NettyRpcEndpointRef])
dispatcher.stop(endpointRef)
}
// 将消息投递到 NettyRpcEndpointRef 对应的 Outbox 中
private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
if (receiver.client != null) {
message.sendWith(receiver.client)
} else {
require(receiver.address != null,
"Cannot send message to client endpoint with no listen address.")
val targetOutbox = {
val outbox = outboxes.get(receiver.address)
if (outbox == null) {
val newOutbox = new Outbox(this, receiver.address)
val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)
if (oldOutbox == null) {
newOutbox
} else {
oldOutbox
}
} else {
outbox
}
}
if (stopped.get) {
// It's possible that we put `targetOutbox` after stopping. So we need to clean it.
outboxes.remove(receiver.address)
targetOutbox.stop()
} else {
targetOutbox.send(message)
}
}
}
// 发送消息
private[netty] def send(message: RequestMessage): Unit = {
val remoteAddr = message.receiver.address
if (remoteAddr == address) {
// 接收方和发送方在同一个进程里,直接调用 dispatcher 投递消息,不需要走 RPC 网络传输
try {
dispatcher.postOneWayMessage(message)
} catch {
case e: RpcEnvStoppedException => logDebug(e.getMessage)
}
} else {
// 接收方和发送方不在同一个进程里,需要走网络,先投递到 Outbox
postToOutbox(message.receiver, OneWayOutboxMessage(message.serialize(this)))
}
}
private[netty] def createClient(address: RpcAddress): TransportClient = {
clientFactory.createClient(address.host, address.port)
}
private[netty] def askAbortable[T: ClassTag](
message: RequestMessage, timeout: RpcTimeout): AbortableRpcFuture[T] = {
val promise = Promise[Any]()
val remoteAddr = message.receiver.address
var rpcMsg: Option[RpcOutboxMessage] = None
def onFailure(e: Throwable): Unit = {
if (!promise.tryFailure(e)) {
e match {
case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e")
case _ => logWarning(s"Ignored failure: $e")
}
}
}
def onSuccess(reply: Any): Unit = reply match {
case RpcFailure(e) => onFailure(e)
case rpcReply =>
if (!promise.trySuccess(rpcReply)) {
logWarning(s"Ignored message: $reply")
}
}
def onAbort(t: Throwable): Unit = {
onFailure(t)
rpcMsg.foreach(_.onAbort())
}
try {
if (remoteAddr == address) {
val p = Promise[Any]()
p.future.onComplete {
case Success(response) => onSuccess(response)
case Failure(e) => onFailure(e)
}(ThreadUtils.sameThread)
dispatcher.postLocalMessage(message, p)
} else {
val rpcMessage = RpcOutboxMessage(message.serialize(this),
onFailure,
(client, response) => onSuccess(deserialize[Any](client, response)))
rpcMsg = Option(rpcMessage)
postToOutbox(message.receiver, rpcMessage)
promise.future.failed.foreach {
case _: TimeoutException => rpcMessage.onTimeout()
case _ =>
}(ThreadUtils.sameThread)
}
val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
override def run(): Unit = {
onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr} " +
s"in ${timeout.duration}"))
}
}, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
promise.future.onComplete { v =>
timeoutCancelable.cancel(true)
}(ThreadUtils.sameThread)
} catch {
case NonFatal(e) =>
onFailure(e)
}
new AbortableRpcFuture[T](
promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread),
onAbort)
}
private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {
askAbortable(message, timeout).future
}
private[netty] def serialize(content: Any): ByteBuffer = {
javaSerializerInstance.serialize(content)
}
/**
* Returns [[SerializationStream]] that forwards the serialized bytes to `out`.
*/
private[netty] def serializeStream(out: OutputStream): SerializationStream = {
javaSerializerInstance.serializeStream(out)
}
private[netty] def deserialize[T: ClassTag](client: TransportClient, bytes: ByteBuffer): T = {
NettyRpcEnv.currentClient.withValue(client) {
deserialize { () =>
javaSerializerInstance.deserialize[T](bytes)
}
}
}
override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
dispatcher.getRpcEndpointRef(endpoint)
}
override def shutdown(): Unit = {
cleanup()
}
override def awaitTermination(): Unit = {
dispatcher.awaitTermination()
}
private def cleanup(): Unit = {
if (!stopped.compareAndSet(false, true)) {
return
}
val iter = outboxes.values().iterator()
while (iter.hasNext()) {
val outbox = iter.next()
outboxes.remove(outbox.address)
outbox.stop()
}
if (timeoutScheduler != null) {
timeoutScheduler.shutdownNow()
}
if (dispatcher != null) {
dispatcher.stop()
}
if (server != null) {
server.close()
}
if (clientFactory != null) {
clientFactory.close()
}
if (clientConnectionExecutor != null) {
clientConnectionExecutor.shutdownNow()
}
if (fileDownloadFactory != null) {
fileDownloadFactory.close()
}
if (transportContext != null) {
transportContext.close()
}
}
override def deserialize[T](deserializationAction: () => T): T = {
NettyRpcEnv.currentEnv.withValue(this) {
deserializationAction()
}
}
override def fileServer: RpcEnvFileServer = streamManager
override def openChannel(uri: String): ReadableByteChannel = {
val parsedUri = new URI(uri)
require(parsedUri.getHost() != null, "Host name must be defined.")
require(parsedUri.getPort() > 0, "Port must be defined.")
require(parsedUri.getPath() != null && parsedUri.getPath().nonEmpty, "Path must be defined.")
val pipe = Pipe.open()
val source = new FileDownloadChannel(pipe.source())
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
val client = downloadClient(parsedUri.getHost(), parsedUri.getPort())
val callback = new FileDownloadCallback(pipe.sink(), source, client)
client.stream(parsedUri.getPath(), callback)
})(catchBlock = {
pipe.sink().close()
source.close()
})
source
}
private def downloadClient(host: String, port: Int): TransportClient = {
if (fileDownloadFactory == null) synchronized {
if (fileDownloadFactory == null) {
val module = "files"
val prefix = "spark.rpc.io."
val clone = conf.clone()
// Copy any RPC configuration that is not overridden in the spark.files namespace.
conf.getAll.foreach { case (key, value) =>
if (key.startsWith(prefix)) {
val opt = key.substring(prefix.length())
clone.setIfMissing(s"spark.$module.io.$opt", value)
}
}
val ioThreads = clone.getInt("spark.files.io.threads", 1)
val downloadConf = SparkTransportConf.fromSparkConf(clone, module, ioThreads)
val downloadContext = new TransportContext(downloadConf, new NoOpRpcHandler(), true)
fileDownloadFactory = downloadContext.createClientFactory(createClientBootstraps())
}
}
fileDownloadFactory.createClient(host, port)
}
private class FileDownloadChannel(source: Pipe.SourceChannel) extends ReadableByteChannel {
@volatile private var error: Throwable = _
def setError(e: Throwable): Unit = {
// This setError callback is invoked by internal RPC threads in order to propagate remote
// exceptions to application-level threads which are reading from this channel. When an
// RPC error occurs, the RPC system will call setError() and then will close the
// Pipe.SinkChannel corresponding to the other end of the `source` pipe. Closing of the pipe
// sink will cause `source.read()` operations to return EOF, unblocking the application-level
// reading thread. Thus there is no need to actually call `source.close()` here in the
// onError() callback and, in fact, calling it here would be dangerous because the close()
// would be asynchronous with respect to the read() call and could trigger race-conditions
// that lead to data corruption. See the PR for SPARK-22982 for more details on this topic.
error = e
}
override def read(dst: ByteBuffer): Int = {
Try(source.read(dst)) match {
// See the documentation above in setError(): if an RPC error has occurred then setError()
// will be called to propagate the RPC error and then `source`'s corresponding
// Pipe.SinkChannel will be closed, unblocking this read. In that case, we want to propagate
// the remote RPC exception (and not any exceptions triggered by the pipe close, such as
// ChannelClosedException), hence this `error != null` check:
case _ if error != null => throw error
case Success(bytesRead) => bytesRead
case Failure(readErr) => throw readErr
}
}
override def close(): Unit = source.close()
override def isOpen(): Boolean = source.isOpen()
}
private class FileDownloadCallback(
sink: WritableByteChannel,
source: FileDownloadChannel,
client: TransportClient) extends StreamCallback {
override def onData(streamId: String, buf: ByteBuffer): Unit = {
while (buf.remaining() > 0) {
sink.write(buf)
}
}
override def onComplete(streamId: String): Unit = {
sink.close()
}
override def onFailure(streamId: String, cause: Throwable): Unit = {
logDebug(s"Error downloading stream $streamId.", cause)
source.setError(cause)
sink.close()
}
}
}
# 2.5 Dispatcher 源码
Dispatcher 被称作消息调度器或消息分发器,主要负责将来自不同 RpcEndpointRef 的消息分发到对应的 RpcEndpoint,类似于现实生活中的邮局。Dispatcher 在 NettyRpcEnv 中被创建,代码如下:
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
Dispatcher 源码如下:
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {
// RpcEndpoint 名称与 MessageLoop 之间的映射关系
private val endpoints: ConcurrentMap[String, MessageLoop] = new ConcurrentHashMap[String, MessageLoop]
// 端点实例 RpcEndpoint 与端点引用 RpcEndpointRef 之间的映射关系
private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
private val shutdownLatch = new CountDownLatch(1)
private lazy val sharedLoop = new SharedMessageLoop(nettyEnv.conf, this, numUsableCores)
// Dispatcher 停止时为 true,一旦 Dispatcher 停止,所有发出去的消息都会被立即退回
@GuardedBy("this")
private var stopped = false
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
val addr = RpcEndpointAddress(nettyEnv.address, name)
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
synchronized {
if (stopped) {
throw new IllegalStateException("RpcEnv has been stopped")
}
if (endpoints.containsKey(name)) {
throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
}
// This must be done before assigning RpcEndpoint to MessageLoop, as MessageLoop sets Inbox be
// active when registering, and endpointRef must be put into endpointRefs before onStart is
// called.
endpointRefs.put(endpoint, endpointRef)
var messageLoop: MessageLoop = null
try {
messageLoop = endpoint match {
case e: IsolatedRpcEndpoint =>
new DedicatedMessageLoop(name, e, this)
case _ =>
sharedLoop.register(name, endpoint)
sharedLoop
}
endpoints.put(name, messageLoop)
} catch {
case NonFatal(e) =>
endpointRefs.remove(endpoint)
throw e
}
}
endpointRef
}
// 根据 RpcEndpoint 获取其对应的 RpcEndpointRef
def getRpcEndpointRef(endpoint: RpcEndpoint): RpcEndpointRef = endpointRefs.get(endpoint)
// 根据 RpcEndpoint 删除 RpcEndpoint 与 RpcEndpointRef 的映射缓存
def removeRpcEndpointRef(endpoint: RpcEndpoint): Unit = endpointRefs.remove(endpoint)
// 幂等的,多次执行的结果是一样的
private def unregisterRpcEndpoint(name: String): Unit = {
val loop = endpoints.remove(name)
if (loop != null) {
loop.unregister(name)
}
// Don't clean `endpointRefs` here because it's possible that some messages are being processed
// now and they can use `getRpcEndpointRef`. So `endpointRefs` will be cleaned in Inbox via
// `removeRpcEndpointRef`.
}
// 停止具体某个 RpcEndpoint
def stop(rpcEndpointRef: RpcEndpointRef): Unit = {
synchronized {
if (stopped) {
// This endpoint will be stopped by Dispatcher.stop() method.
return
}
unregisterRpcEndpoint(rpcEndpointRef.name)
}
}
// 向当前进程中所有注册过的 RpcEndpoint 发送消息
def postToAll(message: InboxMessage): Unit = {
val iter = endpoints.keySet().iterator()
while (iter.hasNext) {
val name = iter.next
postMessage(name, message, (e) => { e match {
case e: RpcEnvStoppedException => logDebug (s"Message $message dropped. ${e.getMessage}")
case e: Throwable => logWarning(s"Message $message dropped. ${e.getMessage}")
}}
)}
}
// 发布由 Remote RpcEndpoint 发送的消息
def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {
val rpcCallContext =
new RemoteNettyRpcCallContext(nettyEnv, callback, message.senderAddress)
val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
postMessage(message.receiver.name, rpcMessage, (e) => callback.onFailure(e))
}
// 发布由 Local RpcEndpoint 发送的消息
def postLocalMessage(message: RequestMessage, p: Promise[Any]): Unit = {
val rpcCallContext =
new LocalNettyRpcCallContext(message.senderAddress, p)
val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
postMessage(message.receiver.name, rpcMessage, (e) => p.tryFailure(e))
}
// 发布单向消息,不需要回复
def postOneWayMessage(message: RequestMessage): Unit = {
postMessage(message.receiver.name, OneWayMessage(message.senderAddress, message.content),
(e) => throw e)
}
/**
* 向指定端点发送消息
*
* @param endpointName 端点名称
* @param message 需要发送的消息
* @param callbackIfStopped 如果端点已经停止,回调此函数
*/
private def postMessage(
endpointName: String,
message: InboxMessage,
callbackIfStopped: (Exception) => Unit): Unit = {
val error = synchronized {
// 根据端点名称获取对应的消息循环
val loop = endpoints.get(endpointName)
if (stopped) {
Some(new RpcEnvStoppedException())
} else if (loop == null) {
Some(new SparkException(s"Could not find $endpointName."))
} else {
// 如果当前 Dispatcher 不是停止状态
loop.post(endpointName, message)
None
}
}
// 最后调用回调函数
error.foreach(callbackIfStopped)
}
// 停止 Dispatcher
def stop(): Unit = {
synchronized {
if (stopped) {
return
}
stopped = true
}
var stopSharedLoop = false
// 对所有 RpcEndpoint 执行取消注册,并关闭 MessageLoop
endpoints.asScala.foreach { case (name, loop) =>
unregisterRpcEndpoint(name)
if (!loop.isInstanceOf[SharedMessageLoop]) {
loop.stop()
} else {
stopSharedLoop = true
}
}
if (stopSharedLoop) {
sharedLoop.stop()
}
shutdownLatch.countDown()
}
def awaitTermination(): Unit = {
shutdownLatch.await()
}
/**
* Return if the endpoint exists
*/
def verify(name: String): Boolean = {
endpoints.containsKey(name)
}
}
Dispatcher 扮演的是类似于邮局的角色,主要负责将来自各个 RpcEndpointRef 的消息分发到对应的 RpcEndpoint。Dispatcher 可以概括如下:
- endpoints 变量维护了一组 RpcEndpoint 与 MessageLoop 的对应关系。
- endpointRefs 变量维护了一组 RpcEndpoint 与 RpcEndpointRef 之间的对应关系。
- registerRpcEndpoint 方法提供了注册 RpcEndpoint 的能力,具体就是负责向 endpoints 与 endpointRefs 中添加映射信息,并根据端点类型创建 DedicatedMessageLoop 或调用 SharedMessageLoop 的 register 方法。
- unregisterRpcEndpoint 方法提供了注销 RpcEndpoint 的能力,实际上也是调用 MessageLoop 的 unregister 方法。
- 最重要的是有一组 post* 方法(postToAll、postRemoteMessage、postLocalMessage、postOneWayMessage),负责将具体的消息发送到对应的 RpcEndpoint。它们实际上都是对 postMessage 的包装,而 postMessage 又是通过调用 MessageLoop 的 post 方法完成发送的,只是额外增加了端点不存在或 Dispatcher 已停止时的回调处理。
- Dispatcher 中的分发能力实际上依赖于 MessageLoop 实现,它注册 RpcEndpoint 以及投递消息都是对 MessageLoop 的包装,真正启动线程池执行消息处理逻辑的是 MessageLoop。
# 2.6 MessageLoop 源码
MessageLoop 是一个抽象类,源码里没有太多介绍,从功能上看就和名字一样,通过循环不断处理消息。MessageLoop 有两个实现类:SharedMessageLoop 和 DedicatedMessageLoop。两者的区别是:前者通过共享线程池为多个 RpcEndpoint 提供消息分发能力;后者为单个 RpcEndpoint 提供专属的线程(默认单线程,可以通过 IsolatedRpcEndpoint.threadCount() 增加线程数)。
MessageLoop 源码:
private sealed abstract class MessageLoop(dispatcher: Dispatcher) extends Logging {
// List of inboxes with pending messages, to be processed by the message loop.
private val active = new LinkedBlockingQueue[Inbox]()
// Message loop task; should be run in all threads of the message loop's pool.
protected val receiveLoopRunnable = new Runnable() {
override def run(): Unit = receiveLoop()
}
protected val threadpool: ExecutorService
private var stopped = false
def post(endpointName: String, message: InboxMessage): Unit
def unregister(name: String): Unit
def stop(): Unit = {
synchronized {
if (!stopped) {
setActive(MessageLoop.PoisonPill)
threadpool.shutdown()
stopped = true
}
}
threadpool.awaitTermination(Long.MaxValue, TimeUnit.MILLISECONDS)
}
protected final def setActive(inbox: Inbox): Unit = active.offer(inbox)
private def receiveLoop(): Unit = {
try {
while (true) {
try {
val inbox = active.take()
if (inbox == MessageLoop.PoisonPill) {
// Put PoisonPill back so that other threads can see it.
setActive(MessageLoop.PoisonPill)
return
}
inbox.process(dispatcher)
} catch {
case NonFatal(e) => logError(e.getMessage, e)
}
}
} catch {
case _: InterruptedException => // exit
case t: Throwable =>
try {
// Re-submit a receive task so that message delivery will still work if
// UncaughtExceptionHandler decides to not kill JVM.
threadpool.execute(receiveLoopRunnable)
} finally {
throw t
}
}
}
}
private object MessageLoop {
/** A poison inbox that indicates the message loop should stop processing messages. */
val PoisonPill = new Inbox(null, null)
}
SharedMessageLoop 源码:
private class SharedMessageLoop(
conf: SparkConf,
dispatcher: Dispatcher,
numUsableCores: Int)
extends MessageLoop(dispatcher) {
private val endpoints = new ConcurrentHashMap[String, Inbox]()
private def getNumOfThreads(conf: SparkConf): Int = {
val availableCores =
if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
val modNumThreads = conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
.getOrElse(math.max(2, availableCores))
conf.get(EXECUTOR_ID).map { id =>
val role = if (id == SparkContext.DRIVER_IDENTIFIER) "driver" else "executor"
conf.getInt(s"spark.$role.rpc.netty.dispatcher.numThreads", modNumThreads)
}.getOrElse(modNumThreads)
}
/** Thread pool used for dispatching messages. */
override protected val threadpool: ThreadPoolExecutor = {
val numThreads = getNumOfThreads(conf)
val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
for (i <- 0 until numThreads) {
pool.execute(receiveLoopRunnable)
}
pool
}
override def post(endpointName: String, message: InboxMessage): Unit = {
val inbox = endpoints.get(endpointName)
inbox.post(message)
setActive(inbox)
}
override def unregister(name: String): Unit = {
val inbox = endpoints.remove(name)
if (inbox != null) {
inbox.stop()
// Mark active to handle the OnStop message.
setActive(inbox)
}
}
def register(name: String, endpoint: RpcEndpoint): Unit = {
val inbox = new Inbox(name, endpoint)
endpoints.put(name, inbox)
// Mark active to handle the OnStart message.
setActive(inbox)
}
}
DedicatedMessageLoop 源码:
private class DedicatedMessageLoop(
name: String,
endpoint: IsolatedRpcEndpoint,
dispatcher: Dispatcher)
extends MessageLoop(dispatcher) {
private val inbox = new Inbox(name, endpoint)
override protected val threadpool = if (endpoint.threadCount() > 1) {
ThreadUtils.newDaemonCachedThreadPool(s"dispatcher-$name", endpoint.threadCount())
} else {
ThreadUtils.newDaemonSingleThreadExecutor(s"dispatcher-$name")
}
(1 to endpoint.threadCount()).foreach { _ =>
threadpool.submit(receiveLoopRunnable)
}
// Mark active to handle the OnStart message.
setActive(inbox)
override def post(endpointName: String, message: InboxMessage): Unit = {
require(endpointName == name)
inbox.post(message)
setActive(inbox)
}
override def unregister(endpointName: String): Unit = synchronized {
require(endpointName == name)
inbox.stop()
// Mark active to handle the OnStop message.
setActive(inbox)
setActive(MessageLoop.PoisonPill)
threadpool.shutdown()
}
}