使用 MapReduce 统计每年每个月气温最高的前三名

时间:2023-03-10 04:40:39
使用 MapReduce 统计每年每个月气温最高的前三名

WeatherMapper

package com.laoxiao.mr.weather;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date; import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper; public class WeatherMapper extends Mapper<Text, Text, MyKey, DoubleWritable>{ SimpleDateFormat df=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
protected void map(Text key, Text value, Context context)
throws java.io.IOException ,InterruptedException {
try {
Date d = df.parse(key.toString());
Calendar c=Calendar.getInstance();
c.setTime(d);
int year=c.get(Calendar.YEAR);
int month=c.get(Calendar.MONTH);
double hot =Double.parseDouble(value.toString().substring(0, value.toString().lastIndexOf("c")));
context.write(new MyKey(year,month+1,hot), new DoubleWritable(hot));
} catch (Exception e) {
e.printStackTrace();
} }; }

WeatherReducer

 package com.laoxiao.mr.weather;

 import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer; public class WeatherReducer extends Reducer<MyKey, DoubleWritable, Text, NullWritable>{
protected void reduce(MyKey arg0, java.lang.Iterable<DoubleWritable> arg1, Context arg2)
throws java.io.IOException ,InterruptedException {
int i=0;
for(DoubleWritable d:arg1){
i++;
arg2.write(new Text(arg0.getYear()+"\t"+arg0.getMonth()+"\t"+d.get()),NullWritable.get());
if(i==3){
break;
}
}
}; }

MyKey

package com.laoxiao.mr.weather;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class MyKey implements WritableComparable<MyKey>{ private int year;
private int month;
private double hot;
public MyKey(int year, int month, double hot) {
super();
this.year = year;
this.month = month;
this.hot = hot;
}
public MyKey() {
// TODO Auto-generated constructor stub
}
public int getYear() {
return year;
}
public void setYear(int year) {
this.year = year;
}
public int getMonth() {
return month;
}
public void setMonth(int month) {
this.month = month;
}
public double getHot() {
return hot;
}
public void setHot(double hot) {
this.hot = hot;
} public void readFields(DataInput arg0) throws IOException {
this.year=arg0.readInt();
this.month=arg0.readInt();
this.hot=arg0.readDouble();
}
public void write(DataOutput arg0) throws IOException {
arg0.writeInt(year);
arg0.writeInt(month);
arg0.writeDouble(hot);
} //判断对象是否是同一个对象,当该对象作为输出的key
public int compareTo(MyKey o) {
int r1 =Integer.compare(this.year, o.getYear());
if(r1==0){
int r2 =Integer.compare(this.month, o.getMonth());
if(r2==0){
return Double.compare(this.hot, o.getHot());
}else{
return r2;
}
}else{
return r1;
}
} }

MyPartitioner

package com.laoxiao.mr.weather;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; public class MyPartitioner extends HashPartitioner<MyKey, DoubleWritable>{ //执行时间越短越好
public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) {
return (key.getYear()-1949)%numReduceTasks;
} }

MySort

package com.laoxiao.mr.weather;

import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableComparable; public class MySort extends WritableComparator{ public MySort() {
super(MyKey.class,true);
} public int compare(WritableComparable a, WritableComparable b) {
MyKey k1=(MyKey)a;
MyKey k2=(MyKey)b;
int r1=Integer.compare(k1.getYear(), k2.getYear());
if(r1==0){
int r2=Integer.compare(k1.getMonth(), k2.getMonth());
if(r2==0){
return -Double.compare(k1.getHot(),k2.getHot());
}else{
return r2;
}
}else{
return r1;
} }
}

MyGroup

package com.laoxiao.mr.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator; public class MyGroup extends WritableComparator{ public MyGroup(){
super(MyKey.class,true);
} public int compare(WritableComparable a, WritableComparable b) {
MyKey k1 =(MyKey) a;
MyKey k2 =(MyKey) b;
int r1 =Integer.compare(k1.getYear(), k2.getYear());
if(r1==0){
return Integer.compare(k1.getMonth(), k2.getMonth());
}else{
return r1;
} }
}

作业设置了三个 reducer 任务,因此最终结果会分别写入三个输出文件(每个 reducer 对应一个文件)。