1. Overview
Although a DStream can be converted into its underlying RDDs, when the per-batch logic becomes more complex it is worth considering Spark SQL instead.
2. Integration approach
Streaming + Core integration:
the transform or foreachRDD methods (a foreachRDD sketch follows below)
Core + SQL integration:
RDD <==> DataFrame conversion
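The program in section 3 demonstrates the transform route. As a minimal sketch only, the foreachRDD route could look roughly like the following; it assumes the same SQLContextSingelton helper and a DStream[String] named dstream as in the program below:

dstream.foreachRDD(rdd => {
  // Per batch: RDD -> DataFrame, register a temp table, run SQL, then act on the result
  val sqlContext = SQLContextSingelton.getSQLContext(rdd.sparkContext)
  import sqlContext.implicits._
  rdd.filter(_.nonEmpty)
    .flatMap(_.split(" ").map((_, 1)))
    .toDF("word", "c")
    .registerTempTable("tb_word")
  // Unlike transform, nothing is returned; the output action (show/save) happens right here
  sqlContext.sql("select word, count(c) as vc from tb_word group by word").show()
})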
3. Program
package com.sql.it
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object StreamingSQL {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingWindowOfKafka22")
      .setMaster("local[*]")
    val sc = SparkContext.getOrCreate(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    // A checkpoint dir must be set when stateful APIs such as updateStateByKey are called;
    // the directory at this path must not already exist
    ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/9421151351")
    val kafkaParams = Map(
      "group.id" -> "streaming-kafka-78912151",
      "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",
      "auto.offset.reset" -> "smallest"
    )
    // The value of each topic entry is the number of reader threads, so it must be >= 1
    val topics = Map("beifeng" -> 4)
    val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
      ssc,                            // the Spark Streaming context
      kafkaParams,                    // Kafka connection parameters ===> connects via the Kafka high-level consumer API
      topics,                         // topic name(s) to read plus the number of reader threads per topic
      StorageLevel.MEMORY_AND_DISK_2  // storage level used by the receiver for data pulled from Kafka
    ).map(_._2)

    /**
     * transform: expresses a DStream operation as an operation on its RDDs;
     * the function passed to this API only needs to return a new RDD
     */
    dstream.transform(rdd => {
      // Use SQL to compute the word count
      val sqlContext = SQLContextSingelton.getSQLContext(rdd.sparkContext)
      import sqlContext.implicits._
      val procedRDD = rdd.filter(_.nonEmpty).flatMap(_.split(" ").map((_, 1)))
      procedRDD.toDF("word", "c").registerTempTable("tb_word")
      val resultRDD = sqlContext.sql("select word, count(c) as vc from tb_word group by word").map(row => {
        val word = row.getAs[String]("word")
        val count = row.getAs[Long]("vc")
        (word, count)
      })
      resultRDD
    }).print()

    // Start processing
    ssc.start()
    // Block waiting for termination; also responds to interruption of the thread
    ssc.awaitTermination()
  }
}

object SQLContextSingelton {
  // Lazily created SQLContext, shared across batches
  @transient private var instance: SQLContext = _

  def getSQLContext(sc: SparkContext): SQLContext = {
    if (instance == null) {
      synchronized[SQLContext] {
        if (instance == null) {
          instance = new SQLContext(sc)
        }
        instance
      }
    }
    instance
  }
}
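As a side note, the SQL statement inside transform could also be written with the DataFrame API. A minimal sketch, assuming procedRDD and the sqlContext.implicits._ import are in scope exactly as in the program above:

import org.apache.spark.sql.functions.count
// DataFrame-API equivalent of "select word, count(c) as vc from tb_word group by word"
val resultDF = procedRDD.toDF("word", "c")
  .groupBy("word")
  .agg(count("c").as("vc"))
// Back to an RDD of (word, count) pairs, as transform expects
val resultRDD = resultDF.map(row => (row.getAs[String]("word"), row.getAs[Long]("vc")))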
4. Result