MR中的combiner和partitioner

时间:2023-03-09 13:23:37
MR中的combiner和partitioner

1.combiner

combiner是MR编程模型中的一个组件;

有些任务中map可能会产生大量的本地输出,combiner的作用就是在map端对输出先做一次合并,以减少map和reduce节点之间的数据传输量,提高网络IO性能,是MR的优化手段之一;

两大基本功能:

1.1map的输出的key的聚合,对map输出的key排序、value进行迭代;

1.2reduce功能。

并不是设置了combiner就一定会执行(在当前集群非常繁忙的时候设置了也不会执行);

combiner的执行时机:combiner的执行可能会在map的merge之前也可能在之后,这个参数由配置选项min.num.spill.for.combine(默认为3) 决定的,当map端产生的spill文件最少有这么3个时,combiner会在merge操作之前执行,否则之后。

一般情况下可以使用自己写的reduce类作为combiner,但是特殊情况下也可以自定义

 public static class combiner extends Reducer<Text,Text,Text,Text>{
private Text info = new Text(); //为了拆分 key值 准备存储新的value值
public void reduce(Text key,Iterable<Text>values,Context context) throws IOException, InterruptedException{
int sum = 0;
for(Text val:values){
sum += Integer.parseInt(val.toString());
}
int splitIndex = key.toString().indexOf(":");
info.set(key.toString().substring(splitIndex+1)+":"+sum); //新的value值
key.set(key.toString().substring(0, splitIndex));
context.write(key, info);
}
}
job.setCombinerClass(combiner.class);

2.partitioner

combiner可以减少map的输出到reducer所在节点的网络IO,但是map的输出被分配到哪个reducer上,是由partitioner决定的;

partitioner只有一个方法:

getPartition(Text key, Text value, int numPartitions)  

输入的是map的结果对<key,value>和reducer的数目,返回的则是分配的reducer的编号(整数)。系统缺省的partitioner是HashPartitioner,它以key的Hash值对reducer的数目取模,得到对应的reducer。这样可以保证如果有相同的key值则肯定会被分配到同一个reducer上;
和combiner一样,一般使用默认的,但是特殊情况也可以自定义

 public class NewPartition extends HashPartitioner<Text,Text>{
String keyinfo;
public int getPartition(Text key,Text value,int numReducerTasks){
keyinfo = key.toString().split(":")[0];
return super.getPartition(new Text(keyinfo), value, numReducerTasks);
}
}
job.setPartitionClass(NewPartition.class);