概览
Spark程序在运行的过程中,Driver端的很多功能都依赖于事件的传递和处理,而事件总线在这中间发挥着至关重要的纽带作用。事件总线通过异步线程,提高了Driver执行的效率。
Spark定义了一个特质[1]ListenerBus,可以接收事件并且将事件提交到对应事件的监听器。为了对ListenerBus有个直观的理解,我们先来看看它的代码实现,见代码清单1。
代码清单1 ListenerBus的定义
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { private[spark] val listeners = new CopyOnWriteArrayList[L] final def addListener(listener: L): Unit = { listeners.add(listener) } final def removeListener(listener: L): Unit = { listeners.remove(listener) } final def postToAll(event: E): Unit = { val iter = listeners.iterator while (iter.hasNext) { val listener = iter.next() try { doPostEvent(listener, event) } catch { case NonFatal(e) => logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e) } } } protected def doPostEvent(listener: L, event: E): Unit private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = { val c = implicitly[ClassTag[T]].runtimeClass listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq } }
代码清单1中展示了ListenerBus是个泛型特质,其泛型参数为 [L <: AnyRef, E],其中L是代表监听器的泛型参数,可以看到ListenerBus支持任何类型的监听器,E是代表事件的泛型参数。ListenerBus中各个成员的作用如下:
- listeners:用于维护所有注册的监听器,其数据结构为CopyOnWriteArrayList[L];
- addListener:向listeners中添加监听器的方法,由于listeners采用CopyOnWriteArrayList来实现,所以addListener方法是线程安全的;
- removeListener:从listeners中移除监听器的方法,由于listeners采用CopyOnWriteArrayList来实现,所以removeListener方法是线程安全的;
- postToAll:此方法的作用是将事件投递给所有的监听器。虽然CopyOnWriteArrayList本身是线程的安全的,但是由于postToAll方法内部引入了“先检查后执行”的逻辑,因而postToAll方法不是线程安全的,所以所有对postToAll方法的调用应当保证在同一个线程中;
- doPostEvent:用于将事件投递给指定的监听器,此方法只提供了接口定义,具体实现需要子类提供;
- findListenersByClass:查找与指定类型相同的监听器列表。
下面将分别对以下内容进行介绍:
- ListenerBus的继承体系
- SparkListenerBus详解
- LiveListenerBus详解
[1] 特质是Scala语言中提供真正的多重继承的语法特性,类似于Java的Interface,但是又可以实现方法。有关Scala特质的更多介绍请访问Scala官网http://www.scala-lang.org。
ListenerBus的继承体系
理解了ListenerBus的定义后,本小节一起来看看有哪些类继承了它。ListenerBus的类继承体系如图1所示。

图1 ListenerBus的类继承体系
从图1中可以看到有三种ListenerBus的具体实现,分别为:
- SparkListenerBus:用于将SparkListenerEvent类型的事件投递到SparkListenerInterface类型的监听器;
- StreamingQueryListenerBus:用于将StreamingQueryListener.Event类型的事件投递到StreamingQueryListener类型的监听器,此外还会将StreamingQueryListener.Event类型的事件交给SparkListenerBus;
- StreamingListenerBus:用于将StreamingListenerEvent类型的事件投递到StreamingListener类型的监听器,此外还会将StreamingListenerEvent类型的事件交给SparkListenerBus。
SparkListenerBus也有两种实现:
- LiveListenerBus:采用异步线程将SparkListenerEvent类型的事件投递到SparkListener类型的监听器;
- ReplayListenerBus:用于从序列化的事件数据中重播事件。
有了对事件总线的这些介绍,读者已经在宏观上对其有所认识。但是如果没有具体的实现,ListenerBus本身也无法发挥作用。下一小节我们将选择对SparkListenerBus从更加微观的角度说明如何使用事件总线。
SparkListenerBus详解
有了上一节对ListenerBus类继承体系的介绍,本小节将详细介绍SparkListenerBus的实现,见代码清单2。
代码清单2 SparkListenerBus的实现
private[spark] trait SparkListenerBus extends ListenerBus[SparkListenerInterface, SparkListenerEvent] { protected override def doPostEvent( listener: SparkListenerInterface, event: SparkListenerEvent): Unit = { event match { case stageSubmitted: SparkListenerStageSubmitted => listener.onStageSubmitted(stageSubmitted)

