MapReduce实现矩阵乘法

时间:2023-01-09 08:07:22
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class MapReduceDemo extends Configured implements Tool{

    static class DemoMapper extends Mapper<LongWritable,Text,Text,Text>{
        private String filename;
        private int rownum=2;
        private int colnum=2;
        private int rowA=1;
        private int rowB=1;
        
        protected void setup(Context context)throws IOException,InterruptedException{
            InputSplit split=context.getInputSplit();
            filename=((FileSplit)split).getPath().getName();
        }
        
        protected void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{
            String[] token=value.toString().split(",");
            if(filename.equals("A")){
                for(int i=1;i<=colnum;++i){
                       Text k=new Text(rowA+","+i);
                       for(int j=1;j<=token.length;++j){
                           Text v=new Text(filename+":"+j+","+token[j-1]);
                           context.write(k, v);
                           System.out.println(k.toString()+" "+v.toString());
                       }
                }
                ++rowA;
            }
            else if(filename.equals("B")){
                for(int i=1;i<=rownum;++i){
                    for(int j=1;j<=token.length;++j){
                        Text k=new Text(i+","+j);
                        Text v=new Text(filename+":"+rowB+","+token[j-1]);
                        context.write(k, v);
                        System.out.println(k.toString()+" "+v.toString());
                    }
                }
                ++rowB;
            }            
        }
    }
    
    static class DemoReducer extends Reducer<Text,Text,Text,IntWritable>{
        private Map<String,String> mapA=new HashMap<String,String>();
        private Map<String,String> mapB=new HashMap<String,String>();
        protected void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException{
            for(Text line:values){
                String filename=line.toString().substring(0,1);
                if(filename.equals("A")){
                    mapA.put(line.toString().substring(2,3), line.toString().substring(4));
                }else if(filename.equals("B")){
                    mapB.put(line.toString().substring(2, 3),line.toString().substring(4));
                }
            }
            int result=0;
            Iterator<String> it=mapA.keySet().iterator();
            while(it.hasNext()){
                String mapk=it.next();
                result+=Integer.parseInt(mapA.get(mapk))*Integer.parseInt(mapB.get(mapk));
            }
            context.write(key, new IntWritable(result));
            System.out.println();
        }
    }
    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        int exitcode=ToolRunner.run(new MapReduceDemo(), args);
        System.exit(exitcode);
    }




    @Override
    public int run(String[] arg0) throws Exception {
        // TODO Auto-generated method stub
        Configuration conf=new Configuration();
        Job job=new Job(conf);
        job.setJarByClass(getClass());
        FileInputFormat.setInputPaths(job, new Path(arg0[0]),new Path(arg0[1]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[2]));
        job.setMapperClass(DemoMapper.class);
        job.setReducerClass(DemoReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        return job.waitForCompletion(true)? 1:0;
    }
}

A 矩阵2*3

B矩阵 3*2