hadoop-1.x的运行实例

时间:2023-03-08 20:32:19

我的环境是hadoop-0.20.2,eclipse:SDK-3.3.2,

源数据为:

Apr 23 11:49:54 hostapd: wlan0: STA 14:7d:c5:9e:fb:84
Apr 23 11:49:54 hostapd: wlan0: STA 14:7d:c5:9e:fb:84
Apr 23 11:49:54 hostapd: wlan0: STA 14:7d:c5:9e:fb:84
Apr 23 11:49:54 hostapd: wlan0: STA 14:7d:c5:9e:fb:84
Apr 23 11:49:54 hostapd: wlan0: STA 14:7d:c5:9e:fb:84
Apr 23 11:49:54 hostapd: wlan0: STA 14:7d:c5:9e:fb:84

想要获取的数据是:

Apr 23 14:7d:c5:9e:fb:84
Apr 23 14:7d:c5:9e:fb:84
Apr 23 14:7d:c5:9e:fb:84
Apr 23 14:7d:c5:9e:fb:84
Apr 23 14:7d:c5:9e:fb:84
Apr 23 14:7d:c5:9e:fb:84

运行时输入的参数是:
hdfs的输入和输出目录:即 hdfs://cMaster:/user/joe/in    hdfs://cMaster:/user/joe/out

源代码:

package hadoop;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.*;
public class test extends Configured implements Tool{
enum Counter{
LINESKIP,
}
public static class Map extends Mapper<LongWritable,Text,NullWritable,Text>{
public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{
String line=value.toString();
try{
String [] lineSplit=line.split(" ");
String month=lineSplit[0];
String time=lineSplit[1];
String mac=lineSplit[6];
Text out=new Text(month+' '+time+' '+mac);
context.write(NullWritable.get(),out);
}catch(java.lang.ArrayIndexOutOfBoundsException e){
context.getCounter(Counter.LINESKIP).increment(1);
return;
}
}
}
public int run(String[] args)throws Exception{
Configuration conf=getConf();
Job job=new Job(conf,"test");
job.setJarByClass(test.class);
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
job.setMapperClass(Map.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.waitForCompletion(true);
return job.isSuccessful()?0:1;
}
public static void main(String[] args)throws Exception{
int res=ToolRunner.run(new Configuration(),new test(),args);
System.exit(res);
}
}