How to write a JavaPairDStream to Redis?

Time: 2023-02-09 20:49:43

I am using Spark 1.5.0 and Java 7.

The input comes from Kafka as a stream of different JSON objects, each with a type field. For example:

{'type': 'alpha', ...}
{'type': 'beta', ...}
...

From this input I am creating a JavaPairDStream<String, Integer> that holds the count of each event type.

I want to store this data in Redis. How can I go about doing this?

2 solutions

#1


2  

I used the foreachRDD and foreach functions to achieve this, as follows:

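import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import scala.Tuple2;

wordCounts.foreachRDD(
    new Function<JavaPairRDD<String, Integer>, Void>() {
        public Void call(JavaPairRDD<String, Integer> rdd) {
            // rdd.foreach runs on the executors, once per record.
            rdd.foreach(
                new VoidFunction<Tuple2<String,Integer>>() {
                    public void call(Tuple2<String,Integer> wordCount) {
                        System.out.println(wordCount._1() + ":" + wordCount._2());
                        // Note: this builds a new pool for every record.
                        JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost");
                        Jedis jedis = pool.getResource();
                        try {
                            jedis.select(0);
                            // Store the count as a string under the event type.
                            jedis.set(wordCount._1(), wordCount._2().toString());
                        } finally {
                            // On older Jedis versions use pool.returnResource(jedis) instead.
                            jedis.close();
                            pool.destroy();
                        }
                    }
                }
            );
            return null; // Spark 1.5's foreachRDD expects a Function returning Void
        }
    }
);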

#2


0  

Creating a new connection pool for every single record, as the code above does, is very inefficient. I suggest creating one connection for each partition:

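// Lambdas need Java 8; on Java 7 use anonymous classes as in answer #1.
wordCounts.foreachRDD(rdd -> {
  rdd.foreachPartition(p -> {
    // One connection per partition, opened on the executor.
    // getJedisConfig() is this answer's own helper and is not shown here.
    Jedis jd = new Jedis(getJedisConfig());
    try {
      while (p.hasNext()) {
        Tuple2<String,Integer> data = p.next();
        String word = data._1();
        Integer cnt = data._2();
        jd.set(word, cnt.toString()); // or any other format of save to Redis
      }
    } finally {
      jd.close();
    }
  });
  return null; // Spark 1.5's foreachRDD expects a Function returning Void
});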
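
To make the cost difference concrete, here is a minimal sketch in plain Java with no Spark or Redis dependency. FakeConnection and PartitionWriteSketch are hypothetical stand-ins, not part of Jedis or Spark: each partition is modelled as a Map of event type to count, and the two methods only report how many connections each strategy would open.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Redis client: it writes into an in-memory map.
class FakeConnection {
    private final Map<String, String> store;

    FakeConnection(Map<String, String> store) {
        this.store = store;
    }

    void set(String key, String value) {
        store.put(key, value);
    }
}

class PartitionWriteSketch {
    // Per-record strategy: a new connection is opened for every (type, count) pair.
    static int writePerRecord(List<Map<String, Integer>> partitions,
                              Map<String, String> store) {
        int opened = 0;
        for (Map<String, Integer> partition : partitions) {
            for (Map.Entry<String, Integer> e : partition.entrySet()) {
                FakeConnection conn = new FakeConnection(store);
                opened++;
                conn.set(e.getKey(), e.getValue().toString());
            }
        }
        return opened;
    }

    // Per-partition strategy: one connection is shared by all records of a partition.
    static int writePerPartition(List<Map<String, Integer>> partitions,
                                 Map<String, String> store) {
        int opened = 0;
        for (Map<String, Integer> partition : partitions) {
            FakeConnection conn = new FakeConnection(store);
            opened++;
            for (Map.Entry<String, Integer> e : partition.entrySet()) {
                conn.set(e.getKey(), e.getValue().toString());
            }
        }
        return opened;
    }
}
```

Both methods leave the same keys and values in the store; only the number of connections differs, growing with the record count in the first case and with the partition count in the second.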
