大数据入门到精通8-spark RDD 复合key 和复合value 的map reduce操作

时间:2023-01-15 13:45:46

一.做基础数据准备

这次使用fights得数据。

scala> val flights= sc.textFile("/user/hdfs/data/Flights/flights.csv")
flights: org.apache.spark.rdd.RDD[String] = /user/hdfs/data/Flights/flights.csv MapPartitionsRDD[3] at textFile at <console>:24

scala> val sampleFlights= sc.parallelize(flights.take(1000))
sampleFlights: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:26

scala> val header= sampleFlights.first
header: String = YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY

scala> val filteredFlights= flights.filter( line=>{ line!= header } )
filteredFlights: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at filter at <console>:30

二.计算复合key 和 value

计算礼拜几,根据起飞时间计算是上午,下午,晚上,还是夜间飞机,把这两个作为复合key,根据这个来统计平均延误时间。

val timingMap = filteredFlights.map(flight =>{
val flightList=flight.split(",")
val dayOfWeek = flightList(3)
val time=if (flightList(10).length>0) {flightList(10).toInt}else 0
val delay=if (flightList(22).length>0) {flightList(22).toInt}else 0

var periodOfDay =0

if(time>=600 && time<1200){
periodOfDay=0
}else if (time>=1200 && time<1800){
periodOfDay=1
}else if (time>=1800 && time<2400){
periodOfDay=2
}else if (time>=0 && time<600){
periodOfDay=3
}
((dayOfWeek,periodOfDay),(delay,1))
})

timingMap.take(30).foreach(println)

//这里有一个重点,periodOfDay 不能定义为val,否则会有重复赋值得错误,如果有重复赋值得必要,使用var来定义。

//根据起飞时间分成1.2,3,4

//计算reduce 根据复合key ,计算延迟,如果在30分钟以内延迟到达,不计入延迟

val reduceMap=timingMap.reduceByKey((sum,current)=>{
var output =(0,0)
if (current._1>30){
output=((sum._1+current._1),(sum._2+current._2))
}else {output=(sum._1,sum._2)}
if (sum._1<0){
output=(0,0)
}
output
})

reduceMap.take(30).foreach(println)

//这里实际操作中把current._2写成1,因为实际上这个数据其实就是1,但是发现如果写成1,每次的结果都不一样,这里还是必须要使用current._2

三、排序并求平均延迟

val sortedDelays= reduceMap.sortByKey()

val delayByTime = sortedDelays.map(rec=>{
val dayOfWeek =rec._1._1
val time= rec._1._2
val chance =(rec._2._1+0.0)/rec._2._2
var periodOfDay=""
if (time==0){
periodOfDay="Morning"
}else if (time==1){
periodOfDay="Afternoon"
}else if (time==2){
periodOfDay="Evening"
}else if (time==3){
periodOfDay="Night"
}

dayOfWeek+", "+periodOfDay+", "+chance

})

delayByTime.take(30).foreach(println)

大数据入门到精通8-spark RDD 复合key 和复合value 的map reduce操作的更多相关文章

  1. 大数据入门到精通5--spark 的 RDD 的 reduce方法使用

    培训系列5--spark 的 RDD 的 reduce方法使用 1.spark-shell环境下准备数据 val collegesRdd= sc.textFile("/user/hdfs/C ...

  2. 大数据入门到精通4--spark的rdd的map使用方式

    学习了之前的rdd的filter以后,这次来讲spark的map方式 1.获得文件 val collegesRdd= sc.textFile("/user/hdfs/CollegeNavig ...

  3. 大数据入门到精通2--spark rdd 获得数据的三种方法

    通过hdfs或者spark用户登录操作系统,执行spark-shell spark-shell 也可以带参数,这样就覆盖了默认得参数 spark-shell --master yarn --num-e ...

  4. 大数据入门到精通3-SPARK RDD filter 以及 filter 函数

    一.如何处理RDD的filter 1. 把第一行的行头去掉 scala> val collegesRdd= sc.textFile("/user/hdfs/CollegeNavigat ...

  5. 大数据入门到精通11-spark dataframe 基础操作

    // dataframe is the topic 一.获得基础数据.先通过rdd的方式获得数据 val ny= sc.textFile("data/new_york/")val ...

  6. 大数据入门到精通10--spark rdd groupbykey的使用

    //groupbykey 一.准备数据val flights=sc.textFile("data/Flights/flights.csv")val sampleFlights=sc ...

  7. 大数据入门到精通6---spark rdd reduce by key 的使用方法

    1.前期数据准备(同之前的章节) val collegesRdd= sc.textFile("/user/hdfs/CollegeNavigator.csv")val header ...

  8. 大数据入门到精通18--sqoop 导入关系库到hdfs中和hive表中

    一,选择数据库,这里使用标准mysql sakila数据库 mysql -u root -D sakila -p 二.首先尝试把表中的数据导入到hdfs文件中,这样后续就可以使用spark来dataf ...

  9. 大数据入门到精通13--为后续和MySQL数据库准备

    We will be using the sakila database extensively inside the rest of the course and it would be great ...

随机推荐

  1. Web开发入门疑问收集&lpar;不定期更新&rpar;

    bootstrap container和container-fluid的区别 原始链接 container 根据显示设备满足的最小宽度,来决定实际内容宽度,是一个根据设置内容阶梯式响应的布局. 例子: ...

  2. 一个CSS中Z-index的用法

    一个CSS中Z-index的用法 CSS教程:彻底掌握Z-index属性     大多数的CSS属性都很容易使用.常常,当您对标记语言的元素使用CSS属性时,产生的结果会随着您刷新页面而立即呈现.而另 ...

  3. android&colon;layout&lowbar;weight的真实含义

    首先声明只有在Linearlayout中,该属性才有效.之所以android:layout_weight会引起争议, 是因为在设置该属性的同时,设置android:layout_width为wrap_ ...

  4. oracle 消除块竞争&lpar;hot blocks&rpar;

    上篇日志提到了,那么高的负载,是存在数据块读竞争,下面介绍几个方法来消除块竟争 查找块竟争 SELECT p1 "file#", p2 "block#", p3 ...

  5. Python学习笔记--Python字符串连接方法总结

    声明: 这些总结的学习笔记,一部分是自己在工作学习中总结,一部分是收集网络中的知识点总结而成的,但不到原文链接.如果有侵权,请知会,多谢. python中有很多字符串连接方式,总结一下: 1)最原始的 ...

  6. hadoop笔记之Hive的数据存储&lpar;内部表&rpar;

    Hive的数据存储(内部表) Hive的数据存储(内部表) 基于HDFS 可使用hadoop给我们提供的web管理工具查看数据.打开管理工具localhost:9000–>Utilities下的 ...

  7. ajax接受json响应(讲义)

    l 什么是json? l Json和xml比较 l Ajax如何使用JSON l Ajax接收json响应案例 什么是json? JSON (JavaScript Object Notation) 是 ...

  8. &lbrack;Swift&rsqb;LeetCode567&period; 字符串的排列 &vert; Permutation in String

    Given two strings s1 and s2, write a function to return true if s2 contains the permutation of s1. I ...

  9. Eclipse 中 Debug 调试 java 代码一直报 Source not found

    今天使用eclipse的debug调试代码,一直没法正常调试,一按F6就提示Source not found 根据提示发现可能是另一个项目影响了,所以把另一个项目Close Project,这次直接t ...

  10. Spring 核心API

    BeanFactory: 这是一个工厂,用于生产任意Bean,采用延迟加载,第一次getBean时才会加载 ApplicationContext: 是BeanFactory的一个子接口,功能更强大(国 ...