/*
 * A simple implementation of CombineFileInputFormat.
 *
 * Date: 2024-01-01 14:20:51
 */
import java.io.DataOutput;
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class TestCombine extends Configured implements Tool {
    private static class ProvinceMapper extends
            Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("value : " + value + " Context " + context);
            context.write(value, value);
        }
    }
 
    private static class ProvinceReducer extends
            Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text va : values) {
                System.out.println("reduce " + key);
                context.write(key, key);
            }
        }
    }
     
    public static class CombineSequenceFileInputFormat<K, V> extends CombineFileInputFormat<K, V> { 
        @SuppressWarnings({ "unchecked", "rawtypes" }) 
        @Override 
        public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException { 
            return new CombineFileRecordReader((CombineFileSplit)split, context, CombineLineRecordReader.class); 
        } 
    } 
     
    public static class CombineLineRecordReader<K, V> extends RecordReader<K, V> { 
        private CombineFileSplit split; 
        private TaskAttemptContext context; 
        private int index; 
        private RecordReader<K, V> rr; 
       
        @SuppressWarnings("unchecked") 
        public CombineLineRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException, InterruptedException { 
            this.index = index;
            this.split = (CombineFileSplit) split; 
            this.context = context; 
       
            this.rr = (RecordReader<K, V>) ReflectionUtils.newInstance(LineRecordReader.class, context.getConfiguration()); 
        } 
       
        @SuppressWarnings("unchecked") 
        @Override 
        public void initialize(InputSplit curSplit, TaskAttemptContext curContext) throws IOException, InterruptedException { 
            this.split = (CombineFileSplit) curSplit; 
            this.context = curContext; 
       
            if (null == rr) { 
                rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration()); 
            } 
       
            FileSplit fileSplit = new FileSplit(this.split.getPath(index), 
                    this.split.getOffset(index), this.split.getLength(index), 
                    this.split.getLocations()); 
               
            this.rr.initialize(fileSplit, this.context); 
        } 
       
        @Override 
        public float getProgress() throws IOException, InterruptedException { 
            return rr.getProgress(); 
        } 
       
        @Override 
        public void close() throws IOException { 
            if (null != rr) { 
                rr.close(); 
                rr = null; 
            } 
        } 
       
        @Override 
        public K getCurrentKey() 
        throws IOException, InterruptedException { 
            return rr.getCurrentKey(); 
        } 
       
        @Override 
        public V getCurrentValue() 
        throws IOException, InterruptedException { 
            return rr.getCurrentValue(); 
        } 
       
        @Override 
        public boolean nextKeyValue() throws IOException, InterruptedException { 
            return rr.nextKeyValue(); 
        } 
    } 
 
     
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
         
        Job job = new Job(conf);
        job.setJobName("TestCombine");
        job.setJarByClass(TestCombine.class);
 
        job.setMapperClass(ProvinceMapper.class);
        job.setReducerClass(ProvinceReducer.class);
         
        job.setInputFormatClass(CombineSequenceFileInputFormat.class);
         
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
         
        String inpath = "/home/hadoop/tmp/combine";
        String outpath = "/home/hadoop/tmp/combineout";
        Path p = new Path(outpath);
         
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(p)){
            fs.delete(p);
        }
        FileInputFormat.addInputPaths(job, inpath);
        FileOutputFormat.setOutputPath(job, p);
 
        return job.waitForCompletion(true) ? 0 : 1;
    }
 
    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new TestCombine(), args);
        System.exit(ret);
    }
}