Connecting Spark to MongoDB with Scala

Date: 2023-03-09 03:35:31

MongoDB Connector for Spark
  Spark Connector Scala Guide

spark-shell --jars "mongo-spark-connector_2.11-2.0.0.jar,mongo-hadoop-core-2.0.2.jar,mongo-java-driver-3.4.2.jar"
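
If you prefer not to manage the jars by hand, the same connector can usually be pulled from Maven Central with --packages instead; the coordinates below assume the Scala 2.11 / connector 2.0.0 build listed above.

spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0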

import org.apache.spark.sql.SparkSession
import com.mongodb.spark._
import com.mongodb.spark.config._
import org.bson.Document

val spark = SparkSession.builder()
  .master("local")
  .appName("MongoSparkConnector")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

val uri = "mongodb://172.1.1.1:27017"

val userDF = spark.sql("""
  select
    uid,
    name,
    current_date() version
  from test_table
  limit 100
""").repartition(8)

// Write to MongoDB
userDF.write.mode("overwrite").format("com.mongodb.spark.sql").options(
  Map(
    "uri" -> uri,
    "database" -> "test",
    "collection" -> "test_table")).save()

// Read from MongoDB
val df = spark.read.format("com.mongodb.spark.sql").options(
  Map(
    "uri" -> uri,
    "database" -> "test",
    "collection" -> "test_table")).load()

// Alternative write approaches
userDF.write.mode("overwrite").format("com.mongodb.spark.sql").options(
  Map(
    "spark.mongodb.input.uri" -> uri,
    "spark.mongodb.output.uri" -> uri,
    "spark.mongodb.output.database" -> "test",
    "spark.mongodb.output.collection" -> "test_table")).save()

MongoSpark.save(
  userDF.write.mode("overwrite").options(
    Map(
      "spark.mongodb.input.uri" -> uri,
      "spark.mongodb.output.uri" -> uri,
      "spark.mongodb.output.database" -> "test",
      "spark.mongodb.output.collection" -> "test_table")))

MongoSpark.save(
  userDF.write.mode("overwrite").options(
    Map(
      "uri" -> uri,
      "database" -> "test",
      "collection" -> "test_table")))

spark.stop()