大数据入门到精通11-spark dataframe 基础操作

时间:2023-03-08 21:19:50
大数据入门到精通11-spark dataframe 基础操作

// dataframe is the topic

一、获得基础数据。先通过rdd的方式获得数据

val ny= sc.textFile("data/new_york/")
val header=ny.first
val filterNY =ny.filter(listing=>{
listing.split(",").size==14 && listing!=header
})

//因为后面多是按照表格的形式来处理dataframe,所以这里增加一个size==14的限制非常有必要。要求数据整齐划一。
val nyMap= filterNY.map(listing=>{
val listingInfo=listing.split(",")
(listingInfo(0).toInt,listingInfo(2),listingInfo(9).toFloat,listingInfo(4))

})

//这里的map并没有采用key val的形式,而是四个字段并列的map格式,这种形式更加适合后面转换成dataframe,原来key value的形式,主要在groupbykey,countbykey,reducebykey的rdd操作的时候才有用。
nyMap.take(20).foreach(println)

二、把rdd转化成dataframe

val nyDF=nyMap.toDF("Room_ID","Room_Type","Price","Neighborhood")

//转化的关键步骤

三、dataframe上的关键常用操作

nyDF.show
//default it will be show 20 rows .But you can specificate row number.eg
nyDF.show(40)

//show函数可以指定行数。
nyDF.select("Room_ID","Room_Type","Price").show
//you can also specificate a row to select a special column.
val countsDF= nyDF.filter("Price< 100.0").groupBy("Room_Type").count()

//这里重点讲一下dataframe 的 groupby 出来的是一个RelationalGroupedDataset 类型的dataset

scala> nyDF.filter("Price< 100.0").groupBy("Room_Type")
res12: org.apache.spark.sql.RelationalGroupedDataset = org.apache.spark.sql.RelationalGroupedDataset@63a4356b

//所有的dataframe的聚合函数都要先groupby 然后在这个基础上再count,等聚合函数。

四、常见dataframe上的聚合函数

val averagePrice=nyDF.filter("Room_Type='Entire home/apt'").groupBy("Neighborhood").
agg(avg("Price"),max("Price"),count("Price"))

averagePrice.show
val averageTypePrice=nyDF.groupBy("Neighborhood","Room_Type").
agg(avg("Price"),max("Price"),count("Price"))

上面两个例子可以看出通过agg函数,然后里面放各种聚合函数。形成新的聚合dataframe列名就是avg("Price")等等

第二个方面groupby也可以根据两个或者多个字段groupby

五、dataframe也有take函数。

dataframe每一行是是一个row类型。take得到的是一个row的数组

scala> averageTypePrice.take(10)
res16: Array[org.apache.spark.sql.Row] = Array([Battery Park City,Entire home/apt,340.9132029339853,9150.0,1636], [Upper West Side,Shared room,137.98664440734558,9900.0,1198], [Coney Island,Private room,73.0,250.0,97], [Bronx Park,Entire home/apt,153.5,865.0,22], [Bronxdale,Shared room,32.5,50.0,8], [Port Morris,Shared room,61.0,62.0,2], [Morris Heights,Entire home/apt,125.0,125.0,1], [Battery Park City,Private room,135.51234567901236,2800.0,810], [Van Cortlandt Park,Private room,61.55,112.0,40], [Unionport,Private room,63.793103448275865,99.0,29])

scala>

六、dataframe也可以sort函数,注意不是sortby

averageTypePrice.sort("Neighborhood").show

averageTypePrice.sort(desc("avg(Price)")).show

这里可以降序排列,默认是升序排列,另外聚合的列名是avg(Price)  不是avg(“Price”)

从show的列名也可以看出来。

七、自定义函数

val finalDf=averagePrice.withColumn("addCol",roundfun(averagePrice("avg(Price)")))

withColum是增加一列的意思。自定义函数的入参是dataframe的一列

val finalDf2=finalDf.drop("avg(Price)").sort(desc("addCol")).show

增加一列对应的是删除一列,使用drop函数。

八、转化为RDD以及类型的处理

val finalRDD=finalDf.rdd

注意val finalRDD=finalDf2.rdd会报错,上面的finalDf2严格来说不是dataframe。finalDf才是一个dataframe

scala> finalRDD.take(1)
res32: Array[org.apache.spark.sql.Row] = Array([Corona,120.56349206349206,1350.0,126,120.0])

scala> nyMap.take(1)
res33: Array[(Int, String, Float, String)] = Array((105,Private room,167.0,Hell's Kitchen))

发现通过dataframe转化过来的rdd,和普通rdd比较。里面没有每个一列的类型,只有一个单体类型row。所以获取里面元素的方法也有变化

九、dataframe转化过来的rdd的类型处理

scala> val row=finalRDD.take(1)
row: Array[org.apache.spark.sql.Row] = Array([Corona,120.56349206349206,1350.0,126,120.0])

scala> row(0)(0)
res34: Any = Corona

scala> row(0)(1)
res35: Any = 120.56349206349206

scala> row(0)(2)
res36: Any = 1350.0

这个any类型如果要转化成想要的类型,要先toString 然后再toInt等等

写一个map来处理:

val finalStandardRdd=finalRDD.map(row=>{
(row(0).toString,row(1).toString.toFloat,row(2).toString.toFloat,row(3).toString.toInt,row(4).toString.toFloat)
})

scala> finalStandardRdd
res38: org.apache.spark.rdd.RDD[(String, Float, Float, Int, Float)] = MapPartitionsRDD[85] at map at <console>:44

发现类型已经正常。