dataframe的进行json数据的压平、增加一列的id自增列

时间:2023-03-08 16:49:31
dataframe的进行json数据的压平、增加一列的id自增列
{"name":"Michael", "age":25,"myScore":[{"score1":19,"score2":23},{"score1":58,"score2":50}]}
{"name":"Andy", "age":30,"myScore":[{"score1":29,"score2":33},{"score1":38,"score2":52},{"score1":88,"score2":71}]}
{"name":"Justin", "age":19,"myScore":[{"score1":39,"score2":43},{"score1":28,"score2":53}]}
{"name":"Michael", "age":25,"myScore":[{"score1":19,"score2":23},{"score1":58,"score2":50}]}
object explodeTest {
def main(args: Array[String]): Unit = { val sparks = SparkSession.builder.master("local[4]").appName("test1").getOrCreate
val sc = sparks.sparkContext val df= sparks.read.json("file:///C:\\Users\\imp\\Desktop\\bo-kong\\data\\josn") df.show()
//spark 读取json 数据
/**+---+--------------------+-------+
|age| myScore| name|
+---+--------------------+-------+
| 25| [[19,23], [58,50]]|Michael|
| 30|[[29,33], [38,52]...| Andy|
| 19| [[39,43], [28,53]]| Justin|
| 25| [[19,23], [58,50]]|Michael|
| 30|[[29,33], [38,52]...| Andy|
| 19| [[39,43], [28,53]]| Justin|
| 25| [[19,23], [58,50]]|Michael|
| 30|[[29,33], [38,52]...| Andy|
| 19| [[39,43], [28,53]]| Justin|
+---+--------------------+-------+
*
*
*
*/ //使用spark.sql.functions._ explode函数进行压平操作 行转列
import org.apache.spark.sql.functions._
val dfScore = df.select(df("name"),explode(df("myScore"))).toDF("name","myScore")
val dfMyScore = dfScore.select("name","myScore.score1", "myScore.score2")
dfScore.show()
df.createOrReplaceTempView("df")
//u.answer, ''
/**
*
*
*
* +-------+-------+
* | name|myScore|
* +-------+-------+
* |Michael|[19,23]|
* |Michael|[58,50]|
* | Andy|[29,33]|
* | Andy|[38,52]|
* | Andy|[88,71]|
* | Justin|[39,43]|
* | Justin|[28,53]|
* |Michael|[19,23]|
* |Michael|[58,50]|
* | Andy|[29,33]|
* | Andy|[38,52]|
* | Andy|[88,71]|
* | Justin|[39,43]|
* | Justin|[28,53]|
* |Michael|[19,23]|
* |Michael|[58,50]|
* | Andy|[29,33]|
* | Andy|[38,52]|
* | Andy|[88,71]|
* | Justin|[39,43]|
* +-------+-------+
* only showing top 20 rows
*/ }
}

 
数据
aa
bb
cc
dd
ee
ff

dataframe增加index主键列

 case  class Log(map:scala.collection.mutable.Map[String,String],ID: Long)
import sparks.implicits._
val data2 = sc.parallelize(Seq((Map("uuid"->"sxexx","ip"->"192.168")),Map("uuid"->"man","ip"->"192.168.10.1"))).zipWithIndex()
.map(i=>(i._1,i._2))
data2.collect().foreach(print(_))
/**
* 先创造一个Rdd[map] 使用zipWithIndex 看看效果 第二个元素为id主键
*
*
* (Map(uuid -> sxexx, ip -> 192.168),0)
* (Map(uuid -> man, ip -> 192.168.10.1),1)
*/ val data= sc.textFile("file:///C:\\Users\\imp\\Desktop\\bo-kong\\data\\data")
.zipWithIndex().toDF("id","value")
data.show() /**
* 使用上面的数据的得出结果
* +---+-----+
* | id|value|
* +---+-----+
* | aa| 0|
* | bb| 1|
* | cc| 2|
* | dd| 3|
* | ee| 4|
* | ff| 5|
* +---+-----+
*/