Spark:读取mysql数据作为DataFrame

时间:2023-12-17 13:42:14

在日常工作中,有时候需要读取mysql的数据作为DataFrame数据源进行后期的Spark处理,Spark自带了一些方法供我们使用,读取mysql我们可以直接使用表的结构信息,而不需要自己再去定义每个字段信息。
下面是我的实现方式。

1.mysql的信息:

mysql的信息我保存在了外部的配置文件,这样方便后续的配置添加。

 mysql的信息我保存在了外部的配置文件,这样方便后续的配置添加。
//配置文件示例:
[hdfs@iptve2e03 tmp_lillcol]$ cat job.properties
#mysql数据库配置
mysql.driver=com.mysql.jdbc.Driver
mysql.url=jdbc:mysql://127.0.0.1:3306/database1?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true
mysql.username=user
mysql.password=123456

2.需要的jar依赖

sbt版本,maven的对应修改即可

 libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-protocol" % "1.2.0-cdh5.7.2"
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.38"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "com.yammer.metrics" % "metrics-core" % "2.2.0"

3.完整实现代码

 import java.io.FileInputStream
import java.util.Properties import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext} /**
* @author Administrator
* 2018/10/16-9:18
*
*/
object TestReadMysql {
var hdfsPath: String = ""
var proPath: String = ""
var DATE: String = "" val sparkConf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
val sc: SparkContext = new SparkContext(sparkConf)
val sqlContext: SQLContext = new HiveContext(sc) def main(args: Array[String]): Unit = {
hdfsPath = args(0)
proPath = args(1)
//不过滤读取
val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
dim_sys_city_dict.show(10) //过滤读取
val dim_sys_city_dict1: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", s"city_id=240", proPath)
dim_sys_city_dict1.show(10)
} /**
* 获取 Mysql 表的数据
*
* @param sqlContext
* @param tableName 读取Mysql表的名字
* @param proPath 配置文件的路径
* @return 返回 Mysql 表的 DataFrame
*/
def readMysqlTable(sqlContext: SQLContext, tableName: String, proPath: String) = {
val properties: Properties = getProPerties(proPath)
sqlContext
.read
.format("jdbc")
.option("url", properties.getProperty("mysql.url"))
.option("driver", properties.getProperty("mysql.driver"))
.option("user", properties.getProperty("mysql.username"))
.option("password", properties.getProperty("mysql.password"))
// .option("dbtable", tableName.toUpperCase)
.option("dbtable", tableName)
.load() } /**
* 获取 Mysql 表的数据 添加过滤条件
*
* @param sqlContext
* @param table 读取Mysql表的名字
* @param filterCondition 过滤条件
* @param proPath 配置文件的路径
* @return 返回 Mysql 表的 DataFrame
*/
def readMysqlTable(sqlContext: SQLContext, table: String, filterCondition: String, proPath: String) = {
val properties: Properties = getProPerties(proPath)
var tableName = ""
tableName = "(select * from " + table + " where " + filterCondition + " ) as t1"
sqlContext
.read
.format("jdbc")
.option("url", properties.getProperty("mysql.url"))
.option("driver", properties.getProperty("mysql.driver"))
.option("user", properties.getProperty("mysql.username"))
.option("password", properties.getProperty("mysql.password"))
.option("dbtable", tableName)
.load()
} /**
* 获取配置文件
*
* @param proPath
* @return
*/
def getProPerties(proPath: String) = {
val properties: Properties = new Properties()
properties.load(new FileInputStream(proPath))
properties
} }

4.测试

 def main(args: Array[String]): Unit = {
hdfsPath = args(0)
proPath = args(1)
//不过滤读取
val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
dim_sys_city_dict.show(10) //过滤读取
val dim_sys_city_dict1: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", s"city_id=240", proPath)
dim_sys_city_dict1.show(10)
}

5.运行结果

数据因为保密原因进行了处理

  // 不过滤读取结果
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+
|dict_id|city_id|city_name|city_code|group_id|group_name|area_code| bureau_id|sort|bureau_name|
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+
| 1| 249| **| **_ab| 100| **按时| **-查到|xcaasd...| 21| 张三公司|
| 2| 240| **| **_ab| 300| **按时| **-查到|xcaasd...| 21| 张三公司|
| 3| 240| **| **_ab| 100| **按时| **-查到|xcaasd...| 21| 张三公司|
| 4| 242| **| **_ab| 300| **按时| **-查到|xcaasd...| 01| 张三公司|
| 5| 246| **| **_ab| 100| **按时| **-查到|xcaasd...| 01| 张三公司|
| 6| 246| **| **_ab| 300| **按时| **-查到|xcaasd...| 01| 张三公司|
| 7| 248| **| **_ab| 200| **按时| **-查到|xcaasd...| 01| 张三公司|
| 8| 242| **| **_ab| 400| **按时| **-查到|xcaasd...| 01| 张三公司|
| 9| 247| **| **_ab| 200| **按时| **-查到|xcaasd...| 01| 张三公司|
| 0| 243| **| **_ab| 400| **按时| **-查到|xcaasd...| 01| 张三公司|
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+ // 过滤读取结果
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+
|dict_id|city_id|city_name|city_code|group_id|group_name|area_code| bureau_id|sort|bureau_name|
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+
| 2| 240| **| **_JM| 300| **按时| **-查到|xcaasd...| 21| 张三公司|
| 3| 240| **| **_ZS| 100| **按时| **-查到|xcaasd...| 21| 张三公司|
| 6| 240| **| **_JY| 400| **按时| **-查到|xcaasd...| 01| 张三公司|
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+

6.总结

读取mysql其实不难,就是一些参数的配置而已。
在此处记录下。

本文章为工作日常总结,转载请标明出处!!!!!!!