spark基础知识学习第五天(适合新手学习)

时间:2023-01-25 15:40:42

Shark

Hive sql -> mr

Shark sql ->Spark core

 

Spark2.0之前的版本的Spark-SQL并不支持开窗函数和子查询的

 

1.Spark SQL1.6.x特点:

(1).内存列存储(不是按照对象存储的),面向列的存储方式(减少对内存的消耗)

(2).字节码生成技术(动态字节码生成技术)

(3).代码编写优化

----------------------------------------

1.易整合

2.统一的数据访问方式

3.兼容Hive

4.标准的数据连接

 

开窗函数:与聚合函数一样,开窗函数也是对行集组进行聚合计算,但是普通聚合函数每组只能返回一个值,而开窗函数可以每组返回多个值。

开窗函数的调用格式为:

        函数名(列)over(选项)

over关键字表示把函数当成开窗函数而不是聚合函数,SQL标准允许将所有聚合函数用做开窗函数,使用over关键字来区分这两种用法。

-------------------------------------------------------------------------------------------------------        

2.Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame

并且作为分布式SQL查询引擎的作用。

 

----------------------------------------------------------------------------------------

 

3.为什么要学习Spark SQL

我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,

大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢

。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!

--------------------------------------------------------------------------------------

4.什么是DataFrames

与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,

除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持

嵌套数据类型(struct、array和map)。从API易用性的角度上 看,DataFrame API提供的

是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。由于与R和Pandas的

DataFrame类似,SparkDataFrame很好地继承了传统单机数据分析的开发体验。

-------------------------------------------------------------------------------------------

 

5.生成DataFrame:

DSL:

val seq =Seq(("1","xiaofeng",18),("2","yaoyao",24),("1","xueyan",22)) //创建一个序列集合

val rdd1 = sc.parallelize(seq)                                               //将集合生成RDD

val df =rdd1.toDF("id","name","age")                                //通过RDD创建dateframe

df.show                //相当于action

 

//查询单个字段

valres1 = df.select("name")

res1.show

 

//查询多个字段

df.select("name","age").show

 

//当条件查询

df.select("name","age").filter(col("age")>18).show

---------------------------------------------------------------------------------        

SQL:

//注册二维表

df.registerTempTable("t_person")                  //注册二维表

sqlContext.sql("select name,age fromt_person").show             

//使用sqlContext.sql可以使用sql语句

sqlContext.sql("selectname,age from t_person where age >= 18 limit 2").show

 

读取hdfs的文件

val rdd1 =sc.textFile("hdfs://hadoop01:9000/person.txt")  //通过hdfs中的数据生成RDD

 

case classPerson(id:Long,name:String,age:Int,fv:Int)     //使用case class进行类型匹配

 

val PersonRDD = rdd1.map(_.split(",")).map(x =>Person(x(0).toLong,x(1)),x(2).toInt,x(3).toInt)

 

//让数据和类型一一对应

 

val personDF = personRDD.toDF()              //通过RDD创建dateframe

   

 

personDF.show         //查看生成的dateframe表格

 

personDF.select("name","age","fv")

 

personDF.registerTemTable("t_person")

 

sqlContext.sql("selectname ,age,fv from t_person where fv > 90").show

 

//查看DataFrame结构信息

personDF.printSchema

 

//查看表的信息(查询表结构)

sqlContext.sql("desc t_person").show

 

 

 

---------------------------------------------------------------------------------------------------                

 

案例通过反射推断Schema:

 

packagecom.qf.gp1704.day10

 

importorg.apache.spark.sql.{DataFrame, SQLContext}

importorg.apache.spark.{SparkConf, SparkContext}

 

/**

 * 通过反射推断Schema

  */

objectInferSchema {

  def main(args: Array[String]): Unit = {

 

valconf = newSparkConf().setAppName("InferSchema").setMaster("local")

valsc = new SparkContext(conf)

valsqlContext = new SQLContext(sc)

 

vallinesRDD =sc.textFile("hdfs://node01:9000/person.txt").map(_.split(","))

 

// 将RDD和Person关联

val personRDD = linesRDD.map(p => Person(p(0).toInt, p(1),p(2).toInt, p(3).toInt))

 

// 调用toDF方法需要引入隐式转换函数

import sqlContext.implicits._

 

// 将personRDD转换成DataFrame

valpersonDF: DataFrame = personRDD.toDF()

 

// 注册为一张临时二维表

personDF.registerTempTable("t_person")

 

valsql = "select * from t_person where fv > 80 order by age desc limit10"

 

// 调用sqlContext实例的sql方法进行查询

valres: DataFrame = sqlContext.sql(sql)

 

// 把结果集存储到HDFS,mode是怎么存储(之前写的数据一样)。以什么格式存储

res.write.mode("append").json("hdfs://node01:9000/out-20180418-1")

注意:2.0版本后添加了很多格式,比如csv格式,如果没有的格式可以通过添加第三方插件导入pom.xml来使用该格式

res.show()

 

sc.stop()

  }

}

 

case class Person(id: Int, name: String,age: Int, fv: Int)    //反射

----------------------------------------------------------------------------

 

通过StructType指定Schema

packagecom.qf.gp1704.day10

 

importorg.apache.spark.rdd.RDD

importorg.apache.spark.sql.{DataFrame, Row, SQLContext}

importorg.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

importorg.apache.spark.{SparkConf, SparkContext}

 

/**

 * 通过StructType指定Schema

  */

objectStructTypeSchemaDemo {

  def main(args: Array[String]): Unit = {

 

valconf = newSparkConf().setAppName("StructTypeSchemaDemo").setMaster("local")

valsc = new SparkContext(conf)

valsQLContext = new SQLContext(sc)

 

vallinesRDD =sc.textFile("hdfs://node01:9000/person.txt").map(_.split(","))

 

// StructType指定Schema

val schema:StructType = StructType {

  Array(

StructField("id",IntegerType, false),

StructField("name",StringType, true),

StructField("age",IntegerType, true),

StructField("fv",IntegerType, true)

  )

}

 

// 映射,通过Row可以封装多个值,类似于元组封装不同类型的值

val rowRDD:RDD[Row] = linesRDD.map(p => Row(p(0).toInt, p(1), p(2).toInt, p(3).toInt))

 

// 转换成DataFrame

val personDF:DataFrame = sQLContext.createDataFrame(rowRDD, schema)

 

// 注册成临时表

personDF.registerTempTable("t_person")

 

valsql = "select * from t_person"

val res = sQLContext.sql(sql)

res.write.mode("append").json("c://person")

 

sc.stop()

  }

}

-------------------------------------------------------------------------------------------------------

案例:将数据导入到Mysql

packagecom.qf.gp1704.day10

 

importjava.util.Properties

 

importorg.apache.spark.{SparkConf, SparkContext}

importorg.apache.spark.sql.{Row, SQLContext}

importorg.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

 

objectInsertData2MySQL {

  def main(args: Array[String]): Unit = {

valconf = newSparkConf().setAppName("InsertData2MySQL")//.setMaster("local")

valsc = new SparkContext(conf)

valsQLContext = new SQLContext(sc)

 

vallinesRDD =sc.textFile("hdfs://node01:9000/test.txt").map(_.split(","))

 

val schema = StructType{Array(

 StructField("name", StringType, true),

  StructField("age",IntegerType, true)

)}

 

val personRDD = linesRDD.map(p => Row(p(0), p(1).toInt))

 

val personDF = sQLContext.createDataFrame(personRDD, schema)

 

//请求MySQL的一些配置信息

val prop = newProperties()

prop.put("user","root")

prop.put("password","root")

prop.put("driver","com.mysql.jdbc.Driver")

 

val url ="jdbc:mysql://node03:3306/bigdata"

 

val table ="person"

 

//把数据写入MySQL,使用write方法并通过追加方式以jdbc连接的方式将数据

//存入MySQL中,jdbc中的三个参数分别是,url,表名,配置信息

personDF.write.mode("append").jdbc(url,table, prop)

 

sc.stop()

  }

}

 

 

------------------------------------------------------------------------------------------------------------------

 

6.JDBC

Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,

还可以将数据再写回关系型数据库中。

(1).从MySQL中加载数据(Spark Shell方式)

1.启动Spark Shell,必须指定mysql连接驱动jar包

/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-shell\

--masterspark://node01:7077 \

--jars/usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \

--driver-class-path/usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar

//这里需要加载两次jar包,第一次是需要jar包,第二次是需要驱动类

 

2.mysql中加载数据,生成DataFrame,以jdbc的方式读取

valjdbcDF = sqlContext.read.format("jdbc").options(Map("url"-> "jdbc:mysql://node03:3306/bigdata", "driver" ->"com.mysql.jdbc.Driver", "dbtable" ->"person", "user" -> "root","password" -> "root")).load()

 

3.执行查询

jdbcDF.show()              //show方法底层调用的还是println方法

-----------------------------------------------------------------------------------------------------------------------

4.将数据写入到MySQL中(打jar包方式)

1.编写Spark SQL程序

packagecom.qf.spark.sql

 

importjava.util.Properties

importorg.apache.spark.sql.{SQLContext, Row}

importorg.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}

importorg.apache.spark.{SparkConf, SparkContext}

 

objectJdbcRDD {

  def main(args: Array[String]) {

valconf = new SparkConf().setAppName("MySQL-Demo")

valsc = new SparkContext(conf)

valsqlContext = new SQLContext(sc)

//通过并行化创建RDD

valpersonRDD = sc.parallelize(Array("1 tom 5", "2 jerry 3","3 kitty 6")).map(_.split(" "))

//通过StructType直接指定每个字段的schema

valschema = StructType(

  List(

StructField("id",IntegerType, true),

StructField("name",StringType, true),

StructField("age",IntegerType, true)

  )

)

 

//将RDD映射到rowRDD

valrowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))

 

//将schema信息应用到rowRDD上

valpersonDataFrame = sqlContext.createDataFrame(rowRDD, schema)

 

//创建Properties存储数据库相关属性

valprop = new Properties()

prop.put("user","root")

prop.put("password","123456")

 

//将数据追加到数据库

personDataFrame.write.mode("append").jdbc("jdbc:mysql://192.168.10.1:3306/bigdata","bigdata.person", prop)

 

//停止SparkContext

sc.stop()

  }

}

 

 

2.用maven将程序打包

 

3.将Jar包提交到spark集群

/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-submit\

--classcom.qf.spark.sql.JdbcRDD \

--masterspark://node01:7077 \

--jars/usr/local/spark-1.6.1bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \

--driver-class-path/usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \

/root/spark-mvn-1.0-SNAPSHOT.jar

 

 

 

-------------------------------------------------------------------------------------------------------------------------        

6.Hive-On-Spark

在SparkSQL之后出现的,主要是和Hive-On-Tez进行竞争

 

配置:

(1).将Hadoop的/etc/hadoop下的配置文件core-site.xml复制到Spark的conf目录

//主要获取hdfs上的数据

(2).将Hive的conf目录下的hive-site.xml复制到Spark的conf目录                        

//获取元数据信息

 

启动:./spark-sql --master spark://hadoop01:7077 --

 

-----------------------------------------------------------------------------------------------------

 

7.Kafka(消息中间件)(开源消息系统)----scala写成---相当于微信公众号

传递生产者和消费者之间的信息的桥梁,一般用来缓存数据

是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。

 

默认存168小时(一周)

 

-------------------------------------------------------------------------------------------------------------------

 

 

8.JMS的基础

(1).JMS是什么:JMSJava提供的一套技术规范

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,

用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,

绝大多数MOM提供商都对JMS提供支持。

 

(2).JMS干什么用:用来异构系统 集成通信,缓解系统瓶颈,提高系统的伸缩性增强系统用户体验,使得系统模块化和组件化变得可行并更加灵活

 

(3).JMS消息传输模型

l        点对点模式(一对一,消费者主动拉取数据pull,消息收到后消息清除)

点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此。

l        发布/订阅模式(一对多,数据生产后,推送给所有订阅者push

发布订阅模型则是一个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即当前订阅者不可用,处于离线状态。

 

(4).JMS核心组件

l        Destination:消息发送的目的地,也就是前面说的Queue和Topic。

l        Message :从字面上就可以看出是被发送的消息。

l        Producer: 消息的生产者,要发送一个消息,必须通过这个生产者来发送。

l        MessageConsumer: 与生产者相对应,这是消息的消费者或接收者,通过它来接收一个消息。

 

(5).常见的类JMS消息服务器:

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1J2EE 1.4规范的

分布式消息中间件 Metamorphosis:

Metamorphosis (MetaQ) 是一个高性能、高可用、可扩展的分布式消息中间件,类似于LinkedIn的Kafka,具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,在淘宝和支付宝有着广泛的应用,现已开源。

分布式消息中间件 RocketMQ--- (MetaQ) 3.0版本

-----------------------------------------------------------------------------------------------------------------

 

9.为什么需要消息队列(重要、了解)

消息系统的核心作用就是三点:解耦,异步和并行

 

以用户注册的案列来说明消息系统的作用

-------------------------------------------------------------------------------------------------

 

10.Kafka核心组件(重要)

l        Topic :消息根据Topic进行归类

l        Producer:发送消息者

l        Consumer:消息接受者

l        broker:每个kafka实例(server)

l        Zookeeper:依赖集群保存meta信息。

 

 

生产者可以随意将数据传递给Kafka集群中的各个分区中

Kafka集群中将数据通过消息队列(先进先出,单向传递)存储数据

消费者集群向kafka集群请求数据

当消费者某一集群挂掉后会让其他集群拿到挂掉集群所要分配到的数据,会出现数据偏移问题

两个集群同时请求一个数据的时候,会出现线程等待问题

每个消费者集群都能随便拿到kafka中线程开启的数据

 

 

 

角色:

Producer集群:

1、生产者负责获取数据并把数据传到Kafka的,比如flume、logstash,

生产者会监控一个目录负责把数据采集到Kafka

2、生产者集群是由多个进程组成,一个生产者作为一个独立的进程

3、多个生产者发送的数据是可以存到同一个topic的同一个partition的

4、一个生产者的数据可以放到多个topic中

5、单个生产者就具有数据分发的能力

 

Kafka集群:

1、Kafka集群可以保存多种数据类型的数据,一种数据类型可以保存到一个topic

一个Kafka集群中可以创建多个topic

2、每个topic可以创建多个分区和多个副本,分区的数量和副本的数量是在创建topic时指定的后期也可以运行相应的命令更改分区数和副本数

3、每个分区的数据是由多个segment组成,一个segment文件里有多个index文件和对应的数据文件(.log)组成

4、一个topic的分区数据可以有多个备份(副本),原始数据和副本数据不可以在同一个节点上

 

Consumer集群:

1、消费者负责拉取数据,比如:SparkStreaming、Storm

2、一个ConsumerGroup称为Consumer集群

3、新增或减少Consumer成员时会触发Consumer集群的负载均衡

4、ConsumerGroup可以消费一个或多个分区的数据,相反,一个分区的数据同一时刻只能被一个Consumer来消费

5、Consumer成员之间 消费的数据各不相同,在同一个group中数据不可以重复消费

 

Producer的写入流程:

 

 

只有pull过程全部结束,每个副本都发送了ack后,这个数据才能是消费的,这是一整个过程,必须将数据传给leader,然后follower从leader中拉(pull)数据,然后写入到followers完成后向leader发送ack表示过程已经完成。leader收到所有副本的ack后向peoducer发送ack