A DStream is a template for RDDs: every batchInterval, a new RDD is generated from the DStream template and then stored in the DStream's generatedRDDs data structure:
// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
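This map is read and populated by DStream's getOrCompute method. Here is a simplified sketch of it (persist, checkpoint, and local-property handling trimmed; see Spark's DStream.scala for the full version):

private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // Return the cached RDD for this batch time if one was already generated,
  // otherwise compute it and remember it
  generatedRDDs.get(time).orElse {
    if (isTimeValid(time)) {
      val rddOption = compute(time)       // subclass-specific generation logic
      rddOption.foreach { newRDD =>
        generatedRDDs.put(time, newRDD)   // cache the RDD for this batch
      }
      rddOption
    } else {
      None
    }
  }
}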
Let's walk through the full life cycle of RDD generation in a DStream with a concrete example.
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
This code can be 'translated' into the following:
val lines = new SocketInputDStream("localhost", 9999)   // type: SocketInputDStream
val words = new FlatMappedDStream(lines, _.split(" "))  // type: FlatMappedDStream
val pairs = new MappedDStream(words, word => (word, 1)) // type: MappedDStream
val wordCounts = new ShuffledDStream(pairs, _ + _)      // type: ShuffledDStream
new ForEachDStream(wordCounts, cnt => cnt.print())      // type: ForEachDStream
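Each derived DStream in this chain simply remembers its parent; its compute method pulls the parent's RDD for the batch and applies the matching RDD transformation. MappedDStream, for example, looks essentially like this in Spark's source:

private[streaming] class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  // Ask the parent for its RDD of this batch, then apply map on it
  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}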
Let's first look at DStream's print method:
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println("Time: " + time)
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
It first defines a function that takes the first few records from the RDD and prints them along with the batch time, then passes that function to foreachRDD:
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
foreachRDD creates a new ForEachDStream object and registers it with the DStreamGraph; the registered ForEachDStream objects are what make up the DStreamGraph's outputStreams.
private[streaming] def register(): DStream[T] = {
  ssc.graph.addOutputStream(this)
  this
}

def addOutputStream(outputStream: DStream[_]) {
  this.synchronized {
    outputStream.setGraph(this)
    outputStreams += outputStream
  }
}
When a batchInterval boundary is reached, DStreamGraph's generateJobs method is called:
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
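For context, this call is driven by the RecurringTimer inside JobGenerator, which posts a GenerateJobs event every batchDuration; the event handler then calls DStreamGraph.generateJobs and submits the resulting jobs. An abridged sketch, based on Spark 1.6-era sources (details vary by version):

// In JobGenerator: the timer posts a GenerateJobs event once per batchDuration
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

// Handling that event calls DStreamGraph.generateJobs (shown above)
// and hands the jobs to the JobScheduler
private def generateJobs(time: Time) {
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // assign received blocks to this batch
    graph.generateJobs(time)
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}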
generateJobs then calls generateJob(time) on each outputStream, i.e. on our ForEachDStream:
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
Starting from this method, parent.getOrCompute recurses up the DStream dependency chain all the way to the original input DStream; each new RDD generated along the way is written into generatedRDDs.
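The recursion bottoms out at the input DStream, which has no parent and instead builds an RDD from the blocks the receiver has stored for this batch. An abridged sketch of ReceiverInputDStream.compute (Spark 1.6-era sources; error handling and the WAL case trimmed):

override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      // Before the graph has started, return an empty RDD
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Ask the ReceiverTracker for the blocks allocated to this batch
      // and wrap them in a BlockRDD
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}

Once generateJob returns, the Job wraps jobFunc; when the JobScheduler later runs the job, jobFunc executes foreachFunc(rdd, time), and the rdd.take inside print is the action that actually triggers computation of the whole RDD lineage for that batch.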