Spark Streaming源码初探 (3)

时间:2021-11-29 23:49:19

本节分析Spark Streaming生成RDD的过程(也就是生成Job的过程)。DStream是Spark Streaming对数据的抽象表示,其底层由RDD实现。由于RDD是为Job服务的,触发RDD生成的职责自然由JobGenerator负责。换句话说:RDD是在生成Job的过程中生成的,所以分析Job的生成过程,也就是分析RDD的生成过程。


 /**
   * Timer that generates jobs: it fires once per batch interval
   * (`ssc.graph.batchDuration`) and posts a `GenerateJobs` event carrying that batch time
   * to the JobGenerator's event loop.
   */
  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")


  /**
   * Start generation of jobs.
   *
   * Creates and starts the JobGenerator's event loop, then either restarts from an existing
   * checkpoint or starts the generator for the first time. Calling it again after the event
   * loop exists is a no-op.
   */
  def start(): Unit = synchronized {
    if (eventLoop != null) return // generator has already been started

    // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
    // See SPARK-10125
    checkpointWriter

    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
      // onReceive is invoked from the run() method of the EventLoop's event thread.
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = {
        jobScheduler.reportError("Error in job generator", e)
      }
    }
    // Start the event loop (its own thread starts consuming posted events).
    eventLoop.start()

    if (ssc.isCheckpointPresent) {
      restart()
    } else {
      // Starts the timer that periodically posts GenerateJobs events.
      startFirstTime()
    }
  }


  /**
   * Starts the generator for the first time.
   *
   * Takes the first batch time from the timer, starts the DStream graph at
   * `startTime - graph.batchDuration`, and then starts the timer so that it begins posting
   * `GenerateJobs` events.
   */
  private def startFirstTime(): Unit = {
    val startTime = new Time(timer.getStartTime())
    graph.start(startTime - graph.batchDuration)
    timer.start(startTime.milliseconds)
    logInfo("Started JobGenerator at " + startTime)
  }

通过上面timer的启动,便实现了 `eventLoop.post(GenerateJobs(new Time(longTime)))` 的定时执行:每经过一个批次间隔,timer就向事件循环投递一个GenerateJobs事件。接下来看一下事件循环处理器EventLoop的实现:


  /**
 * An event loop to receive events from the caller and process all events in the event thread. It
 * will start an exclusive event thread to process all events.
 *
 * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
 * handle events in time to avoid the potential OOM.
 *
 * A single internal thread is responsible for processing the messages.
 */
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  // Unbounded queue that post() writes to and the event thread takes from.
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          // Blocks until an event is available.
          val event = eventQueue.take()
          try {
            // Once an event is available, hand it to onReceive for processing.
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

  /**
   * Starts the event thread.
   *
   * @throws IllegalStateException if this loop has already been stopped
   */
  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

  /**
   * Stops the event thread: interrupts it, waits for it to exit and then calls `onStop`.
   * Only the first call has an effect.
   */
  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop
        onStopCalled = true
        onStop()
      } catch {
        case ie: InterruptedException =>
          Thread.currentThread().interrupt()
          if (!onStopCalled) {
            // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
            // it's already called.
            onStop()
          }
      }
    } else {
      // Keep quiet to allow calling `stop` multiple times.
    }
  }

  /**
   * Put the event into the event queue. The event thread will process it later.
   */
  def post(event: E): Unit = {
    eventQueue.put(event)
  }

  /**
   * Return if the event thread has already been started but not yet stopped.
   */
  def isActive: Boolean = eventThread.isAlive

  /**
   * Invoked when `start()` is called but before the event thread starts.
   */
  protected def onStart(): Unit = {}

  /**
   * Invoked when `stop()` is called and the event thread exits.
   */
  protected def onStop(): Unit = {}

  /**
   * Invoked in the event thread when polling events from the event queue.
   *
   * Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be blocked
   * and cannot process events in time. If you want to call some blocking actions, run them in
   * another thread.
   */
  protected def onReceive(event: E): Unit

  /**
   * Invoked if `onReceive` throws any non fatal error. Any non fatal error thrown from `onError`
   * will be ignored.
   */
  protected def onError(e: Throwable): Unit
}



// The JobGenerator's event loop: every event taken from the queue is dispatched to
// processEvent on the EventLoop's event thread.
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  // onReceive is invoked from the run() method of the EventLoop's event thread.
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

  // Non-fatal errors thrown by onReceive are reported to the JobScheduler.
  override protected def onError(e: Throwable): Unit = {
    jobScheduler.reportError("Error in job generator", e)
  }
}


  /**
   * Processes all events posted to the JobGenerator's event loop, dispatching each
   * `JobGeneratorEvent` to its handler. Called via `onReceive`, i.e. on the event thread.
   */
  private def processEvent(event: JobGeneratorEvent): Unit = {
    logDebug("Got event " + event)
    event match {
      // Generate the jobs for one batch time (posted periodically by the timer)
      case GenerateJobs(time) => generateJobs(time)
      case ClearMetadata(time) => clearMetadata(time)
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      case ClearCheckpointData(time) => clearCheckpointData(time)
    }
  }


/**
   * Generate jobs and perform checkpointing for the given `time`.
   *
   * Allocates received blocks to the batch, asks the DStreamGraph to generate the batch's jobs
   * (this is where the batch's RDDs get created), submits them as a JobSet (or reports the
   * failure), and finally posts a DoCheckpoint event.
   */
  private def generateJobs(time: Time): Unit = {
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      // Build the job collection from the DStreamGraph
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        // Input-source information recorded for this batch
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        // Submit the jobs
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    // A checkpoint is requested after every generateJobs call, whether or not generation succeeded.
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }

    /**
   * Generates the jobs for batch `time`, asking every registered output stream for its job.
   *
   * @param time the batch time
   * @return the generated jobs; output streams that produce no job contribute nothing
   */
  def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      outputStreams.flatMap { outputStream =>
        // DStream.generateJob calls getOrCompute, which calls compute() to build the RDD
        // (see org.apache.spark.streaming.dstream.DStream.generateJob).
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }


    /**
   * Generate a SparkStreaming job for the given time. This is an internal method that
   * should not be called directly. This default implementation creates a job
   * that materializes the corresponding RDD. Subclasses of DStream may override this
   * to generate their own jobs.
   *
   * @param time the batch time
   * @return Some(job) running `sparkContext.runJob` on the batch's RDD, or None when no RDD
   *         exists for `time`
   */
  private[streaming] def generateJob(time: Time): Option[Job] = {
    // Get the RDD for the current batch time (cached or freshly computed)
    getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          // Same as a plain Spark RDD: the job is ultimately run through sparkContext.runJob
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))

      case None => None
    }
  }


    /**
   * Get the RDD corresponding to the given time; either retrieve it from cache or
   * compute-and-cache it.
   *
   * If an RDD was already generated for `time` it is returned. Otherwise, when `time` is valid
   * for this DStream, `compute(time)` is called; the new RDD is persisted and/or marked for
   * checkpointing as configured, and stored in `generatedRDDs`. Invalid times yield None.
   */
  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If RDD was already generated, then retrieve it from HashMap,
    // or else compute the RDD
    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.
      if (isTimeValid(time)) {

        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details. We need to have this call here because
          // compute() might cause Spark jobs to be launched.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            // Key call: the concrete DStream subclass builds its RDD in compute()
            compute(time)
          }
        }

        rddOption.foreach { newRDD =>
          // Register the generated RDD for caching and checkpointing
          if (storageLevel != StorageLevel.NONE) {
            newRDD.persist(storageLevel)
            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
          }
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()
            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
          }
          generatedRDDs.put(time, newRDD)
        }
        rddOption
      } else {
        None
      }
    }
  }


    /**
   * Method that generates an RDD for the given time.
   *
   * @param validTime the batch time
   * @return Option[KafkaRDD[K, V]] covering, for each partition, the offsets from
   *         `currentOffsets` up to the clamped latest offset
   */
  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {

    // Key logic: determines this batch's offset interval, including rate control via clamp
    val untilOffsets = clamp(latestOffsets())
    // An OffsetRange holds: topic, partition, from-offset and until-offset
    val offsetRanges = untilOffsets.map { case (tp, uo) =>
      // fo: where the previous batch stopped (currentOffsets is advanced to untilOffsets below)
      val fo = currentOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo)
    }

    // The third KafkaRDD constructor argument is the important one: the offset ranges that
    // define which data of each Kafka partition belongs to this RDD.
    val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
      getPreferredHosts, true)

    // Report the record number and metadata of this batch interval to InputInfoTracker.
    val description = offsetRanges.filter { offsetRange =>
      // Don't display empty ranges.
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")
    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map(
      "offsets" -> offsetRanges.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    // InputInfoTracker is reached through the driver-side scheduler and tracks per-batch input
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
    currentOffsets = untilOffsets
