package com.bw.day10;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
*
* @author Administrator
* WordCount
* 2017-8-12 09:23
*
*/
public class Day10 {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration config = new Configuration();
config.set("fs.defaultFS", "hdfs://192.168.0.117:9000");
config.set("yarn.resourcemanager.hostname", "192.168.0.117");
Job job = Job.getInstance(config);
//MR
job.setMapperClass(mapper.class);
job.setReducerClass(reducer.class);
//T/L
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//T/L
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, new Path("/day10.txt"));
FileOutputFormat.setOutputPath(job, new Path("/Out"));
//BOOLEAN
boolean b = job.waitForCompletion(true);
if(b){
System.out.println("Success");
}else{
System.out.println("Error");
}
}
public static class mapper extends Mapper<LongWritable, Text, Text, LongWritable>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
@SuppressWarnings("unused")
IntWritable one = new IntWritable(1);
Text word = new Text();
String pattern = "[^a-zA-Z0-9-']";
String line = value.toString();
line = line.replaceAll(pattern, " ");
StringTokenizer itr = new StringTokenizer(line);
while(itr.hasMoreTokens()){
word.set(itr.nextToken());
context.write(word, new LongWritable(1));
}
}
}
public static class reducer extends Reducer<Text, LongWritable, Text, LongWritable>{
@Override
protected void reduce(Text text, Iterable<LongWritable> iterable, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (LongWritable longWritable : iterable) {
sum += longWritable.get();
}
context.write(text,new LongWritable(sum));
}
}
}