hadoop学习笔记(九):MapReduce程序的编写

时间:2023-03-10 06:39:39
hadoop学习笔记(九):MapReduce程序的编写

一、MapReduce主要继承两个父类:

Map

 protected void map(KEY key,VALUE value,Context context) throws IOException,InterruptedException{
}

Reduce

 1 protected void reduce(KEY key,Iterable<VALUE> values,Context context) throws IOException,InterruptedException{
2 }

二、使用代码实现WordCount:

 package com.laowang.mapreduce;

 import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException;
import java.util.StringTokenizer; public class MR {
/**
* @author laowang
* @version v1.0.0
* @apiNote Mapper
* @since 2018/4/27 10:44
* <p>
* KEYIN, VALUEIN, KEYOUT, VALUEOUT 输入key类型,输入value类型,输出KEY类型,输出value类型
*/
static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable ONE = new IntWritable(1);
private Text word = new Text(); @Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取每一行的数据
String lineStr = value.toString();
//以 空格、/t、/n、/r、/f 分割
StringTokenizer stringTokenizer = new StringTokenizer(lineStr);
//遍历
while (stringTokenizer.hasMoreTokens()) {
//获取截取后的每一个字符串
String wordValue = stringTokenizer.nextToken();
//拼接到word里面去
word.set(wordValue);
//写入到输出中
context.write(word, ONE);
}
}
} /**
* @author laowang
* @version v1.0.0
* @apiNote Reducer
* @since 2018/4/27 10:44
* <p>
* KEYIN, VALUEIN, KEYOUT, VALUEOUT 输入key类型,输入value类型,输出KEY类型,输出value类型
*/
static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
result.set(sum);
context.write(key, result);
}
} /**
* @author laowang
* @version v1.0.0
* @apiNote Client
* @since 2018/4/27 10:47
*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//获取配置信息
Configuration configuration = new Configuration();
//创建job
Job job = new Job(configuration,"wc");
//设置JOB运行的类
job.setJarByClass(MR.class);
//设置Mapper和Reducer
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
//设置输入和输出路径
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//设置输出key和value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//提交job
boolean b = job.waitForCompletion(true);
//结束程序
System.exit(b ? 0 : 1);
}
}