使用mapPartitionsWithIndex进行DStream - Spark Streaming

时间:2023-01-16 20:49:15

I want to do something very simple: to check what is the content of each partition in the first RDD of my DStream. This is what I'm doing now:

我想做一些非常简单的事情:检查DStream的第一个RDD中每个分区的内容是什么。这就是我现在正在做的事情:

SparkConf sparkConfiguration= new SparkConf().setAppName("DataAnalysis").setMaster("local[*]");
    JavaStreamingContext sparkStrContext=new JavaStreamingContext(sparkConfiguration, Durations.seconds(1));
    JavaReceiverInputDStream<String> receiveParkingData=sparkStrContext.socketTextStream("localhost",5554);


Time time=new Time(1000);

JavaRDD<String>dataRDD= receiveParkingData.compute(time);

//I get an error in this RDD

    JavaRDD<String>indexDataRDD=dataRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
        @Override

        public Iterator<String> call(Integer integer, Iterator<String> stringIterator) throws Exception {
            return null;
        }
    });

indexDataRDD.collect();

So I want to print the content of each partition and its ID. However, on the indexDataRDD I get this message in my IntelliJ IDE: mapPartitionsWithIndex (Function2<Integer, Iterator<String>, Iterator<String>>, boolean) in AbstractJavaRDDLike cannot be applied to (Function2<Integer, Iterator<String>, Iterator<String>>)

所以我想打印每个分区的内容及其ID。但是,在indexDataRDD上我在IntelliJ IDE中收到此消息:AbstractJavaRDDLike中的mapPartitionsWithIndex(Function2 ,Iterator >,boolean)无法应用于(Function2 ,Iterator <字符串> >) ,iterator> ,iterator>

Can someone help me with this issue? Is there another, easier way to get the content in each partition? I really want to know the specific content of each partition. Thank you so much.

有人可以帮我解决这个问题吗?是否有另一种更简单的方法来获取每个分区中的内容?我真的想知道每个分区的具体内容。非常感谢。

1 个解决方案

#1


0  

Here is sample program for mapPartitionsWithIndex for your reference.

以下是mapPartitionsWithIndex的示例程序供您参考。

public class SparkDemo {
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("SparkDemo").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<String> data = Arrays.asList("one","two","three","four","five");
    JavaRDD<String> javaRDD = sc.parallelize(data, 2);
    JavaRDD<String> mapPartitionsWithIndexRDD = javaRDD
            .mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
                @Override
                public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
                    LinkedList<String> linkedList = new LinkedList<String>();
                    while (iterator.hasNext()){
                            linkedList.add(Integer.toString(index) + "-" + iterator.next());
                        }
                    return linkedList.iterator();
                }
            }, false);
    System.out.println("mapPartitionsWithIndexRDD " + mapPartitionsWithIndexRDD.collect()); 
    sc.stop();
    sc.close();
   }
}

#1


0  

Here is sample program for mapPartitionsWithIndex for your reference.

以下是mapPartitionsWithIndex的示例程序供您参考。

public class SparkDemo {
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("SparkDemo").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<String> data = Arrays.asList("one","two","three","four","five");
    JavaRDD<String> javaRDD = sc.parallelize(data, 2);
    JavaRDD<String> mapPartitionsWithIndexRDD = javaRDD
            .mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
                @Override
                public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
                    LinkedList<String> linkedList = new LinkedList<String>();
                    while (iterator.hasNext()){
                            linkedList.add(Integer.toString(index) + "-" + iterator.next());
                        }
                    return linkedList.iterator();
                }
            }, false);
    System.out.println("mapPartitionsWithIndexRDD " + mapPartitionsWithIndexRDD.collect()); 
    sc.stop();
    sc.close();
   }
}