MapReduce统计TopN示例

时间:2022-04-06 18:24:21

分别统计年、月、日最高气温(实现排序)

package mr.temp;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

public class TopTemp {
    static class MyMapper extends Mapper<LongWritableTextTextDoubleWritable> {
        Text outKey new Text();
        DoubleWritable outValue new DoubleWritable();

        @Override
        protected void map(LongWritable keyText valueContext context) throws IOExceptionInterruptedException {
            try {
                String[] datas = value.toString().split("\t");
                outValue.set(Double.valueOf(datas[1]));
                outKey.set(datas[0].substring(010));//按照天进行统计最高气温
                context.write(outKeyoutValue);
                outKey.set(datas[0].substring(07));//按照月进行统计最高气温
                context.write(outKeyoutValue);
                outKey.set(datas[0].substring(04));//按照年进行统计最高气温
                context.write(outKeyoutValue);
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    static class MyReducer extends Reducer<TextDoubleWritableTextDoubleWritable> {
        @Override
        protected void reduce(Text keyIterable<DoubleWritable> valuesContext context) throws IOExceptionInterruptedException {
            double max = 0;
            for (DoubleWritable value : values) {
                max = Math.max(maxvalue.get());
            }
            context.write(key, new DoubleWritable(max));
        }
    }

    public static void run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapred.jar""D:/IDEAWorks/hadoopAPI/target/hadoopAPI-1.0-SNAPSHOT.jar");
        String[] otherArgs = new GenericOptionsParser(confargs).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: TopTemp <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf"TopTemp");
        job.setJarByClass(TopTemp.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean isSuccess = job.waitForCompletion(true);
        System.exit(isSuccess ? 1);
    }

    public static void main(String[] args) throws Exception {
        args = new String[]{"file:///D:/data/weather""hdfs://n2:9820/remoteUser/r3"};
        run(args);
    }
}

默认Text是按照字符串中Ascii码排序的,因为有补0,所以排序和预期一样,不需要额外修改排序。

--key排序思想:能够用字符串实现排序的,就不要用自定义封装进行排序(除非是特殊排序规则,比如字符串中前部分升序,后部分降序)了,感觉效率低,代码还多。因为Text好像可以不用反序列化比较。上面的是求最大值的需求,Text就可以搞定key的排序、分组,推广到TopN问题上,也就是修改reducer代码,通过TreeSetTopN字段进行统计即可。这类需求均可以添加combiner进行提速,封装、groupsort都不用自己写。

static class MyReducer extends Reducer<TextDoubleWritableTextDoubleWritable> {
    private final static int TopN 3;
    @Override
    protected void reduce(Text keyIterable<DoubleWritable> valuesContext context) throws IOExceptionInterruptedException {
        TreeSet<Double> treeSet = new TreeSet<>();
        for (DoubleWritable value : values) {
            treeSet.add(value.get());
        }
        for (int i = 0i < TopN && !treeSet.isEmpty()i++) {
            context.write(key, new DoubleWritable(treeSet.pollLast()));
        }
    }
}

注意:可能数据有限,不够的TopN的情况,需要根据treeSet的实际情况判断,并且这里统计的TopN是分组中的,所以TreeSet需要在reduce中初始化。

另外注意:TopN的统计中容易将并列的项目进行随机替换,需要自定义比较进行避免,具体如下:

自定义比较器解决并列TopN问题

public class MyKey {
    private String date;
    private double temp;

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public double getTemp() {
        return temp;
    }

    public void setTemp(double temp) {
        this.temp = temp;
    }
}

封装类,用来便于比较

static class MyReducer extends Reducer<TextDoubleWritableTextDoubleWritable> {
    private final static int TopN 3;
    TreeSet<MyKey> topSet new TreeSet<>(new Comparator<MyKey>() {
        @Override
        public int compare(MyKey o1MyKey o2) {
            int r1 = (int) (o1.getTemp() - o2.getTemp());
            if (r1 == 0)
                return o1.getDate().compareTo(o2.getDate());
            return r1;
        }
    });

    @Override
    protected void reduce(Text keyIterable<DoubleWritable> valuesContext context) throws IOExceptionInterruptedException {
        for (DoubleWritable value : values) {
            MyKey myKey = new MyKey();//TreeSet中add的元素不能重用,一定要new
            myKey.setDate(key.toString());
            myKey.setTemp(value.get());
            topSet.add(myKey);
            if (topSet.size() > TopN)
                topSet.pollFirst();
        }
    }

    @Override
    protected void cleanup(Context context) throws IOExceptionInterruptedException {
        while (!topSet.isEmpty()) {
            MyKey myKey = topSet.pollLast();
            context.write(new Text(myKey.getDate()), new DoubleWritable(myKey.getTemp()));
        }
    }
}

按照日期分组(需要修改Mapper),统计最高温度的前3,包含并列项目,通过TreeSet排序并保存TopN的数据,在reduce之后,由cleanup进行汇总,支持combiner