HBase Error: connection object not serializable

Date: 2023-03-09 07:27:30

I wanted to connect to HBase from the Spark driver program and insert data into it, but when the job was submitted to the Spark cluster it failed with the error: connection object not serializable.
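
The offending code looked roughly like the sketch below, reconstructed from the class names in the stack trace that follows (com.sae.model.HbaseHelper and its hbHelper field); the insert method is illustrative, not the actual API:

val hbHelper = new HbaseHelper() // wraps an HBase connection; created on the driver

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    // This closure runs on the executors, so Spark must serialize
    // hbHelper along with it -- hence the NotSerializableException.
    hbHelper.insert(record)
  }
}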

The full error:

Exception in thread "main" java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
com.sae.model.HbaseHelper
Serialization stack:
- object not serializable (class: com.sae.model.HbaseHelper, value: com.sae.model.HbaseHelper@27a09971)
- field (class: com.sae.demo.KafkaStreamingTest$$anonfun$main$, name: hbHelper$, type: class com.sae.model.HbaseHelper)
- object (class com.sae.demo.KafkaStreamingTest$$anonfun$main$, <function1>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$$$anonfun$apply$mcV$sp$, name: cleanedF$, type: interface scala.Function1)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$$$anonfun$apply$mcV$sp$, <function2>)
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@4bdf)
- element of array (index: )
- array (class [Ljava.lang.Object;, size )
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@4bdf, org.apache.spark.streaming.dstream.ForEachDStream@2b4d4327))
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
checkpoint files ])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.kafka.KafkaInputDStream, org.apache.spark.streaming.kafka.KafkaInputDStream@163042ea)
- element of array (index: )
- array (class [Ljava.lang.Object;, size )
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.kafka.KafkaInputDStream@163042ea))
- writeObject data (class: org.apache.spark.streaming.DStreamGraph)
- object (class org.apache.spark.streaming.DStreamGraph, org.apache.spark.streaming.DStreamGraph@2577a95d)
- field (class: org.apache.spark.streaming.Checkpoint, name: graph, type: class org.apache.spark.streaming.DStreamGraph)
- object (class org.apache.spark.streaming.Checkpoint, org.apache.spark.streaming.Checkpoint@2b4b96a4)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:)
at org.apache.spark.streaming.StreamingContext.liftedTree1$(StreamingContext.scala:)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:)
at com.sae.demo.KafkaStreamingTest$.main(StreamingDataFromKafka.scala:)
at com.sae.demo.KafkaStreamingTest.main(StreamingDataFromKafka.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:)
at java.lang.reflect.Method.invoke(Method.java:)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$(SparkSubmit.scala:)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The serialization stack points at the root cause: the function passed to foreachRDD captures the HbaseHelper instance created on the driver, and because checkpointing is enabled, Spark checks that all DStream functions are serializable when the StreamingContext is started. The HBase connection wrapped by the helper cannot be serialized, so validation fails before the job even runs.

Solution:

Refer to the official documentation: the Spark Streaming Programming Guide, section "Design Patterns for using foreachRDD".

The code that opens the database connection should be placed inside foreachPartition, so each executor creates its own connection instead of receiving one from the driver, for example:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // The connection is created on the executor, once per partition,
    // so it never has to be serialized as part of the closure.
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}
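
Applied to HBase, the same pattern looks roughly like the sketch below. It uses the standard HBase client API (ConnectionFactory, Table, Put); the table name test_table, column family cf, column qualifier col, and the (rowKey, value) record shape are placeholders, not names from the original program:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

// Assumes dstream is a DStream[(String, String)] of (rowKey, value) pairs.
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Runs on the executor: the connection is created locally and is
    // never part of a serialized closure.
    val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val table = connection.getTable(TableName.valueOf("test_table"))
    try {
      partition.foreach { case (rowKey, value) =>
        val put = new Put(Bytes.toBytes(rowKey))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value))
        table.put(put)
      }
    } finally {
      table.close()
      connection.close()
    }
  }
}

Opening a connection per partition is still costly for small batches; the same section of the official guide goes one step further and recommends keeping a static, lazily initialized pool of connections on each executor, borrowing a connection inside foreachPartition and returning it after use.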