Spark2.1.0——深入理解事件总线

 

概览

  Spark程序在运行的过程中,Driver端的很多功能都依赖于事件的传递和处理,而事件总线在这中间发挥着至关重要的纽带作用。事件总线通过异步线程,提高了Driver执行的效率。

       Spark定义了一个特质[1]ListenerBus,可以接收事件并且将事件提交到对应事件的监听器。为了对ListenerBus有个直观的理解,我们先来看看它的代码实现,见代码清单1。

代码清单1        ListenerBus的定义

复制代码
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {    private[spark] val listeners = new CopyOnWriteArrayList[L]    final def addListener(listener: L): Unit = {     listeners.add(listener)   }    final def removeListener(listener: L): Unit = {     listeners.remove(listener)   }    final def postToAll(event: E): Unit = {     val iter = listeners.iterator     while (iter.hasNext) {       val listener = iter.next()       try {         doPostEvent(listener, event)       } catch {         case NonFatal(e) =>           logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)       }     }   }    protected def doPostEvent(listener: L, event: E): Unit    private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {     val c = implicitly[ClassTag[T]].runtimeClass     listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq   }  }
复制代码

代码清单1中展示了ListenerBus是个泛型特质,其泛型参数为 [L <: AnyRef, E],其中L是代表监听器的泛型参数,可以看到ListenerBus支持任何类型的监听器,E是代表事件的泛型参数。ListenerBus中各个成员的作用如下:

  • listeners:用于维护所有注册的监听器,其数据结构为CopyOnWriteArrayList[L];
  • addListener:向listeners中添加监听器的方法,由于listeners采用CopyOnWriteArrayList来实现,所以addListener方法是线程安全的;
  • removeListener:从listeners中移除监听器的方法,由于listeners采用CopyOnWriteArrayList来实现,所以removeListener方法是线程安全的;
  • postToAll:此方法的作用是将事件投递给所有的监听器。虽然CopyOnWriteArrayList本身是线程的安全的,但是由于postToAll方法内部引入了“先检查后执行”的逻辑,因而postToAll方法不是线程安全的,所以所有对postToAll方法的调用应当保证在同一个线程中;
  • doPostEvent:用于将事件投递给指定的监听器,此方法只提供了接口定义,具体实现需要子类提供;
  • findListenersByClass:查找与指定类型相同的监听器列表。

下面将分别对以下内容进行介绍:

  1. ListenerBus的继承体系
  2. SparkListenerBus详解
  3. LiveListenerBus详解

[1] 特质是Scala语言中提供真正的多重继承的语法特性,类似于Java的Interface,但是又可以实现方法。有关Scala特质的更多介绍请访问Scala官网http://www.scala-lang.org。

ListenerBus的继承体系

理解了ListenerBus的定义后,本小节一起来看看有哪些类继承了它。ListenerBus的类继承体系如图1所示。

图1  ListenerBus的类继承体系

从图1中可以看到有三种ListenerBus的具体实现,分别为:

  • SparkListenerBus:用于将SparkListenerEvent类型的事件投递到SparkListenerInterface类型的监听器;
  • StreamingQueryListenerBus:用于将StreamingQueryListener.Event类型的事件投递到StreamingQueryListener类型的监听器,此外还会将StreamingQueryListener.Event类型的事件交给SparkListenerBus;
  • StreamingListenerBus:用于将StreamingListenerEvent类型的事件投递到StreamingListener类型的监听器,此外还会将StreamingListenerEvent类型的事件交给SparkListenerBus。

SparkListenerBus也有两种实现:

  • LiveListenerBus:采用异步线程将SparkListenerEvent类型的事件投递到SparkListener类型的监听器;
  • ReplayListenerBus:用于从序列化的事件数据中重播事件。

有了对事件总线的这些介绍,读者已经在宏观上对其有所认识。但是如果没有具体的实现,ListenerBus本身也无法发挥作用。下一小节我们将选择对SparkListenerBus从更加微观的角度说明如何使用事件总线。

SparkListenerBus详解

  有了上一节对ListenerBus类继承体系的介绍,本小节将详细介绍SparkListenerBus的实现,见代码清单2。

代码清单2         SparkListenerBus的实现

复制代码
private[spark] trait SparkListenerBus   extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {    protected override def doPostEvent(       listener: SparkListenerInterface,       event: SparkListenerEvent): Unit = {     event match {       case stageSubmitted: SparkListenerStageSubmitted =>         listener.onStageSubmitted(stageSubmitted)       

                    
                
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信