Spark UI详解

分类: Spark源码 |
在大型分布式系统中,采用事件监听机制是最常见的。如果Spark UI采用Scala的函数调用方式,由于函数调用多数情况下是同步调用,导致线程被阻塞。将函数调用更换为发送事件,事件的处理时异步的,当前线程可以继续执行后续逻辑,线程池中的线程还可以被重用,这样整个系统的并发度会大大增加。发送的事件会存入缓存,由定时调度器取出后,分配给监听此事件的监听器对监控数据进行更新。
http://s7/mw690/002RSgYjgy71QWaDpI2b6&690UI详解" TITLE="Spark
DAGScheduler是主要的产生各类SparkListenerEvent的源头,它将各种SparkListenerEvent发送到ListenerBus的事件队列中,ListenerBus通过定时器将SparkListenerEvent事件匹配到具体的SparkListener,改变SparkListener中的统计监控数据,最终由SparkUI的界面展示。
listenerBus详解
listenerBus的类型是LiveListenerBus。LiveListenerBus实现了监听器模型,通过监听事件触发对各种监听器监听状态信息的修改,达到UI界面的数据刷新效果。LiveListenerBus继承自AsynchronousListenerBus,AsynchronousListenerBus由如下部分组成:
·事件阻塞队列:LinkedBlockingQueue[SparkListenerEvent],固定大小是10000;
·监听器数组:类型为ArrayBuffer[SparkListener],存放各类监听器SparkListener;
·事件匹配监听器的线程:此Thread不断拉取LinkedBlockingQueue中的事件,遍历监听器,调用监听器的方法。任何事件都会在LinkedBlockingQueue中存在一段时间,然后Thread处理了此事件后,会将其清除。
构造JobProgressListener
以JobProgressListener为例讲解SparkListener。通过监听ListenerBus中的事件更新任务进度。SparkStatusTracker和SparkUI实际上也是通过JobProgressListener来实现任务状态跟踪的。
JobProgressListener实现了onJobStart、onJobEnd、onStageCompleted、onStageSubmitted、onTaskStart、onTaskEnd等方法,这些方法是在listenerBus的驱动下,改变JobProgressListener中的各种Job、Stage相关的数据。
SparkUI的创建与初始化
private def
create(
}
在create方法里除了JobprogressListener是外部传入的之外,又增加了一些SparkListener。例如用于对JVM参数、Spark属性、Java系统属性、classpath等进行监控的EnvironmentListener;用于维护Executor的存储状态的StorageStatusListener;用于准备将Executor的信息展示在ExecutorTab的ExecutorListener;用于准备将Executor相关存储信息展示在BlockManagerUI的storageListener等。最后,创建SparkUI,SparkUI服务默认是可以杀掉 的,设置spark.ui.killEnabled为false可以保证不被杀死。
Initialize方法会组织前端页面各个Tab和Page的展示及布局。
SparkUI的页面布局与展示
JobsTab展示所有Job的进度,状态信息,以它为例说明。JobsTab会复用SparkUI的killEnabled,SparkContext,jobProgressListener,包括AllJobsPage和JobPage两个页面。
private[ui]
class
JobsTab(parent: SparkUI)
extends
SparkUITab(parent,
"jobs")
{
}
AllJobsPage由render方法渲染,利用JobProgressListener中的统计监控数据生成激活、完成、失败等状态的Job摘要信息,并调用jobsTable方法生成表格等html元素,最终使用UIUtils的headerSparkPage封装好css、js、header及页面布局等。
上面的attachPage方法存在于JobsTab的父类WebUITab中,WebUITab维护有ArrayBuffer[WebUIPage]的数据结构,AllJobsPage和JobPage将被放入此ArrayBuffer中。
private[spark]
abstract class
WebUITab(parent: WebUI,
val
prefix:
String)
{
}
JobsTab创建之后,被attachTab方法加入SparkUI的ArrayBuffer[WebUITab]中,并且通过attachPage方法,将每个page生成org.apache.jetty.servlet.ServletContextHandler,最后调用attachHandler方法将ServletContextHandler绑定到SparkUI。
def
attachTab(tab:
WebUITab) {
}
def
attachPage(page:
WebUIPage) {
}
def
attachHandler(handler:
ServletContextHandler) {
}
SparkUI的启动
SparkUI创建好后,需要调用父类WebUI的bind方法,绑定服务和端口。
def
bind()
{
}
startJettyServer是JettyUtils的静态方法,最终启动Jetty提供的服务。