Apache Spark object not serializable exception with JSON parser

Time: 2023-02-09 20:50:07

I am reading data [json as String] from a Kafka queue and trying to parse the json String into a case class using the liftweb json API.

Here is the code snippet:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._
import org.apache.kafka.common.serialization.StringDeserializer
import kafka.serializer.StringDecoder
import net.liftweb.json.{DefaultFormats, Formats}
import net.liftweb.json._

// sparkConf, kafkaServer, zookeeperUrl, kafkaTopic and the Tweet case class are defined elsewhere
val sparkStreamingContext = new StreamingContext(sparkConf, Seconds(5))

// Kafka consumer configuration
val kafkaParam: Map[String, String] = Map(
  "bootstrap.servers" -> kafkaServer,
  "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "zookeeper.connect" -> zookeeperUrl,
  "group.id" -> "demo-group")

// One receiver thread for the topic
val topicSet = Map(kafkaTopic -> 1)
val streaming = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  sparkStreamingContext, kafkaParam, topicSet, StorageLevel.MEMORY_AND_DISK)

// Parse each (key, message) pair into (key, Tweet)
streaming.map { case (id, tweet) =>
  implicit val formats: Formats = DefaultFormats
  (id, parse(tweet).extract[Tweet])
}.print()

sparkStreamingContext.start()
sparkStreamingContext.awaitTermination()

and I am getting this exception:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 1.0 (TID 1) had a not serializable result: net.liftweb.json.DefaultFormats$
Serialization stack:
    - object not serializable (class: net.liftweb.json.DefaultFormats$, value: net.liftweb.json.DefaultFormats$@74a2fec)
    - field (class: Tweet, name: formats, type: interface net.liftweb.json.Formats)
    - object (class Tweet, Tweet(Akash24,Adele))
    - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
    - object (class scala.Tuple2, (1,Tweet(Akash24,Adele)))
    - element of array (index: 0)
    - array (class [Lscala.Tuple2;, size 11)
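The serialization stack points at a Formats field held by the Tweet case class itself. A hypothetical shape consistent with the trace (the constructor field names are assumptions) would be:

import net.liftweb.json.{DefaultFormats, Formats}

// Hypothetical reconstruction from the serialization stack above; field names are assumptions.
// A Formats member stored on the case class travels with every Tweet instance,
// so Spark ends up trying to serialize net.liftweb.json.DefaultFormats$ with each record.
case class Tweet(userName: String, name: String) {
  implicit val formats: Formats = DefaultFormats
}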

Can anyone help me fix this problem? Any help will be appreciated. Thanks.

1 Solution

#1

From the logs, this looks like a straightforward class-not-serializable exception. To fix it, use the following code:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._
import org.apache.kafka.common.serialization.StringDeserializer
import kafka.serializer.StringDecoder
import net.liftweb.json.{DefaultFormats, Formats}
import net.liftweb.json._

// Register the lift-json formats class with Kryo before creating the context
sparkConf.registerKryoClasses(Array(classOf[DefaultFormats]))

val sparkStreamingContext = new StreamingContext(sparkConf, Seconds(5))

val kafkaParam: Map[String, String] = Map(
  "bootstrap.servers" -> kafkaServer,
  "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "zookeeper.connect" -> zookeeperUrl,
  "group.id" -> "demo-group")

val topicSet = Map(kafkaTopic -> 1)
val streaming = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  sparkStreamingContext, kafkaParam, topicSet, StorageLevel.MEMORY_AND_DISK)

streaming.map { case (id, tweet) =>
  implicit val formats: Formats = DefaultFormats
  (id, parse(tweet).extract[Tweet])
}.print()

sparkStreamingContext.start()
sparkStreamingContext.awaitTermination()

This makes the DefaultFormats class serializable (through Kryo), so the Spark driver can send the implicit val formats to all worker nodes.
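
For completeness, a minimal sketch of the driver-side SparkConf setup this answer relies on, with Kryo enabled explicitly via spark.serializer; the app name and master below are placeholders:

import org.apache.spark.SparkConf
import net.liftweb.json.DefaultFormats

// Minimal sketch: enable Kryo explicitly and register the lift-json formats class.
// "kafka-tweet-parser" and "local[2]" are placeholder values.
val sparkConf = new SparkConf()
  .setAppName("kafka-tweet-parser")
  .setMaster("local[2]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[DefaultFormats]))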
