Java code for Spark Streaming with different data sources (socket, HDFS directory) and output locations (HDFS, local filesystem)

Date: 2021-06-10 12:20:55

Below is the Java code for the classic big-data word count example:

import java.util.Arrays;

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

public class SparkStream {
    public static void main(String[] args) {

        /* Create a local StreamingContext and set the batch interval. (The original author used
         * a 1-second interval, which produced output far too fast, so I changed it to 30 seconds
         * here. It also left a huge number of small files in the output directory, which would
         * never do in production; I feel the output should be rolled by file size rather than
         * written on every time interval.) */
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")
                .set("spark.testing.memory", "2147480000");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));
        System.out.println(jssc);

        // The next two lines are the two data sources: the first reads from a socket via
        // socketTextStream; the commented-out line uses an HDFS directory as the input instead.
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("h40", 9999);
        // JavaDStream<String> lines = jssc.textFileStream("hdfs://h40:9000/stream");

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                System.out.println(Arrays.asList(x.split(" ")).get(0));
                return Arrays.asList(x.split(" "));
            }
        });

        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        System.out.println(pairs);

        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();

        // The first line below saves the results to HDFS; the commented-out line saves them
        // to the local Linux filesystem instead.
        wordCounts.dstream().saveAsTextFiles("hdfs://h40:9000/spark/", "haha");
        // wordCounts.dstream().saveAsTextFiles("file:///home/hadoop/hui/", "haha");
        // wordCounts.saveAsHadoopFiles("hdfs://h40:9000/testFile/", "spark");

        jssc.start();
        System.out.println(wordCounts.count()); // remove this line in spark-1.6.0-bin-hadoop2.6 (see note 4 below)
        jssc.awaitTermination(); // Wait for the computation to terminate
    }
}


Open a terminal and run the command nc -lk 9999; let's call this the "nc terminal" for now.
[hadoop@h40 ~]$ nc -lk 9999


Use Eclipse to package the program into wordcount.jar, upload it to /home/hadoop/spark-1.3.1-bin-hadoop2.6, and run:
[hadoop@h40 spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --master spark://h40:7077 --name JavaWordCountByHQ --class SparkStream --executor-memory 500m --total-executor-cores 2 wordcount.jar

Type some data into the nc terminal:
hello world
hello hadoop
hello hive
hello hbase
hello flume
hello kafka
hello storm
hello zookeeper
hello phonix
hello spark
hello sqoop

Then, every 30 seconds, the terminal running the job prints output like the following:

... (log output omitted)
-------------------------------------------
Time: 1355317250000 ms
-------------------------------------------
(hive,1)
(zookeeper,1)
(spark,1)
(hadoop,1)
(storm,1)
(hbase,1)
(hello,11)
(phonix,1)
(kafka,1)
(sqoop,1)
...
... (log output omitted)
-------------------------------------------
Time: 1355317280000 ms
-------------------------------------------
... (log output omitted)


To keep the console output clean and show only the information you care about, you can raise the console log level to ERROR:

[hadoop@h40 conf]$ pwd
/home/hadoop/spark-1.3.1-bin-hadoop2.6/conf
[hadoop@h40 conf]$ cp log4j.properties.template log4j.properties
[hadoop@h40 conf]$ vi log4j.properties
Change log4j.rootCategory=INFO, console to:
log4j.rootCategory=ERROR, console

Then add this line to the Java code above: PropertyConfigurator.configure("/home/hadoop/spark-1.3.1-bin-hadoop2.6/conf/log4j.properties");
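For reference, a minimal sketch of how that would look in the program; the import and the placement at the top of main (before the StreamingContext is created) are my additions:

import org.apache.log4j.PropertyConfigurator;

public static void main(String[] args) {
    // Load the log4j configuration before Spark starts logging
    PropertyConfigurator.configure("/home/hadoop/spark-1.3.1-bin-hadoop2.6/conf/log4j.properties");
    SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount");
    // ... rest of main unchanged
}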


Check the HDFS output path:
[hadoop@h40 ~]$ hadoop fs -lsr /spark
drwxr-xr-x   - hadoop supergroup          0 2012-12-12 21:00 /spark/-1355317250000.haha
-rw-r--r--   3 hadoop supergroup          0 2012-12-12 21:00 /spark/-1355317250000.haha/_SUCCESS
-rw-r--r--   3 hadoop supergroup         28 2012-12-12 21:00 /spark/-1355317250000.haha/part-00000
-rw-r--r--   3 hadoop supergroup         41 2012-12-12 21:00 /spark/-1355317250000.haha/part-00001
-rw-r--r--   3 hadoop supergroup         52 2012-12-12 21:00 /spark/-1355317250000.haha/part-00002
-rw-r--r--   3 hadoop supergroup         10 2012-12-12 21:00 /spark/-1355317250000.haha/part-00003
[hadoop@h40 ~]$ hadoop fs -cat /spark/-1355317250000.haha/part-00000
(hive,1)
(zookeeper,1)
[hadoop@h40 ~]$ hadoop fs -cat /spark/-1355317250000.haha/part-00001
(spark,1)
(hadoop,1)
(storm,1)
(hbase,1)
[hadoop@h40 ~]$ hadoop fs -cat /spark/-1355317250000.haha/part-00002
(hello,11)
(phonix,1)
(kafka,1)
(sqoop,1)
(world,1)
[hadoop@h40 ~]$ hadoop fs -cat /spark/-1355317250000.haha/part-00003
(flume,1)


Notes on the SparkStream.java code:
1. Don't use the method wordCounts.saveAsHadoopFiles("hdfs://h40:9000/testFile/", "spark");
It throws this error (which I haven't managed to solve either):
java.lang.RuntimeException: java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat
Since wordCounts.dstream().saveAsTextFiles("hdfs://h40:9000/spark/", "haha"); already works, that is really all you need; why agonize over a method that does the same thing but keeps failing?
(Still, knowing about a problem I cannot solve leaves my perfectionist self feeling uneasy...)
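For what it's worth, here is an unverified sketch of a workaround I would try: the error suggests Spark is inferring scala.runtime.Nothing$ as the OutputFormat class, so the saveAsHadoopFiles overload that takes the key, value, and output-format classes explicitly (after converting the pairs to Hadoop Writable types) might avoid it. The extra imports and the writableCounts variable are mine:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextOutputFormat;

// Unverified sketch: convert the (String, Integer) pairs to Hadoop Writables and name the
// output classes explicitly so that no Nothing$ type is inferred for the OutputFormat.
JavaPairDStream<Text, IntWritable> writableCounts = wordCounts.mapToPair(
        new PairFunction<Tuple2<String, Integer>, Text, IntWritable>() {
            @Override
            public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> t) {
                return new Tuple2<Text, IntWritable>(new Text(t._1()), new IntWritable(t._2()));
            }
        });
writableCounts.saveAsHadoopFiles("hdfs://h40:9000/testFile/", "spark",
        Text.class, IntWritable.class, TextOutputFormat.class);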


2. When you package the program into a jar and run it on Linux, if you want to save the results to the local Linux filesystem you must write wordCounts.dstream().saveAsTextFiles("file:///home/hadoop/hui/", "haha"); the file:/// prefix is mandatory, otherwise no directories are created under /home/hadoop/hui/. If instead you run the program from MyEclipse on Windows, write wordCounts.dstream().saveAsTextFiles("file:///C:\\Users\\huiqiang\\Desktop\\hui\\", "haha"); in that case the file:/// prefix is optional.

3. When running the program on Linux with wordCounts.dstream().saveAsTextFiles("file:////home/hadoop/hui/", "haha");, directories of the following form are created under /home/hadoop/hui/:
[hadoop@h40 hui]$ ll
total 8
drwxrwxr-x 2 hadoop hadoop 4096 Jun 20 17:35 -1497951350000.haha
drwxrwxr-x 2 hadoop hadoop 4096 Jun 20 17:36 -1497951380000.haha
But you cannot cd into these directories the usual way, because their names start with a dash and get parsed as options:
[hadoop@h40 hui]$ cd -1497951350000.haha
-bash: cd: -1: invalid option
cd: usage: cd [-L|-P] [dir]
After some maddening trial and error I finally got it to work; enter them like this:
[hadoop@h40 hui]$ cd -- -1497951350000.haha
[hadoop@h40 -1497951350000.haha]$ ll
total 12
-rw-r--r-- 1 hadoop hadoop 30 Jun 20 17:48 part-00000
-rw-r--r-- 1 hadoop hadoop 67 Jun 20 17:48 part-00001
-rw-r--r-- 1 hadoop hadoop 29 Jun 20 17:48 part-00002
-rw-r--r-- 1 hadoop hadoop  0 Jun 20 17:48 _SUCCESS
[hadoop@h40 -1497951350000.haha]$ cat part-00000
(flume,1)
(kafka,1)
(world,1)
[hadoop@h40 -1497951350000.haha]$ cat part-00001
(hadoop,1)
(zookeeper,1)
(hello,11)
(phonix,1)
(storm,1)
(sqoop,1)
[hadoop@h40 -1497951350000.haha]$ cat part-00002
(spark,1)
(hive,1)
(hbase,1)
To delete these directories:
[hadoop@h40 hui]$ rm -rf -- *

4. The line System.out.println(wordCounts.count()); is accepted under spark-1.3.1-bin-hadoop2.6, but under spark-1.6.0-bin-hadoop2.6 it fails with this error:
Exception in thread "main" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
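The adjustment I would expect to work on spark-1.6.0 (a sketch, not tested there): register the count as an output operation before the context is started, rather than calling count() afterwards:

// Register the per-batch count before jssc.start(); wordCounts.count() returns a
// DStream with a single element per batch, and print() makes it an output operation.
wordCounts.count().print();
jssc.start();
jssc.awaitTermination();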


A question left for myself: with this code, writing the results out to the target directory on every time interval produces far too many small files, which probably isn't appropriate in real production. Could the files be generated by size instead, e.g. roll to a new file whenever the output reaches 100 MB? I don't know how to implement that in code yet; something to look into when I have time.
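Not an answer to the size question, but a sketch of a partial mitigation I might try (my own idea, untested here): use foreachRDD and coalesce each batch down to one partition before saving, so every interval produces a single part file instead of several tiny ones. The batch output path below is made up for illustration; only the JavaPairRDD import is new, Time and Function2 are already covered by the existing imports:

import org.apache.spark.api.java.JavaPairRDD;

// Partial mitigation only: still one directory per batch interval, but one part file in it.
wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
    @Override
    public Void call(JavaPairRDD<String, Integer> rdd, Time time) {
        rdd.coalesce(1).saveAsTextFile("hdfs://h40:9000/spark/batch-" + time.milliseconds());
        return null;
    }
});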


References:

http://blog.csdn.net/tanggao1314/article/details/51606721