mapreduce (六) MapReduce实现去重 NullWritable的使用

时间:2023-03-09 07:47:20
mapreduce (六) MapReduce实现去重 NullWritable的使用
习题来源:http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html
file1
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
file2
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c import java.io.IOException; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MyDedup { public static class LineNullMapper extends Mapper<Object, Text, Text, NullWritable>{
public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
context.write(value, NullWritable.get());
}
} public static class SortReducer extends Reducer<Text, NullWritable, Text, NullWritable>{
public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException{
context.write(key, NullWritable.get());
}
}
如果把Iterable<NullWritable> values 替换为 NullWritable values 如果是不用Iterable迭代器的话,则不进行分组么?
结果是:只排序了并没有完成去重
2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-3 c
2012-3-3 c
2012-3-4 d
2012-3-4 d
2012-3-5 a
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d

public static void main(String[] args) throws Exception {

        String dir_in = "hdfs://localhost:9000/in_dedup";
String dir_out = "hdfs://localhost:9000/out_dedup"; Path in = new Path(dir_in);
Path out = new Path(dir_out); Configuration conf = new Configuration();
Job sortJob = new Job(conf, "my_dedup"); sortJob.setJarByClass(MyDedup.class); sortJob.setInputFormatClass(TextInputFormat.class);
sortJob.setMapperClass(LineNullMapper.class);
sortJob.setCombinerClass(SortReducer.class);
//countJob.setPartitionerClass(HashPartitioner.class);
sortJob.setMapOutputKeyClass(Text.class);
sortJob.setMapOutputValueClass(NullWritable.class); FileInputFormat.addInputPath(sortJob, in); sortJob.setReducerClass(SortReducer.class);
// countJob.setNumReduceTasks(1);
sortJob.setOutputKeyClass(Text.class);
sortJob.setOutputValueClass(NullWritable.class);
//countJob.setOutputFormatClass(SequenceFileOutputFormat.class); FileOutputFormat.setOutputPath(sortJob, out); sortJob.waitForCompletion(true); } }
运行结果:
2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d