IDEA上Spark——Java、Scala的本地测试版与集群运行版

时间:2022-06-25 12:55:54

Java本地测试

2017/11/13

1.先放出代码中各类注释

(由于初次学习,为方便以后复习或查阅,注释较多。为避免代码太“冗余”故将部分注释分出来。)
注释i )、

  • 每个Spark应用程序都由一个驱动器程序(驱动器节点产生的,驱动器节点通俗的讲,可以视为hadoop中的namenode)来发起集群上的各种并行操作,驱动器程序(shell中的驱动器程序是Spark shell)可执行应用程序中的main方法,驱动器通过一个SparkContext对象访问Spark——这个对象代表对集群的一个连接(shell启动时已经自动创建了一个SparkContext对象)。【参考:参考资源1中P12】
  • 在Spark中,SparkContext是Spark所有功能的一个入口,无论是用java、scala、还是python编写都必须要有一个SparkContext,它的主要作用,包括初始化Spark应用程序所需的一些核心组件、调度器(DAGScheduler、TaskScheduler),到Spark Master节点上进行注册,等等。
    但是在Spark中,编写不同类型的Spark应用程序,使用的SparkContext是不同的,如果使用scala,使用的就是原生的SparkContext对象
    但是如果使用Java,那么就是JavaSparkContext对象
    如果是开发Spark SQL程序,那么就是SQLContext、HiveContext
    如果是开发Spark Streaming程序,那么就是它独有的SparkContext
    以此类推【参考:北风网相关课程】

注释ii)、

RDD支持两种类型的操作,一种是转化(transformation)操作,一种是行动(action)操作。行动操作会对RDD计算出一个结果,并把结果返回到驱动器程序中,或者把结果存储到如HDFS这样的外部存储系统。但是要注意的是,RDD的行动操作是“懒惰”型的,它只会在你第一次真正使用它是进行相应计算操作(对于大数据环境,这是个很有必要的特点,也正因如此,我们不应该把RDD看做存放着特定数据的数据集,而应该看做是一个存放着计算步骤的指令列表),并且在默认情况下,即使是同一个行动操作,每一次调用,它都会重新计算(如果需要频繁用到某个操作产生的结果,可以将结果缓存,使用方法RDD.persist()——但是我认为对于数据量大的结果,这个方法还是不管用吧?)。【参考:参考资源1中P22、P27】
如果转换操作比较简单,则创建指定Function的匿名内部类;但是如果function比较复杂,则会单独创建一个类,作为实现这个function接口的类

注释iii )、

写一堆匿名内部类调用方法,感觉代码太“繁杂”,于是换用函数式编程的lambda替代,lambda实现的上方所注释的,就是对应的“繁杂”版。
TODO:lambda在“背后”具体是怎么转换代码的

2.代码以及源码地址:

https://github.com/AtTops/Practice-Item/tree/master/SparkScala/main/scala/per/wanghai

创建maven项目,最低依赖仅需Spark的core包
部分代码有普通java与lambda两种写法

package per.wanghai.spark.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
* @author 王海[https://github.com/AtTops]
* @package per.wanghai.spark.core
* @description Java 本地版Spark的单词计数程序
* @Date 2017/11/13 19:55
* @Version V1.0
*/

public class WordCountLocal {

private static final String OUTPUTFILEPATH = "./dataout";

public static void main(String[] args) {
/* 第一步:创建SparkConf对象,设置Spark应用的配置信息
使用setMaster()可以设置Spark应用程序要连接的Spark集群的master节点的url
设置为local则代表在本地运行*/

SparkConf conf = new SparkConf().setAppName("WordCountLocal").setMaster("local");
// 第二步: 创建SparkContext对象【详细注解见博客:“注释i”】
JavaSparkContext sc = new JavaSparkContext(conf);
/* 第三步:要针对输入源(hdfs文件、本地文件,等等),创建一个初始的RDD,
[我们有两种方法创建RDD:1. 读取一个外部数据集(比如这里的本地文件);2. 在驱动程序里
分发驱动器程序中的集合(如list、set等)]。
输入源中的数据会打乱,分配到RDD的每个partition中,从而形成一个初始的分布式的数据集
在Java中,创建的普通RDD,都叫做JavaRDD
RDD中,有元素这种概念,如果是hdfs或者本地文件创建的RDD,每一个元素就相当于
是文件里的一行*/

// textFile()用于根据文件类型的输入源创建RDD
JavaRDD<String> lines = sc.textFile("./data/wordcount.txt");
/* 第四步:对初始RDD进行transformation操作,也就是一些计算操作
注意,RDD支持两种类型的操作,一种是转化(transformation)操作,
一种是行动(action)操作。【详细注解见博客:“注释ii”】
*/

// flatMap将RDD的一个元素给拆分成多个元素;FlatMapFunction的两个参数分别是输入和输出类型
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
return Arrays.asList(line.split(" ")).iterator();
}
});

// 第五步:将每一个单词,映射为(单词, 1)的这种格式。为之后进行reduce操作做准备。
// mapToPair,就是将每个元素映射为一个(v1,v2)这样的Tuple2(scala里的Tuple类型)类型的元素
// mapToPair得与PairFunction配合使用,PairFunction中的第一个泛型参数代表输入类型
// 第二个和第三个泛型参数,代表的输出的Tuple2的第一个值和第二个值的类型
// JavaPairRDD的两个泛型参数,分别代表了tuple元素的第一个值和第二个值的类型

// JavaPairRDD<String, Integer> pairs = words.mapToPair(
// new PairFunction<String, String, Integer>() {
// private static final long serialVersionUID = 1L;
// @Override
// public Tuple2<String, Integer> call(String word) throws Exception {
// return new Tuple2<>(word, 1);
// }});

// 注:mapToPair的lambda版本:【详见注释"iii"】
JavaPairRDD<String, Integer> pairs = words.mapToPair(
(PairFunction<String, String, Integer>) word -> new Tuple2<>(word, 1)
);

// 第六步:reduce操作(原理同MapReduce的reduce操作一样)
// 以单词作为key,统计每个单词出现的次数
// 最后返回的JavaPairRDD中的元素,也是tuple,但是第一个值就是每个key,第二个值就是key出现的次数
// JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
// new Function2<Integer, Integer, Integer>() {
// private static final long serialVersionUID = 1L;
// @Override
// public Integer call(Integer v1, Integer v2) throws Exception {
// return v1 + v2;
// }});

JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
(Function2<Integer, Integer, Integer>) (x, y) -> (x + y)
);

// 第七步:触发程序执行
// 到这里为止,我们通过几个Spark算子操作,已经统计出了单词的次数
// 目前为止我们使用的flatMap、mapToPair、reduceByKey操作,都是transformation操作
// 现在我们需要一个action操作来触发程序执行(这里是foreach)
wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;

@Override
public void call(Tuple2<String, Integer> wordCount) throws Exception {
System.out.println(wordCount._1 + " 出现了 " + wordCount._2 + " 次; ");
}
});

// 我们也可以通过将统计出来的结果存入文件(这也是一个action操作),来触发之前的transformation操作
try {
wordCounts.saveAsTextFile(OUTPUTFILEPATH);
} catch (Exception e) {
e.printStackTrace();
}

sc.close();
}
}

3.遇见的问题

在单词分割时使用FlatMapFunction报错:call(String)’ in ‘Anonymous class derived from org.apache.spark.api.java.function.FlatMapFunction’ *es with ‘call(T)’ in ‘org.apache.spark.api.java.function.FlatMapFunction’; attempting to use incompatible return type
【返回类型冲突】,解决办法:将Iterable《String>修改为terator《String>,并在line.split(” “)后再调用iterator()方法;

打包jar到集群测试

  • 代码见【】项目中的WordCountCluster.java
  • 打包并上传集群。IDEA简便打包可参考简便打包
  • 编写spark-submit脚本
    /usr/hdp/2.6.1.0-129/spark2/bin/spark-submit \
    –master yarn \
    –class per.wanghai.WordCountCluster \
    –executor-cores 6 \
    /usr/wh/spark_study/java/sparkJava-1.0-SNAPSHOT-jar-with-dependencies.jar
  • 添加执行权限然后执行chmod +x wordcount.sh
    解释:
    -这里我是将应用提交到YARN集群上运行(详细的参数通过spark-submit - -help查看)
    -因为程序简单,- -driver-memory、- -executor-memory、- -num-executors参数就都用的默认值

运行截图:{
// 运行并没有任何报错,就是不打印消息,不是文件路径的问题,试过普通println,也没有消息。
// 但是(见下文。Scala出现同样的问题,但是RDD的结果是保存了的,说明代码没问题的)
}

Scala本地测试

【注意:用Scala开发,一定要注意Sbt、Scala、Spark的版本兼容问题,真的很烦的。下文会陆续解释。】

  • 在写这篇文章之前,我就配置好了IDEA下用SBT开发Spark程序所需要的环境。社区中相关博文资料也不少,这里就不再赘述。如有需要,大家可以参考这篇文章这一篇文章
  • 需要注意的是:
    在配置SBT的VM parameters时,将sbtconfig.txt里的注释内容去掉;
    并且把下方的sbt-launch.jar设置为本地的\sbt\bin\sbt-launch.jar;
    setting和default setting里最好都修改。
  • 成功后展开大部分目录如图:
    IDEA上Spark——Java、Scala的本地测试版与集群运行版
  • 添加spark相关依赖——用 ++= 方法一次性添加一个依赖列表(也可以libraryDependencies += groupID % artifactID % revision这样一条一条的添加):
    libraryDependencies += “org.apache.spark” % “spark-core_2.11” % “2.1.1”

  • 导入的spark-2.1.1-bin-hadoop2.7下的jars

  • 【注意,spark2.0开始,不再构建spark-assembly.jar,而是拆分为多个jars构建于spark-2.X-bin-hadoop2.7/jars】。

源码

package per.wanghai

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
* Created by WangHai on 2017/11
*/

object WordcountLocal {
def main(args: Array[String]) = {
val sparkConf = new SparkConf().setAppName("WordCountScala")
.setMaster("local")
val sc = new SparkContext(sparkConf)
// D:/sbt/myproject/SparkScala/2.txthdfs://host0.com:8020/user/attop/test_data/SPARK_README.md
val lines = sc.textFile("D:/sbt/myproject/SparkScala/2.txt", 1)
val words = lines.flatMap(line => line.split(" "))
val pairs = words.map(word => (word, 1))
val wordcounts = pairs.reduceByKey {
_ + _
}
wordcounts.foreach(wordcount => println(wordcount._1 + "出现了" + wordcount._2 + "次。"))
}
}

本地运行结果截图:
IDEA上Spark——Java、Scala的本地测试版与集群运行版

遇见的问题

  • ——获取不到sbt0.13.16:
    解决办法:导致错误的原因是最初我把配置文件下的local源除了阿里云的都注释掉了,取消注释即可。
  • ——NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
    解决办法:Spark2.0之后是用的Scala2.11编译的,所以这里应该把IDEA中的Scala SDK修改为2.11.X:官网下载2.11.11,重新加载;然后把build中的ScalaVersion改为2.11.11
  • ——lang.IllegalArgumentException: Can not create a Path from an empty string
    解决办法:是我自己代码敲错了

Scala打包到集群测试

源码以及地址

https://github.com/AtTops/Practice-Item/tree/master/SparkScala/main/scala/per/wanghai

package per.wanghai

import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by WangHai on 2017/11/22 21:43
*/

object WordcountCluster {
def main(args: Array[String]) = {
val sparkConf = new SparkConf().setAppName("WordCountScala")
val sc = new SparkContext(sparkConf)
val lines = sc.textFile(args(0))
val wordcounts = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
wordcounts.foreach(wordcount => println(wordcount._1 + "出现了" + wordcount._2 + "次。"))
val wordsort = wordcounts.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
wordsort.saveAsTextFile(args(1))
sc.stop()
}
}
  • IDEA中命令行直接执行sbt package命令

遇见的问题

  • —— object apache is not a member of package org
    *解决过程:
    1.我把本地的Spark2.2.0降为了2.1.1。相关依赖也替换掉【注意,spark2.0开始,不再构建spark-assembly.jar,而是拆分为多个jars构建于spark-2.X-bin-hadoop2.7/jars】。
    下载页面:spark-2.1.1-bin-hadoop2.7.tgz
  • 依然报错,
    2.于是我考虑将spark-core包用sbt加入依赖。然后刷新项目
  • ——在build.sbt中又得到这样的警告:Unknown artifact. Not resolved or indexed
    *解决办法:
    3.Press Alt+Enter in IntelliJ on the lines with the warning and select the “update project resolvers’ indexes” quickfix, then select the “local cache” index and click “update”.
    You can verify that the update worked by checking the date in the “Updated” column:参考自Stack Overflow发现警告仍然还在,奈何我有“洁癖”,
    4.重启了IDEA,问题还是存在。
    5.重新编译打包,发现打包成功。
    不得不说,很烦的好不!而且IDEA每次打开,都检索项目,很耗时的对不!用Eclipse不多久就转了IDEA,不知道Eclipse有没有这个问题;还有,Eclipse单独对一个文件打包时,直接Export很方便的,IDEA中,我使用Empty打包或者with 依赖中自定义打包内容,暂时都还没有满意的结果。
  • 这时候查看项目结构,我发觉之前手动添加的spark-2…..下的jars不见了。
    spark-submit 脚本

–executor-memory 3G \
/usr/hdp/2.6.1.0-129/spark2/bin/spark-submit \
–class per.wanghai.WordcountCluster \
–master yarn \
–driver-memory 2G \
–executor-memory 3G \
–executor-cores 8 \
/usr/wh/spark_study/scala/sparkscala_2.11-1.0.jar \
file:///usr/wh/spark_study/scala/a.txt \
file:///usr/wh/spark_study/scala/out

运行截图:仍然不打印信息
但是spark-shell执行同样代码是可以显示的:
IDEA上Spark——Java、Scala的本地测试版与集群运行版
但是:HDFS上是保存了的
IDEA上Spark——Java、Scala的本地测试版与集群运行版

参考资源

接着cat一把试试,哈哈:
IDEA上Spark——Java、Scala的本地测试版与集群运行版
1. 《Spark快速大数据分析》
2. 《Spark和核心技术与高级应用》
3. 北风网相关视频