Spark Data Sources

Date: 2023-03-09 23:29:26

1. MySQL as a data source

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * MySQL as a data source.
 *
 * Schema of the source table:
 * root
 *  |-- uid: integer (nullable = false)
 *  |-- xueyuan: string (nullable = true)
 *  |-- number_one: string (nullable = true)
 */
object JdbcSource {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("JdbcSource")
      .master("local[2]").getOrCreate()

    // 2. Load the data source over JDBC
    val urlData: DataFrame = sparkSession.read.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://localhost:3306/urlcount",
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> "url_data",
      "user" -> "root",
      "password" -> "root"
    )).load()

    // Inspect the loaded data
    //urlData.printSchema()
    //urlData.show()

    // 3. Keep rows with uid > 2; uid is read from the Row by position (column 0)
    val fData: Dataset[Row] = urlData.filter(x => {
      x.getAs[Int](0) > 2
    })
    fData.show()

    sparkSession.stop()
  }
}

MySQL data: (the table contents shown in the original post are not reproduced here)
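
The read above runs as a single query over one JDBC connection. For a larger table, the JDBC source can also split the read into range queries over a numeric column. A minimal sketch, reusing sparkSession from the example above; the bounds and partition count are illustrative, not taken from the post:

// Partitioned JDBC read: Spark issues one range query per partition over uid.
val partitionedData: DataFrame = sparkSession.read.format("jdbc").options(Map(
  "url" -> "jdbc:mysql://localhost:3306/urlcount",
  "driver" -> "com.mysql.jdbc.Driver",
  "dbtable" -> "url_data",
  "user" -> "root",
  "password" -> "root",
  "partitionColumn" -> "uid",   // must be a numeric, date, or timestamp column
  "lowerBound" -> "1",          // illustrative bounds for uid
  "upperBound" -> "100",
  "numPartitions" -> "4"
)).load()

Note that the Row-lambda filter in the example above is opaque to the optimizer, so it is evaluated in Spark after the rows have been fetched; a column expression such as $"uid" > 2 (used in the next section) is the form that can be pushed down to the database.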


2. Writing data out in different formats with Spark

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object JdbcSource1 {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("JdbcSource")
      .master("local[2]").getOrCreate()
    import sparkSession.implicits._

    // 2. Load the data source over JDBC
    val urlData: DataFrame = sparkSession.read.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://localhost:3306/urlcount",
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> "url_data",
      "user" -> "root",
      "password" -> "root"
    )).load()

    // 3. Keep rows with uid > 2, then project the columns to write out
    val r = urlData.filter($"uid" > 2)
    val rs: DataFrame = r.select($"xueyuan", $"number_one")
    //val rs: DataFrame = r.select($"xueyuan")

    // 4. Write out in the chosen format
    //rs.write.text("e:/saveText")     // text: requires a single string column
    //rs.write.json("e:/saveJson")     // JSON
    rs.write.csv("e:/saveCsv")         // CSV
    //rs.write.parquet("e:/savePar")   // Parquet
    rs.show()

    sparkSession.stop()
  }
}
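
By default the write fails if the target directory already exists; a save mode and CSV options can be set on the writer before saving. A minimal sketch, reusing rs from the example above (the output path and options are illustrative):

import org.apache.spark.sql.SaveMode

// Overwrite any existing output directory and include a header row in the CSV files.
rs.write
  .mode(SaveMode.Overwrite)
  .option("header", "true")
  .csv("e:/saveCsvHeader")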

3. JSON as a data source

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object JsonSource {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("JsonSource")
      .master("local[2]").getOrCreate()
    import sparkSession.implicits._

    // 2. Read the JSON data source (written by the previous example)
    val jread: DataFrame = sparkSession.read.json("e:/saveJson")

    // 3. Process the data
    val fread: Dataset[Row] = jread.filter($"xueyuan" === "bigdata")

    // 4. Trigger an action
    fread.show()

    // 5. Release resources
    sparkSession.stop()
  }
}
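
read.json infers the schema by scanning the input first. When the structure is known in advance, a schema can be supplied explicitly, which skips the inference pass and keeps the column types stable. A minimal sketch, reusing sparkSession from the example above and assuming the two string columns written in section 2:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Explicit schema for the JSON files written earlier (assumed columns).
val jsonSchema = StructType(Seq(
  StructField("xueyuan", StringType, nullable = true),
  StructField("number_one", StringType, nullable = true)
))
val jreadTyped: DataFrame = sparkSession.read.schema(jsonSchema).json("e:/saveJson")
jreadTyped.filter($"xueyuan" === "bigdata").show()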

4. CSV as a data source

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object CsvSource {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("CsvSource")
      .master("local[2]").getOrCreate()
    import sparkSession.implicits._

    // 2. Read the CSV data source; columns come back as strings named _c0, _c1, ...
    val cread: DataFrame = sparkSession.read.csv("e:/saveCsv")

    // 3. Process the data: rename the columns, then filter
    val rdf = cread.toDF("id", "xueyuan")
    val rs = rdf.filter($"id" <= 3)

    // 4. Trigger an action
    rs.show()

    // 5. Release resources
    sparkSession.stop()
  }
}
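
Because read.csv loads every column as a string with default names, toDF above only renames the columns. To get typed columns, a schema can be declared at read time instead. A minimal sketch, reusing sparkSession from the example above and assuming the first CSV column is a numeric id, as the rename to "id" implies:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Declare names and types up front instead of renaming string columns afterwards.
val csvSchema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("xueyuan", StringType, nullable = true)
))
val typedRead: DataFrame = sparkSession.read.schema(csvSchema).csv("e:/saveCsv")
typedRead.filter($"id" <= 3).show()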