SparkStreaming updateStateByKey 保存记录信息

时间:2021-10-16 22:07:58
/**
 * Stateful word count with Spark Streaming's `updateStateByKey`.
 *
 * Reads space-delimited text from a socket, counts words in each
 * 10-second batch, and folds each batch's counts into a running
 * per-word total persisted via checkpointed state.
 */
object SparkStreaming_StateFul {

  def main(args: Array[String]): Unit = {
    // Quiet noisy framework logging so the streaming output stays readable.
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setMaster("local[2]")
      .setAppName(this.getClass.getSimpleName)
      .set("spark.executor.memory", "2g")
      .set("spark.cores.max", "8")
      .setJars(Array("E:\\ScalaSpace\\Spark_Streaming\\out\\artifacts\\Spark_Streaming.jar"))
    val context = new SparkContext(conf)

    // Merges the current batch's counts for a key with its saved state.
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      // Fetch the previously accumulated count if one exists, else start at 0.
      // (Original line was missing the `//` marker, which broke compilation.)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    // step1: create the streaming context with a 10-second batch interval.
    val ssc = new StreamingContext(context, Seconds(10))
    // Checkpointing is required by updateStateByKey. NOTE: on a cluster this
    // must point at shared storage (e.g. an hdfs:// path), not a local dir —
    // a local "." only works in local mode (see the error discussed below).
    ssc.checkpoint(".")

    // step2: create a network input stream on ip:port and count the words
    // in the \n-delimited text it delivers.
    val lines = ssc.socketTextStream("218.193.154.79", 12345)

    val data = lines.flatMap(_.split(" "))
    val wordDstream = data.map(x => (x, 1))

    // Apply updateStateByKey to maintain the running total per word.
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)

    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

ssc.checkpoint 如果在集群上运行会报出如下的错误:
org.apache.spark.SparkException: Checkpoint RDD ReliableCheckpointRDD[9] at print at SparkStreaming_StateFul.scala:43(0) has different number of partitions from original RDD MapPartitionsRDD[8] at updateStateByKey at SparkStreaming_StateFul.scala:41(2)
at org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint(ReliableRDDCheckpointData.scala:73)
at org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:74)

这是因为 checkpoint 目录被设置成了本地路径 ssc.checkpoint(".")，检查点文件没有保存到 HDFS 等各节点共享的存储中导致的；在集群上运行时应改为指定 HDFS 路径，例如 ssc.checkpoint("hdfs://namenode:8020/spark/checkpoint")。