Spark事件监听总线流程分析-Spark商业环境实战

作者:jcmp      发布时间:2021-05-05      浏览量:0
Spark商业环境实战及调优进阶系列1.

Spark商业环境实战及调优进阶系列

1. Spark事件监听总线流程分析

1.1 Spark事件监听总线流程分析

通过在SparkContext初始化时把日志监听器EventLoggingListener注册到LiveListenerBus事件总线上,并启动LiveListenerBus内部的Thread线程,监听提交到总线上的事件,调用SparkListenerBus的eventQueue.poll -->postToAll(event) --> doPostEvent方法,并进行事件匹配后处理,如:EventLoggingListener执行StageSubmited提交。

1.2 Spark UI 事件监听总线流程分析

Spark UI的可视化展示,是有不同的监听器实现的,他们都分别注册在LiveListenerBus上,如下面SparkContext的初始化片段:

if (conf.getBoolean("spark.ui.enabled", true)) { Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener, _env.securityManager, appName, startTime = startTime)) } else { // For tests, do not enable the UI None }

下面片段展示的是SparkUI.createLiveUI方法,可以看到监听器的注册,通过事件的投递(如:DAGScheduler ,DriverEndpoint等),从而实现UI的数据展示:

val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { val listener = new JobProgressListener(conf) listenerBus.addListener(listener) listener}val environmentListener = new EnvironmentListenerval storageStatusListener = new StorageStatusListener(conf)val executorsListener = new ExecutorsListener(storageStatusListener, conf)val storageListener = new StorageListener(storageStatusListener)val operationGraphListener = new RDDOperationGraphListener(conf)listenerBus.addListener(environmentListener)listenerBus.addListener(storageStatusListener)listenerBus.addListener(executorsListener)listenerBus.addListener(storageListener)listenerBus.addListener(operationGraphListener)。

3 结语

秦凯新 于深圳 2018-10-28