Scala 2.11.8 / Spark 2.3.1 / MongoDB Connector 2.3.0

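For reference, a minimal build.sbt matching this version stack might look as follows. This is a sketch: only the mongo-spark-connector coordinate is confirmed by the spark-submit command in the code below; the Spark coordinates are the standard 2.3.1 artifacts.

// build.sbt — a sketch, assuming a plain sbt project layout
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core" % "2.3.1" % "provided",
  "org.apache.spark"  %% "spark-sql"  % "2.3.1" % "provided",
  "org.apache.spark"  %% "spark-hive" % "2.3.1" % "provided",
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.3.0"
)
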
import com.mongodb.spark._
import org.apache.spark.sql.SparkSession

// On handling multiple records that share the same fields, see:
// https://blog.csdn.net/qq_14950717/article/details/62425563
// https://blog.csdn.net/qq_27234661/article/details/78344435?locationNum=3&fps=1
object mongospark20180830consume_amount {

  // Submit with:
  // spark-submit --driver-class-path /usr/local/jdk/lib/mysql-connector-java-5.1.46.jar \
  //   --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0 \
  //   --class "mongospark20180830consume_amount" /testdata/u3.jar
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("adver")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // Source collection: saas.elemeterPowerHistory on an Aliyun MongoDB instance
    val inputUri = "mongodb://saas:saas2018yundinglixin@dds-m5e6e56a3b0cf7b42784-pub.mongodb.rds.aliyuncs.com:3717/saas.elemeterPowerHistory"

    val df = spark.read.format("com.mongodb.spark.sql")
      .options(Map(
        "spark.mongodb.input.uri" -> inputUri,
        "spark.mongodb.input.partitioner" -> "MongoPaginateBySizePartitioner",
        "spark.mongodb.input.partitionerOptions.partitionKey" -> "_id",
        // the original left partitionSizeMB empty; 64 is the connector's default
        "spark.mongodb.input.partitionerOptions.partitionSizeMB" -> "64"))
      .load()

    // Keep only the columns we need, then persist them as a Hive table
    val df2 = df.select("time", "uuid", "consume_amount", "room_id")

    spark.sql("use saas")
    df2.write.mode("overwrite").saveAsTable("consume_amount20180831")

    // Earlier experiments, kept for reference:
    // val rddf = spark.sql("select uuid,from_unixtime(cast(`time`/1000 AS bigint),'yyyyMMddHH'),consume_amount from consume where time>=1533115788000").toDF("uuid", "time", "consume_amount")
    // rddf.write.saveAsTable("consume_amount20180830")
    // val select = spark.sql("select s.sn,s.uuid,e.time,e.consume_amount from staonly2 s join elem e on s.uuid=e.uuid").take(10)
    // val select = spark.sql("select consume_amount from elem limit 5").take(5)
    // select.foreach(println)
  }
}
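
After the job finishes, the written table can be sanity-checked from spark-shell (with Hive support enabled). A minimal sketch, using the database and table names from the job above:

// Verify the written table (run in spark-shell or another Spark app)
spark.sql("use saas")
val saved = spark.table("consume_amount20180831")
saved.printSchema()                              // expect time, uuid, consume_amount, room_id
saved.select("uuid", "consume_amount").show(5)   // peek at a few rows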