mapreduce实现多文件自定义输出

时间:2023-01-27 09:43:55

本人在项目中遇到一个问题,就是在处理日志的时候,需要有多个key,比如一行日志是 domain sip minf h b

而我处理的时候,需要map同时输出两类记录:key为domain+minf、value为h+"|"+b,以及key为sip+minf、value为h+"|"+b;而且还要做逻辑运算,比如把相同key的value累加。

普通的mr通常情况下,计算结果会以part-000*输出成多个文件,并且输出的文件数量和reduce数量一样,这样就没法区分各个输出在哪个文件中,所以这样也不利于后续将mr的运行结果再做处理。

下面介绍我的处理过程,啥也不说了,上代码:

ComplexKey 是我自定义的key类,实现了WritableComparable接口,便于对key排序;之所以要排序,是希望相同的key能够被归并到一起,放到同一个reduce中去处理。

package sina.dip.logfilter.mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * Composite key made of three text fields: {@code name}, {@code value} and {@code minf}.
 *
 * <p>Keys are ordered by {@code name}, then {@code value}, then {@code minf}.
 * {@link #equals(Object)} and {@link #hashCode()} are defined over the same three
 * fields so that they agree with {@link #compareTo(ComplexKey)}; without them two
 * keys with identical content would hash differently, which breaks hash-based
 * partitioning and hash collections.
 *
 * <p>Serialization writes the three fields in the order name, value, minf and
 * reads them back in the same order.
 */
public class ComplexKey implements WritableComparable<ComplexKey> {

    private Text name;

    private Text value;

    private Text minf;

    /** Creates a key with three empty fields (needed for deserialization via {@link #readFields}). */
    public ComplexKey() {
        this.name = new Text();
        this.value = new Text();
        this.minf = new Text();
    }

    /**
     * Creates a key from the given strings.
     *
     * @param name  first field
     * @param value second field
     * @param minf  third field
     */
    public ComplexKey(String name, String value, String minf) {
        this.name = new Text(name);
        this.value = new Text(value);
        this.minf = new Text(minf);
    }

    public Text getName() {
        return name;
    }

    public void setName(Text name) {
        this.name = name;
    }

    public Text getValue() {
        return value;
    }

    public void setValue(Text value) {
        this.value = value;
    }

    public Text getMinf() {
        return minf;
    }

    public void setMinf(Text minf) {
        this.minf = minf;
    }

    /**
     * Compares field by field: name first, then value, then minf.
     *
     * @param c the key to compare against
     * @return negative, zero or positive as this key sorts before, equal to or after {@code c}
     */
    @Override
    public int compareTo(ComplexKey c) {
        int compare = name.compareTo(c.name);
        if (compare != 0) {
            return compare;
        }

        compare = value.compareTo(c.value);
        if (compare != 0) {
            return compare;
        }

        return minf.compareTo(c.minf);
    }

    /** Two keys are equal when all three fields are equal. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof ComplexKey)) {
            return false;
        }
        ComplexKey other = (ComplexKey) obj;
        return name.equals(other.name)
                && value.equals(other.value)
                && minf.equals(other.minf);
    }

    /** Hash over the same three fields used by {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        int result = name.hashCode();
        result = 31 * result + value.hashCode();
        result = 31 * result + minf.hashCode();
        return result;
    }

    /** Reads name, value and minf, in that order, into the existing field objects. */
    @Override
    public void readFields(DataInput in) throws IOException {
        name.readFields(in);
        value.readFields(in);
        minf.readFields(in);
    }

    /** Writes name, value and minf, in that order. */
    @Override
    public void write(DataOutput out) throws IOException {
        name.write(out);
        value.write(out);
        minf.write(out);
    }

}
分区类:

package sina.dip.logfilter.mr;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitions map output by the hash of the key's {@code value} field only;
 * the {@code name} and {@code minf} fields do not influence the partition.
 */
public class ComplexKeyPartitioner extends Partitioner<ComplexKey, Text> {

    /**
     * @param key           the composite key; only {@code key.getValue()} is hashed
     * @param value         the map output value (unused)
     * @param numPartitions number of partitions
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(ComplexKey key, Text value, int numPartitions) {
        // Take the remainder BEFORE the absolute value: Math.abs(Integer.MIN_VALUE)
        // is still negative, so abs(hash) % n could yield a negative (invalid)
        // partition. For every other hash this gives the same index as before.
        return Math.abs(key.getValue().hashCode() % numPartitions);
    }

}




这是map阶段:

package sina.dip.logfilter.mr;

import java.io.IOException;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
// map中的结果都先放到了mapoutput里,在cleanup的时候统一处理。把累加逻辑放在map这边,是为了减小reduce的压力:
// 之前把累加的逻辑放在reduce里,发现100G的数据,reduce要跑大约10多分钟,而map只用了1分钟;
// 但是把累加放入map之后,整个处理过程不到两分钟。

public class AnalysisMapper extends
Mapper<LongWritable, Text, ComplexKey, Text> {
private MultipleOutputs<ComplexKey, Text> outputs;
private Map<String,String> mapoutput = new HashMap<String,String>();
private Set<String> outputkeyset;
private String[] mapkey;
private String[] mapvalue;
private BigInteger paravalue;
protected void setup(Context context) throws IOException,
InterruptedException {
outputs = new MultipleOutputs<ComplexKey, Text>(context);
};
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
if (line == null || line.isEmpty()) {
return;
}

String[] words = line.split("\t");
//System.out.println("words.length:"+words.length);
if (words.length != 17 && words.length != 18) {
//System.out.println("line:"+value.toString());
return;
}

//if (words[0] == null || words[0].isEmpty() || words[1] == null
//|| words[1].isEmpty() || words[2] == null || words[2].isEmpty()
//|| words[14] == null || words[14].isEmpty()
//|| words[16] == null || words[16].isEmpty()) {
//return;
//}
BigInteger hit,bit;
Text hb;
//System.out.println("words.length:"+words.length);
if(words[1].equals("172.20.20.37")){
if(words.length == 17){
//System.out.println("mapoutput17:"+mapoutput.size());

hb = new Text(words[14] + "|" + words[16]);
if(null != mapoutput.get("domain"+"|"+words[2]+"|"+words[0])){//如果结果中已经存在 domain|minf
mapvalue = (mapoutput.get("domain"+"|"+words[2]+"|"+words[0])).toString().split("\\|");

hit = new BigInteger(mapvalue[0]);
bit = new BigInteger(mapvalue[1]);
hit = hit.add(new BigInteger(words[14]));
bit = bit.add(new BigInteger(words[16]));
mapoutput.put("domain"+"|"+words[2]+"|"+words[0], hit+"|"+bit);
}else{
mapoutput.put("domain"+"|"+words[2]+"|"+words[0], words[14]+"|"+words[16]);
}
if(null != mapoutput.get("sip"+"|"+words[1]+"|"+words[0])){//如果结果中已经存在 sip|minf
mapvalue = (mapoutput.get("sip"+"|"+words[1]+"|"+words[0])).toString().split("\\|");
hit = new BigInteger(mapvalue[0]);
bit = new BigInteger(mapvalue[1]);
hit = hit.add(new BigInteger(words[14]));
bit = bit.add(new BigInteger(words[16]));
mapoutput.put("sip"+"|"+words[1]+"|"+words[0], hit+"|"+bit);
}else{
mapoutput.put("sip"+"|"+words[1]+"|"+words[0], words[14]+"|"+words[16]);
}
}else if(words.length == 18){
//System.out.println("mapoutput18:"+mapoutput.size());
hb = new Text(words[15] + "|" + words[17]);
if(null != mapoutput.get("domain"+"|"+words[2]+"|"+words[0])){//如果结果中已经存在 domain|minf
mapvalue = (mapoutput.get("domain"+"|"+words[2]+"|"+words[0])).toString().split("\\|");

hit = new BigInteger(mapvalue[0]);
bit = new BigInteger(mapvalue[1]);
hit = hit.add(new BigInteger(words[15]));
bit = bit.add(new BigInteger(words[17]));
mapoutput.put("domain"+"|"+words[2]+"|"+words[0], hit+"|"+bit);
}else{
mapoutput.put("domain"+"|"+words[2]+"|"+words[0], words[15]+"|"+words[17]);
}
if(null != mapoutput.get("sip"+"|"+words[1]+"|"+words[0])){//如果结果中已经存在 sip|minf
mapvalue = (mapoutput.get("sip"+"|"+words[1]+"|"+words[0])).toString().split("\\|");
hit = new BigInteger(mapvalue[0]);
bit = new BigInteger(mapvalue[1]);
hit = hit.add(new BigInteger(words[15]));
bit = bit.add(new BigInteger(words[17]));
mapoutput.put("sip"+"|"+words[1]+"|"+words[0], hit+"|"+bit);
}else{
mapoutput.put("sip"+"|"+words[1]+"|"+words[0], words[15]+"|"+words[17]);
}
}
}

};
//多个输出,每个会有不同的key
 protected void cleanup(Context context) throws IOException,
InterruptedException {
outputkeyset = mapoutput.keySet();
for(String outputkey : outputkeyset){
mapkey = outputkey.split("\\|");
if(mapkey[0].equals("domain")){
mapvalue = mapoutput.get(outputkey).split("\\|");
//System.out.println("domainh:"+mapvalue[0]);
ComplexKey domain = new ComplexKey("domain", mapkey[1], mapkey[2]);
Text hb = new Text(mapvalue[0] + "|" + mapvalue[1]);
context.write(domain, hb);
}else if(mapkey[0].equals("sip")){
mapvalue = mapoutput.get(outputkey).split("\\|");
ComplexKey sip = new ComplexKey("sip", mapkey[1], mapkey[2]);
Text hb = new Text(mapvalue[0] + "|" + mapvalue[1]);
//System.out.println("siph:"+mapvalue[0]);
context.write(sip, hb);
}
//else if(mapkey[0].equals("httpcode")){
//ComplexKey sip = new ComplexKey("httpcode", mapkey[1], mapkey[2]);
//Text h = new Text(mapoutput.get(outputkey));
//context.write(sip, h);
//}
}
outputs.close();
};
}


reduce:

package sina.dip.logfilter.mr;

import java.io.IOException;
import java.math.BigInteger;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
/**
* 根据不同的key处理不同的逻辑,然后输出到相应的目录下
*
*/
public class AnalysisReducerBack1 extends Reducer<ComplexKey, Text, Text, Text> {

private MultipleOutputs<Text, Text> outputs;

protected void setup(Context context) throws IOException,
InterruptedException {
outputs = new MultipleOutputs<Text, Text>(context);
};
//根据不同的key,处理不同的逻辑,并输出到不同的目录下
protected void reduce(ComplexKey key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Text oKey = null,oValue = null;
BigInteger h = new BigInteger("0"),b = new BigInteger("0");
if(key.getName().toString().equals("sip") || key.getName().toString().equals("domain")){


for (Text value : values) {
String[] words = value.toString().split("\\|");
h = h.add(new BigInteger(words[0]));
b = b.add(new BigInteger(words[1]));
//h += Integer.valueOf(words[0]);
//b += Integer.valueOf(words[1]);
}

oKey = new Text(key.getValue() + "\t" + key.getMinf());
oValue = new Text(h + "\t" + b);
}else if(key.getName().toString().equals("httpcode")){
for (Text value : values) {
h = h.add(new BigInteger(value.toString()));
//h += Integer.valueOf(value.toString());
}

oKey = new Text(key.getValue() + "\t" + key.getMinf());
oValue = new Text(String.valueOf(h));
}


outputs.write(oKey, oValue, key.getName().toString()+"/"+key.getName().toString());
//根据key输出,比如domain的key,则输出到了outputpath/domain/domain-part-000x;
//或者设置为outputs.write(oKey, oValue, key.getName().toString());则输出为outputpath/domain-part-000x;
 };

protected void cleanup(Context context) throws IOException,
InterruptedException {
outputs.close();
};

}

最后,就是在提交job的时候做如下设置:

package sina.dip.logfilter.mr;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import sina.dip.logfilter.DipFilterLogData;
import sina.dip.logfilter.config.LogConfig;
import sina.dip.logfilter.config.ServerConfig;
import sina.dip.logfilter.util.FileUtil;



public class AnalysisLoader {

/**
* @param args
* @param conf
* @return
* @throws Exception
*/
public boolean run(Configuration conf, String inputPath, String outPath,String category)
throws Exception {
Job job = new Job(conf, "DIP_DIPLOGFILTER-"+category);
DistributedCache.addFileToClassPath(new Path("/libs/hbase-0.92.1-cdh4.0.0-security.jar"), job.getConfiguration());
//解决第三包的调用问题,在其他的文章中有介绍
 job.setJarByClass(AnalysisLoader.class);


job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

job.setMapperClass(AnalysisMapper.class);
job.setMapOutputKeyClass(ComplexKey.class);
job.setMapOutputValueClass(Text.class);

job.setPartitionerClass(ComplexKeyPartitioner.class);
//job.setCombinerClass(AnalysisReducer.class);
job.setReducerClass(AnalysisReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(LogConfig.reduceCount);
String hdfs = ServerConfig.getHDFS();

String[] inputPaths =inputPath.split(",");
for (String p : inputPaths) {
if (!p.startsWith(hdfs)) {
p = hdfs + p;
}
MultipleInputs.addInputPath(job, new Path(p),TextInputFormat.class, AnalysisMapper.class);
}

FileOutputFormat.setOutputPath(job, new Path(outPath));

return(job.waitForCompletion(true));


}
}