spark快速大数据分析学习笔记

时间:2023-02-01 17:19:40

hadoop环境配置:

系统变量:新建变量HADOOP_HOME,值编辑为D:\sowt\hadoop
Administrator的用户变量:在PATH中添加bin文件夹位置D:\sowt\hadoop\bin


 报错排除

运行:

val lines = sc.textFile("README.md")//打开spark文件夹中的README.md文件

lines.count()//计算文件中的行数

报错:

org.apache.hadoop.mapred.InvalidInputException:Input path does not exist:file:/C:Users/Administrator/README.md

原因:

WIN+R输入spark-shell是在C:Users/Administrator打开spark-shell,这个文件夹下当然没有README.md

解决:

打开spark安装目录,shfit+右键,点击在此处打开命令行,输入spark-shell


 概念

运行:

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

val conf = new SparkConf().setMaster("local").setAppName("My App")

val sc = new.SparkContext(conf)

最最重要的内容是SparkContext

SparkContext的基本用法,传递两个参数:

集群URL:.setMaster(),告诉Spark如何连接到集群上,使用local可以让Spark在本地单机单线程运行

应用名:.setAppName(),为Spark应用起名,当连接到集群时,可以使用这个名字在集群管理器的用户界面找到自己的应用

 初始化SparkContext后,就可以创建RDD来进行操控

关闭Spark可以调用SparkContext的stop()方法,或者System.exit(0)或者sys.exit()退出应用


 RDD是一个不可变的分布式对象集合,每个RDD被分为多个分区,这些分区运行在集群中的不同节点上。

RDD创建:

val lines = sc.textFile("README.md")

RDD转化生成新的RDD,返回结果是一个RDD,RDD行动操作返回的是其他的数据类型。

转化操作:

val inputRDD = sc.textFile("log.txt")

val errorsRDD = inputRDD.filter(line => line.contains("error"))


 

map()、filter()、flatMap()的区别

map()接收一个函数如(x => x*x),将这个函数应用于每个元素,生成一个新的RDD,并将函数反回结果作为新RDD中的元素,两个RDD中包含元素数量不变

filter()接收一个函数如(x!=1),将这个函数应用于每个元素,生成一个新的RDD,并讲满足该函数的结果返回作为新RDD中的元素,两个RDD中包含元素的数量可能不同

flatMap()接收一个函数如(x => x.split(" ")),会将这个函数应用于每个元素,生成一个新的RDD,将函数返回结果中的内容作为新RDD中的元素,新的RDD中包含元素的数量可能较多


 

数据混洗(shuffle):对网络中的所有RDD取并集(RDD具有集合的一些特性,但不是严格意义上的集合),以确保每个元素只有一份。

常见的伪集合操作会导致数据混洗:distinct(),union(),intersection(),sbutract(),caresian(),


 

使用SparkSQL从ORACLE中获取数据

更多细节可以参考官方文档http://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.package

1、使用interllij IDEA编程,新建scala object,在新建的object中新建main函数,所有程序均放置在main函数中

2、新建一个本地spark任务

val spark = SparkSession
.builder()
.master("local[*]")//表示在本地运行spark任务
.appName("Spark SQL basic example")//定义spark任务名称
.config("spark.some.config.option", "some-value")
.getOrCreate()//获取一个已有的sparksession,如果没有则新建一个
val jdbcDF = spark.read//返回一个具有format、option方法的DataFrameReader类型
.format("jdbc")
.option("url", "jdbc:oracle:thin:@***.**.**.**:1521:orcl")//远程连接数据库的地址与端口
.option("dbtable", "sys_dept")//需要获取数据的表名
.option("user", "cougaerptest")//数据库用户账号
.option("driver","oracle.jdbc.driver.OracleDriver")//从oracle中读取数据,需要加载oracle驱动
.option("password", "******")//数据库密码
.load()//从指定的数据库关系表中加载数据