第8课:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考

时间:2022-07-22 20:48:38

  DStream是RDD的模板,每隔一个batchInterval会根据DStream模板生成一个对应的RDD。然后将RDD存储到DStream中的generatedRDDs数据结构中:

// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()


下面我们以一个实际的例子来说明DStream中RDD的生成的全周期。

val lines = ssc.socketTextStream("localhost", 9999)val words = lines.flatMap(_.split(" "))                    val pairs = words.map(word => (word, 1))           val wordCounts = pairs.reduceByKey(_ + _)        wordCounts.print()

代码可以‘翻译’成如下代码:

val lines = new SocketInputDStream("localhost", 9999)   // 类型是 SocketInputDStreamval words = new FlatMappedDStream(lines, _.split(" "))  // 类型是 FlatMappedDStreamval pairs = new MappedDStream(words, word => (word, 1)) // 类型是 MappedDStreamval wordCounts = new ShuffledDStream(pairs, _ + _)      // 类型是 ShuffledDStreamnew ForeachDStream(wordCounts, cnt => cnt.print())      // 类型是 ForeachDStream


我们先看看DStream的print方法:

def print(num: Int): Unit = ssc.withScope {  def foreachFunc: (RDD[T], Time) => Unit = {    (rdd: RDD[T], time: Time) => {      val firstNum = rdd.take(num + 1)      // scalastyle:off println      println("-------------------------------------------")      println("Time: " + time)      println("-------------------------------------------")      firstNum.take(num).foreach(println)      if (firstNum.length > num) println("...")      println()      // scalastyle:on println    }  }  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)}

首先定义了一个函数,该函数用来从RDD中取出前几条数据,并打印出结果与时间等。后面会调用foreachRDD函数。

private def foreachRDD(    foreachFunc: (RDD[T], Time) => Unit,    displayInnerRDDOps: Boolean): Unit = {  new ForEachDStream(this,    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()}

在foreachRDD中new出了一个ForEachDStream对象。并将这个注册给DStreamGraph 。

ForEachDStream对象也就是DStreamGraph中的outputStreams。

private[streaming] def register(): DStream[T] = {  ssc.graph.addOutputStream(this)  this}def addOutputStream(outputStream: DStream[_]) {  this.synchronized {    outputStream.setGraph(this)    outputStreams += outputStream  }}


当到达batchInterval的时间后,会调用DStreamGraph中的generateJobs方法

def generateJobs(time: Time): Seq[Job] = {  logDebug("Generating jobs for time " + time)  val jobs = this.synchronized {    outputStreams.flatMap { outputStream =>      val jobOption = outputStream.generateJob(time)      jobOption.foreach(_.setCallSite(outputStream.creationSite))      jobOption    }  }  logDebug("Generated " + jobs.length + " jobs for time " + time)  jobs}


然后就调用了outputStream也就是ForEachDStream的generateJob(time)方法:

override def generateJob(time: Time): Option[Job] = {  parent.getOrCompute(time) match {    case Some(rdd) =>      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {        foreachFunc(rdd, time)      }      Some(new Job(time, jobFunc))    case None => None  }}

从这个方法开始,一直向DStream的依赖关系追溯上去。到最初的DStream,然后生成新的RDD,并将RDD写入generatedRDDs中。过程如下图:

第8课:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考


备注:

1、DT大数据梦工厂微信公众号DT_Spark 
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains


本文出自 “叮咚” 博客,请务必保留此出处http://lqding.blog.51cto.com/9123978/1773398