Spark Streaming source code analysis (1): StreamingContext



Blog: http://blog.csdn.net/yueqian_zhu/


First, let's look at a minimal example to get a feel for the overall shape:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in the input stream of \n delimited text (e.g. generated by 'nc')
    // Note that no replication in the storage level is fine only when running locally.
    // Replication is necessary in a distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
This post focuses on how a StreamingContext is constructed. Its primary constructor is:

class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  )

I. APIs:

1. Uses an existing SparkContext; cp_ is null:

def this(sparkContext: SparkContext, batchDuration: Duration)
2. A SparkContext is created internally from the given conf; cp_ is null:
def this(conf: SparkConf, batchDuration: Duration)
3. The conf is built by combining the defaults with the arguments below; cp_ is null:
def this(
    master: String,
    appName: String,
    batchDuration: Duration,
    sparkHome: String = null,
    jars: Seq[String] = Nil,
    environment: Map[String, String] = Map())
4. Rebuilds the StreamingContext from the checkpoint data found under path, so no SparkContext or Duration arguments are needed:
def this(path: String, hadoopConf: Configuration)
def this(path: String) // hadoopConf is built automatically from the default Hadoop configuration
5. Constructed from an existing SparkContext plus a checkpoint path:
def this(path: String, sparkContext: SparkContext)
6. Note also that the StreamingContext companion object provides a getOrCreate method: if no usable checkpoint can be read from checkpointPath, creatingFunc is invoked to create a new StreamingContext (a usage sketch follows this list):
def getOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration = new Configuration(),
    createOnError: Boolean = false
  ): StreamingContext
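A minimal usage sketch of getOrCreate. The checkpoint path, app name, and the localhost:9999 socket source below are illustrative assumptions, not taken from the source; the point is that the pipeline must be defined inside creatingFunc so it can be rebuilt on a cold start and recovered from the checkpoint after a failure:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedApp {
  // Hypothetical checkpoint location; any HDFS/S3/local path works in principle.
  val checkpointPath = "hdfs:///tmp/streaming-checkpoint"

  // Called only when no usable checkpoint exists under checkpointPath.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointedApp")
    val ssc = new StreamingContext(conf, Seconds(1)) // constructor (2): conf + batchDuration
    ssc.checkpoint(checkpointPath)                   // enable checkpointing so recovery is possible
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()                            // an output operation must be registered here
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On a restart after a failure, this recovers the context (and its DStream graph)
    // from the checkpoint instead of calling createContext() again.
    val ssc = StreamingContext.getOrCreate(checkpointPath, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}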
II. The main construction logic of StreamingContext (checkpointing is left aside for now)
1. Construct a graph: DStreamGraph
The operations applied to a DStream fall into two categories: 1. Transformations, and 2. Output operations, which actually produce results. A DStreamGraph needs both inputs and outputs; if there is no output, everything done upstream is pointless. So how do the inputs and outputs get tied together? That is exactly what DStreamGraph is for: it records the input streams and the output streams, as sketched below.
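To make DStreamGraph's role concrete, here is a deliberately simplified sketch. It is not the actual Spark source (the real classes carry batch duration, zero time, locking, and much more); only addInputStream/addOutputStream mirror real method names, everything else is illustrative:

import scala.collection.mutable.ArrayBuffer

// Simplified model: the graph just keeps track of which streams feed data in
// and which streams have been registered by output operations.
class SimpleDStreamGraph {
  private val inputStreams  = new ArrayBuffer[SimpleDStream]()
  private val outputStreams = new ArrayBuffer[SimpleDStream]()

  def addInputStream(s: SimpleDStream): Unit  = inputStreams  += s  // done when an input DStream is created
  def addOutputStream(s: SimpleDStream): Unit = outputStreams += s  // done by output operations (print, foreachRDD, ...)

  // Without at least one output stream there is nothing to generate jobs from.
  def validate(): Unit =
    require(outputStreams.nonEmpty, "No output operations registered, so nothing to execute")
}

class SimpleDStream(graph: SimpleDStreamGraph) {
  // An output operation such as print() ultimately registers the stream with the graph.
  def register(): this.type = { graph.addOutputStream(this); this }
}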
2. Construct a JobScheduler
The JobScheduler internally constructs a JobGenerator, which generates jobs at the batch interval we configured; a simplified sketch of that recurring timer follows.
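A deliberately simplified stand-in for the JobGenerator's timer, assuming only that jobs are generated once per batch interval. The real implementation uses a RecurringTimer plus an event loop and asks the DStreamGraph to generate the jobs for each batch time; the names below are illustrative:

import java.util.concurrent.{Executors, TimeUnit}

class SimpleJobGenerator(batchDurationMs: Long)(generateJobs: Long => Unit) {
  private val timer = Executors.newSingleThreadScheduledExecutor()

  // Fire generateJobs once per batch interval, passing the current batch time.
  def start(): Unit =
    timer.scheduleAtFixedRate(
      new Runnable { def run(): Unit = generateJobs(System.currentTimeMillis()) },
      batchDurationMs, batchDurationMs, TimeUnit.MILLISECONDS)

  def stop(): Unit = timer.shutdown()
}

// Usage sketch with a hypothetical 1-second batch interval:
// new SimpleJobGenerator(1000)(time => println(s"generate jobs for batch $time")).start()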
3. The context state is set to INITIALIZED.
The next post covers the operation part of the example above.