Spark中的两种模式

时间:2023-03-09 08:58:28
Spark中的两种模式

两种模式

client-了解

Spark中的两种模式

cluster模式-开发使用

Spark中的两种模式

操作

1.需要Yarn集群

2.历史服务器

3.提交任务的的客户端工具-spark-submit命令

4.待提交的spark任务/程序的字节码--可以使用示例程序

spark-shell和spark-submit

  • 两个命令的区别

spark-shell:spark应用交互式窗口,启动后可以直接编写spark代码,即时运行,一般在学习测试时使用

spark-submit:用来将spark任务/程序的jar包提交到spark集群(一般都是提交到Yarn集群)

Spark程序开发

导入依赖

'''





org.apache.spark

spark-core_2.11

2.4.5

    <dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
</dependency> <dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.12</version>
</dependency> <dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.12</version>
</dependency> <dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
</dependencies>

org.apache.maven.plugins
maven-compiler-plugin
3.1

1.8
1.8

        <!-- Scala Compiler -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin> </plugins>
</build>

'''

案例

'''

import org.apache.hadoop.conf.Configuration

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Demo02WordCount {

def main(args: Array[String]): Unit = {

/**

* 1、去除setMaster("local")

* 2、修改文件的输入输出路径(因为提交到集群默认是从HDFS获取数据,需要改成HDFS中的路径)

* 3、在HDFS中创建目录

* hdfs dfs -mkdir -p /spark/data/words/

* 4、将数据上传至HDFS

* hdfs dfs -put words.txt /spark/data/words/

* 5、将程序打成jar包

* 6、将jar包上传至虚拟机,然后通过spark-submit提交任务

* spark-submit --class Demo02WordCount --master yarn-client spark-1.0.jar

* spark-submit --class cDemo02WordCount --master yarn-cluster spark-1.0.jar

/

val conf: SparkConf = new SparkConf

conf.setAppName("Demo02WordCount")

//conf.setMaster("local[
]")

val sc: SparkContext = new SparkContext(conf)

val fileRDD: RDD[String] = sc.textFile("/spark/data/words/words.txt")

// 2、将每一行的单词切分出来

// flatMap: 在Spark中称为 算子

// 算子一般情况下都会返回另外一个新的RDD

val flatRDD: RDD[String] = fileRDD.flatMap(_.split(","))

//按照单词分组

val groupRDD: RDD[(String, Iterable[String])] = flatRDD.groupBy(word => word)

val words: RDD[String] = groupRDD.map(kv => {

val key = kv._1

val size = kv._2.size

key + "," +size

})

// 使用HDFS的JAVA API判断输出路径是否已经存在,存在即删除

val hdfsConf: Configuration = new Configuration()

hdfsConf.set("fs.defaultFS", "hdfs://master:9000")

val fs: FileSystem = FileSystem.get(hdfsConf)

// 判断输出路径是否存在

if (fs.exists(new Path("/spark/data/words/wordCount"))) {

fs.delete(new Path("/spark/data/words/wordCount"), true)

}

// 5、将结果进行保存
words.saveAsTextFile("/spark/data/words/wordCount")
sc.stop()

}

}

'''