Using a database connection inside foreachRDD in Apache Spark Streaming

Time: 2021-11-09 00:59:30

In Spark Streaming, I want to query the database before each batch is processed and store the results in a HashMap that can be serialized and sent over the network to the executors.

class ExecutingClass implements Serializable {

  void init(DB db) {

    try (JavaStreamingContext jsc = new JavaStreamingContext(...)) {

      JavaPairInputDStream<String, String> kafkaStream = getKafkaStream(jsc);

      kafkaStream.foreachRDD(rdd -> {
        // this part is supposed to execute in the driver
        Map<String, String> indexMap = db.getIndexMap(); // connects to a db, queries the results as a map

        JavaRDD<String> results = processRDD(rdd, indexMap);

        ...
      });
    }
  }

  JavaRDD<String> processRDD(JavaPairRDD<String, String> rdd, Map<String, String> indexMap) {
    ...
  }
}

In the above code, indexMap is supposed to be initialized on the driver, and the resulting map is used in processing the RDD. I have no problems when I declare indexMap outside of the foreachRDD closure, but I get serialization errors when I do it inside. What is the reason for this?

The reason I want to do something like this is to ensure I have the latest values from the database for each batch. I suspect the error is due to the foreachRDD closure trying to serialize everything it references from outside the closure.

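To make the contrast concrete, here is a simplified sketch of the two variants, inside init(DB db) and reusing the names from the code above; the comments describe my understanding of what gets captured:

// Variant A: works -- indexMap is built once, outside foreachRDD, and only
// the serializable Map is referenced by the closure.
Map<String, String> indexMap = db.getIndexMap();
kafkaStream.foreachRDD(rdd -> {
  JavaRDD<String> results = processRDD(rdd, indexMap);
});

// Variant B: fails -- calling db.getIndexMap() inside foreachRDD gives me
// fresh values per batch, but the closure now references db, so Spark ends
// up trying to serialize the DB instance (which is not serializable).
kafkaStream.foreachRDD(rdd -> {
  Map<String, String> freshIndexMap = db.getIndexMap();
  JavaRDD<String> results = processRDD(rdd, freshIndexMap);
});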

1 solution

#1

You are using the db object (an instance of DB) inside foreachRDD, so Spark tries to serialize db. To avoid this, create the DB connection inside foreachRDD, or make use of object pools as discussed in this article: http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/

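For example, roughly like this (a sketch only: DB.connect(...) and dbConfig are placeholders for however you actually open a connection; they are not part of the question's code):

kafkaStream.foreachRDD(rdd -> {
  // The foreachRDD body runs on the driver once per batch, so the connection
  // can be opened and closed right here; the closure then only needs whatever
  // dbConfig holds (e.g. a JDBC URL string), not the non-serializable DB
  // instance created outside.
  DB db = DB.connect(dbConfig);                     // placeholder connection factory
  Map<String, String> indexMap = db.getIndexMap();  // fresh values for this batch
  db.close();                                       // assuming DB can be closed

  JavaRDD<String> results = processRDD(rdd, indexMap);
  // ...rest of the per-batch processing as before
});

If the executors themselves need a connection (for example to write results back), the pattern from the linked article is foreachPartition together with a lazily initialized, per-JVM connection pool, rather than shipping a connection object in the closure.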
