Improving on WordCount: use the word frequency as the key and sort the results in descending order of frequency

Date: 2021-11-09 13:03:46

Approach:

1. Job 1: the same as WordCount.v1.0, except that it writes its results in binary form to a temporary directory, which then serves as the input directory for the second MapReduce job.
2. Job 2: use the InverseMapper class provided by Hadoop to swap each key with its value, and define a custom IntWritableDecreasingComparator class to pass to the job's setSortComparatorClass(), so that the word frequencies are sorted in descending order; a small worked example of the data flow follows below.
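
For a concrete picture of the two-job pipeline, here is a small made-up example of the data flow (the words and counts are illustrative only, not from the original post):

    input text:           hello world hello hadoop hello
    job 1 output:         (hadoop, 1), (hello, 3), (world, 1)   // stored as a SequenceFile
    after InverseMapper:  (1, hadoop), (3, hello), (1, world)   // key and value swapped
    job 2 output:         3  hello
                          1  hadoop
                          1  world                              // keys in decreasing order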

Source code:


import java.io.IOException;
import java.util.Random;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount2 {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Split the line into tokens and emit (word, 1) for each token
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum all partial counts for this word and emit (word, total)
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    /**
     * Comparator that inverts the default IntWritable ordering so that
     * keys are sorted in decreasing order.
     */
    private static class IntWritableDecreasingComparator extends IntWritable.Comparator {

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }
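    /*
     * Note (added explanation): during the shuffle, Hadoop sorts keys with the
     * raw-bytes comparator whenever one is registered, so both overrides above
     * must be negated; flipping only the object-based compare(a, b) would leave
     * the actual sort order unchanged.
     */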

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // Validate the number of arguments
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        // Define a temporary directory with a random suffix to avoid collisions
        Path tempDir = new Path("wordcount-temp-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount2.class);
        job.setMapperClass(TokenizerMapper.class);
        // IntSumReducer doubles as the combiner: summing is associative and
        // commutative, so partial sums can safely be merged on the map side
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, tempDir);
        /*
         * The word-count job writes its results to the temporary directory, and the
         * sort job below uses that directory as its input; the directory must not
         * already exist in HDFS when the job runs.
         */
        job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
        /*
         * SequenceFileOutputFormat is one of the commonly used OutputFormat classes; it
         * writes a binary file that is convenient for a subsequent MapReduce job to read.
         * (Without setOutputFormatClass, the default OutputFormat is TextOutputFormat,
         * which writes plain text lines.)
         */

        // Start sortJob only after the first job has completed successfully;
        // the argument true makes the job print verbose progress information
        if (job.waitForCompletion(true)) {
            Job sortJob = Job.getInstance(conf, "sort");
            /*
             * No Reducer class is specified for sortJob because none is needed: Hadoop
             * falls back to the default Reducer, which behaves as an identity reducer
             * and writes the intermediate results out unchanged.
             */

            sortJob.setJarByClass(WordCount2.class);
            FileInputFormat.addInputPath(sortJob, tempDir);
            sortJob.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class);
            // InverseMapper, provided by the Hadoop library, swaps the key and value of
            // every input pair, turning (word, count) into (count, word)
            sortJob.setMapperClass(org.apache.hadoop.mapreduce.lib.map.InverseMapper.class);

            // Limit the number of Reducers to 1 so that a single output file is produced
            sortJob.setNumReduceTasks(1);

            FileOutputFormat.setOutputPath(sortJob, new Path(otherArgs[1]));
            sortJob.setOutputKeyClass(IntWritable.class);
            sortJob.setOutputValueClass(Text.class);
            /*
             * Hadoop sorts IntWritable keys in ascending order by default, but we need
             * descending order, so the custom IntWritableDecreasingComparator defined
             * above is registered to sort the output keys (the word frequencies).
             */
            sortJob.setSortComparatorClass(IntWritableDecreasingComparator.class);

            // Compute the exit code first, then clean up the temporary directory;
            // code placed after System.exit() would never run
            int exitCode = sortJob.waitForCompletion(true) ? 0 : 1;
            FileSystem.get(conf).delete(tempDir, true);
            System.exit(exitCode);
        }
        // Reaching this point means the word-count job failed
        System.exit(1);
    }
}
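
To run the finished program, one possible invocation looks like the following (the jar name and HDFS paths are assumptions for illustration, not from the original post):

    hadoop jar wordcount2.jar WordCount2 /user/hadoop/input /user/hadoop/output
    hdfs dfs -cat /user/hadoop/output/part-r-00000

With a single reducer, the sort job produces one result file, part-r-00000, in which each line is a frequency and a word separated by a tab, highest frequencies first.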