在hadoop上进行编写mapreduce程序,统计关键词在text出现次数

时间:2023-03-10 02:49:30
在hadoop上进行编写mapreduce程序,统计关键词在text出现次数

mapreduce的处理过程分为2个阶段,map阶段,和reduce阶段。在要求统计指定文件里的全部单词的出现次数时。

map阶段把每一个关键词写到一行上以逗号进行分隔。并初始化数量为1(同样的单词hadoop中的map会自己主动放到一行中)

reduce阶段是把每一个单词出现的频率统计出来又一次写回去。

如代码:

package com.clq.hadoop2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper; public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
final Text key2 = new Text();
// value2 表示单词在该行中的出现次数
final IntWritable value2 = new IntWritable(1);
// key 表示文本行的起始位置
// value 表示文本行
protected void map(LongWritable key, Text value, Context context)
throws java.io.IOException, InterruptedException {
final String[] splited = value.toString().split(",");
for (String word : splited) {
key2.set(word);
// 把key2、value2写入到context中
context.write(key2, value2);
}
}
}
package com.clq.hadoop2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer; public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
// value3表示单词出现的总次数
final IntWritable value3 = new IntWritable(0);
/**
* key 表示单词 values 表示map方法输出的1的集合 context 上下文对象
*/
protected void reduce(Text key, java.lang.Iterable<IntWritable> values,
Context context) throws java.io.IOException, InterruptedException {
int sum = 0;
for (IntWritable count : values) {
sum += count.get();
}
// 运行到这里,sum表示该单词出现的总次数
// key3表示单词,是最后输出的key
final Text key3 = key;
// value3表示单词出现的总次数,是最后输出的value
value3.set(sum);
context.write(key3, value3);
}
}
package com.clq.hadoop2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class MapperReducer { public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
//指定输入和输出路径
final String INPUT_PATH = "hdfs://ubuntu:9000/Input";
final String OUTPUT_PATH = "hdfs://ubuntu:9000/output";
//创建一个job对象封装执行时所须要的信息
final Job job = new Job(new Configuration(),"MapperReducer");
//打成jar执行
job.setJarByClass(MapperReducer.class);
FileInputFormat.setInputPaths(job, INPUT_PATH);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
//指定自己自定义的mapper类
job.setMapperClass(MyMapper.class);
//指定执行mapper类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定自定义的reducer类
job.setReducerClass(MyReducer.class);
//指定reducer的key和value类型
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.waitForCompletion(true); }
}