Spark Streaming Source Code Exploration (2)

Time: 2023-02-15 20:47:17

Spark Streaming Source Code Exploration (1) covered creating a DStream with the receiver-based approach and briefly analyzed the start function of StreamingContext. This section continues from the previous one, starting from the scheduler.start() call (the JobScheduler) inside the StreamingContext#start method.

A quick review of the StreamingContext#start source:

  /**
   * Start the execution of the streams.
   *
   * @throws IllegalStateException if the StreamingContext is already stopped.
   */
  def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        startSite.set(DStream.getCreationSite())
        StreamingContext.ACTIVATION_LOCK.synchronized {
          StreamingContext.assertNoOtherContextIsActive()
          try {
            validate()

            // Start the streaming scheduler in a new thread, so that thread local properties
            // like call sites and job groups can be reset without affecting those of the
            // current thread.
            ThreadUtils.runInNewThread("streaming-start") {
              sparkContext.setCallSite(startSite.get)
              sparkContext.clearJobGroup()
              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
              savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
              // TODO Entry point of interest: start the JobScheduler
              scheduler.start()
            }

            ... // remaining code omitted
          }
        }
  }

The key point of this code is the start of the JobScheduler. JobScheduler#start performs the necessary initialization work, as shown below.


Next, let's look at the code of org.apache.spark.streaming.scheduler.JobScheduler#start:

  /**
   * org.apache.spark.streaming.scheduler.JobScheduler#start()
   */
  def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      // TODO Key method of the JobScheduler: handles job start/completion notifications and error reporting
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start()
    // TODO Create the ReceiverTracker
    receiverTracker = new ReceiverTracker(ssc)
    // TODO Tracks statistics of the input data, used for UI display and monitoring
    inputInfoTracker = new InputInfoTracker(ssc)

    val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
      case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
      case _ => null
    }

    executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
      executorAllocClient,
      receiverTracker,
      ssc.conf,
      ssc.graph.batchDuration.milliseconds,
      clock)
    executorAllocationManager.foreach(ssc.addStreamingListener)

    // TODO Key step: start the ReceiverTracker (creates the ReceiverSupervisors and starts the Receivers)
    receiverTracker.start() // start() checks whether the input sources are receiver-based before launching any Receiver

    // TODO Start the JobGenerator
    jobGenerator.start()

    executorAllocationManager.foreach(_.start())
    logInfo("Started JobScheduler")
  } // TODO End of JobScheduler#start
The key points of this code are as follows:

1: Start an event loop. This event loop handles job-start notifications (mainly used for UI updates); it does not itself launch jobs. A job is actually started by calling its run method directly. The event loop also handles job-completion notifications and error reporting; see the sketch below.
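
The actual launch happens in JobScheduler#submitJobSet, where a JobHandler runs job.run() on the job executor thread pool and posts JobStarted/JobCompleted events back to this loop. As a simplified excerpt from the Spark source (details vary across Spark versions), the processEvent handler behind the loop looks roughly like this:

  private def processEvent(event: JobSchedulerEvent): Unit = {
    try {
      event match {
        // A batch job has started running on the job executor thread pool
        case JobStarted(job, startTime) => handleJobStart(job, startTime)
        // A batch job has finished; update listeners/trackers and clean up
        case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
        // An error was reported from somewhere in the scheduler
        case ErrorReported(m, e) => handleError(m, e)
      }
    } catch {
      case e: Throwable =>
        reportError("Error in job scheduler", e)
    }
  }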

2: Create and start the ReceiverTracker. For receiver-based input sources, the ReceiverTracker is responsible for launching the ReceiverSupervisors and Receivers on the executors. For non-receiver-based sources, the ReceiverTracker is still started, but no ReceiverSupervisor or Receiver is launched on the executors (whether org.apache.spark.streaming.DStreamGraph#getReceiverInputStreams is empty decides whether Receivers are launched; see the excerpt below).
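
In ReceiverTracker#start this check appears roughly as follows (simplified excerpt; the exact code differs slightly between Spark versions, and receiverInputStreams is initialized from ssc.graph.getReceiverInputStreams()):

  /** Start the endpoint and receiver execution thread. */
  def start(): Unit = synchronized {
    if (isTrackerStarted) {
      throw new SparkException("ReceiverTracker already started")
    }

    // If there are no receiver-based input streams, nothing is launched on the executors
    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
      if (!skipReceiverLaunch) launchReceivers()
      logInfo("ReceiverTracker started")
      trackerState = Started
    }
  }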

3: Create the InputInfoTracker, which tracks the input data sources on the driver side. The figure below shows the relationship among the three: the StreamingContext holds a reference to the JobScheduler, and the JobScheduler holds a reference to the InputInfoTracker.

[Figure: StreamingContext → JobScheduler → InputInfoTracker reference chain]

Every time ReceiverInputDStream#compute (receiver-based mode) generates an RDD, it reports the RDD's input information to the inputInfoTracker. Here is the compute method of ReceiverInputDStream:

  /**
   * Generates RDDs with blocks received by the receiver of this stream.
   */
  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        // TODO receiverTracker is the driver-side receiver tracker
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        // Report the input source information to the inputInfoTracker
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }
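
On the driver side, the InputInfoTracker is essentially a per-batch map from input stream id to StreamInputInfo. The following is a rough sketch of its reportInfo and getInfo methods, simplified from the Spark source (details may differ by version):

  // Map tracking all the StreamInputInfo for a given batch time and input stream id
  private val batchTimeToInputInfos =
    new mutable.HashMap[Time, mutable.HashMap[Int, StreamInputInfo]]

  /** Report the input info for the given batch; called from compute() of the input DStreams. */
  def reportInfo(batchTime: Time, inputInfo: StreamInputInfo): Unit = synchronized {
    val inputInfos = batchTimeToInputInfos.getOrElseUpdate(
      batchTime, new mutable.HashMap[Int, StreamInputInfo]())
    if (inputInfos.contains(inputInfo.inputStreamId)) {
      throw new IllegalStateException(s"Input stream ${inputInfo.inputStreamId} for batch " +
        s"$batchTime is already added into InputInfoTracker, this is an illegal state")
    }
    inputInfos += ((inputInfo.inputStreamId, inputInfo))
  }

  /** Get all the input streams' information for the specified batch time. */
  def getInfo(batchTime: Time): Map[Int, StreamInputInfo] = synchronized {
    batchTimeToInputInfos.get(batchTime).map(_.toMap).getOrElse(Map[Int, StreamInputInfo]())
  }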

4: Start the JobGenerator. The JobGenerator holds a reference to the JobScheduler:

[Figure: JobGenerator holds a reference to JobScheduler]

Therefore, when org.apache.spark.streaming.scheduler.JobGenerator#generateJobs generates the jobs for a batch, it can easily obtain the input source information directly via jobScheduler.inputInfoTracker.getInfo(time).
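
For reference, generateJobs looks roughly like this (simplified excerpt from the Spark source; the exact code varies between versions):

  /** Generate jobs and perform checkpointing for the given `time`. */
  private def generateJobs(time: Time): Unit = {
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
      // allocate the received blocks to this batch
      jobScheduler.receiverTracker.allocateBlocksToBatch(time)
      // generate jobs for this batch using the allocated blocks
      graph.generateJobs(time)
    } match {
      case Success(jobs) =>
        // fetch the input info reported by the input DStreams for this batch
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }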