http://www.cnblogs.com/zhengrunjian/p/4992043.html
- A patent-data processing example on Hadoop
- The framework of a MapReduce program
- A basic MapReduce program for counting
- The Hadoop Streaming API, which supports writing MapReduce programs in scripting languages
- Combiners for improving performance
Code listing: A typical Hadoop program template
```java
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
            implements Mapper<Text, Text, Text, Text> {

        public void map(Text key, Text value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {
            // Invert the key and value of each input record.
            output.collect(value, key);
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, Text> output,
                           Reporter reporter) throws IOException {
            // Concatenate all values for this key into one comma-separated string.
            String csv = "";
            while (values.hasNext()) {
                if (csv.length() > 0) csv += ",";
                csv += values.next().toString();
            }
            output.collect(key, new Text(csv));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        JobConf job = new JobConf(conf, MyJob.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("MyJob");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.set("key.value.separator.in.input.line", ",");

        JobClient.runJob(job);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyJob(), args);
        System.exit(res);
    }
}
```
The core of the skeleton is the run() method, also known as the driver. It instantiates, configures, and passes a job, named by a JobConf object, to JobClient.runJob() to launch the MapReduce job. The JobConf object holds all of the configuration parameters the job needs to run. In the driver you must customize the basic parameters of each job: the input path, the output path, the Mapper class, and the Reducer class. In addition, each job can reset default job properties, such as the InputFormat and the OutputFormat, and you can call the set() method on the JobConf object to fill in any arbitrary configuration parameter. Once the JobConf object is passed to JobClient.runJob(), it is treated as the master plan of the job and becomes the blueprint for how the job will run.

The JobConf object has many parameters, and we don't want to set all of them by writing the driver; the configuration files of the Hadoop installation are a good starting point. At the same time, a user may want to pass extra arguments on the command line to change the job configuration. The driver could support such modifications by defining its own set of options and processing the user arguments itself, but because this task is needed so often, the Hadoop framework provides the ToolRunner, Tool, and Configured classes to simplify it.

By using ToolRunner, MyJob automatically supports the options in the table below; a short sketch of the mechanism follows the table.
| Option | Description |
| --- | --- |
| -conf <configuration file> | Specify a configuration file |
| -D <property=value> | Set a value for a JobConf property |
| -fs <local \| namenode:port> | Specify a NameNode; may be "local" |
| -jt <local \| jobtracker:port> | Specify a JobTracker |
| -files <list of files> | Comma-separated list of files to be used with the MapReduce job; they are automatically distributed to all task nodes and made available locally |
| -libjars <list of jars> | Comma-separated list of jar files to be included in the classpath of all task JVMs |
| -archives <list of archives> | Comma-separated list of archives to be unarchived on all task nodes |
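As a minimal sketch of that mechanism (the class name EchoArgs is invented here for illustration), ToolRunner parses the generic options into the Configuration before run() ever sees the remaining arguments:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical demo class: ToolRunner absorbs generic options such as
// -D key.value.separator.in.input.line=, into getConf(), and run()
// receives only the leftover application arguments.
public class EchoArgs extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        System.out.println("separator = "
                + getConf().get("key.value.separator.in.input.line", "\t"));
        for (String arg : args) {
            System.out.println("arg: " + arg);
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new EchoArgs(), args));
    }
}
```

Run as `hadoop jar myjob.jar EchoArgs -D key.value.separator.in.input.line=, input output` (the jar name is hypothetical), it would print `separator = ,` followed by `arg: input` and `arg: output`.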
- The first step in writing a MapReduce program is to understand the data flow;
- Based on that understanding of the data flow, set the types of the input, intermediate, and output key/value pairs k1/v1, k2/v2, and k3/v3;
- Given the data flow and the data types, the program code becomes easy to follow. In CitationHistogram below, for example, (k1, v1) is (Text, Text), while (k2, v2) and (k3, v3) are both (IntWritable, IntWritable).
Code listing: CitationHistogram.java
```java
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CitationHistogram extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
            implements Mapper<Text, Text, IntWritable, IntWritable> {

        // Reused across calls to map() to avoid allocating new objects per record.
        private final static IntWritable uno = new IntWritable(1);
        private IntWritable citationCount = new IntWritable();

        public void map(Text key, Text value,
                        OutputCollector<IntWritable, IntWritable> output,
                        Reporter reporter) throws IOException {
            // The value holds a citation count; emit (count, 1).
            citationCount.set(Integer.parseInt(value.toString()));
            output.collect(citationCount, uno);
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        public void reduce(IntWritable key, Iterator<IntWritable> values,
                           OutputCollector<IntWritable, IntWritable> output,
                           Reporter reporter) throws IOException {
            // Sum how many records share this citation count.
            int count = 0;
            while (values.hasNext()) {
                count += values.next().get();
            }
            output.collect(key, new IntWritable(count));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        JobConf job = new JobConf(conf, CitationHistogram.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("CitationHistogram");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        JobClient.runJob(job);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(),
                                 new CitationHistogram(),
                                 args);
        System.exit(res);
    }
}
```
4. Adapting to changes in the Hadoop API

(1) The first thing to note is that many classes have been moved out of org.apache.hadoop.mapred in the new API. Most of them went into org.apache.hadoop.mapreduce, and the library classes live under the org.apache.hadoop.mapreduce.lib package. When you switch to the new API, the import statements for classes under org.apache.hadoop.mapred all go away; those classes are deprecated.

(2) The most beneficial change in the new API is the introduction of the context object, Context. Its most immediate effect is to replace the OutputCollector and Reporter objects used in map() and reduce(). The more far-reaching effect is that it unifies the communication between application code and the MapReduce framework and fixes the Mapper and Reducer APIs, so that new functionality can be added without changing the basic method signatures. (A short sketch after this section makes the change concrete.)

(3) The new map() and reduce() methods are contained in the new abstract classes Mapper and Reducer, respectively, which replace the Mapper and Reducer interfaces of the original API. The new abstract classes also supersede the MapReduceBase class, which is now deprecated.

(4) The new map() and reduce() methods carry one or two subtle changes. They may throw an InterruptedException in addition to the IOException of the old API. And reduce() now receives its list of values as an Iterable rather than an Iterator, which makes iterating with Java's foreach syntax much easier.

The signatures in the original API:

```java
public static class MapClass extends MapReduceBase
        implements Mapper<K1, V1, K2, V2> {
    public void map(K1 key, V1 value,
                    OutputCollector<K2, V2> output,
                    Reporter reporter) throws IOException { }
}

public static class Reduce extends MapReduceBase
        implements Reducer<K2, V2, K3, V3> {
    public void reduce(K2 key, Iterator<V2> values,
                       OutputCollector<K3, V3> output,
                       Reporter reporter) throws IOException { }
}
```

The new API simplifies them somewhat:

```java
public static class MapClass extends Mapper<K1, V1, K2, V2> {
    public void map(K1 key, V1 value, Context context)
            throws IOException, InterruptedException { }
}

public static class Reduce extends Reducer<K2, V2, K3, V3> {
    public void reduce(K2 key, Iterable<V2> values, Context context)
            throws IOException, InterruptedException { }
}
```

(5) The driver also needs some changes to support the new API. JobConf and JobClient have been replaced: their functionality has moved into the Configuration class and a new class, Job. Configuration is now purely for configuring a job, while the Job class defines and controls the execution of a job; constructing a job and submitting it for execution now belong to Job.

Old API:

```java
JobConf job = new JobConf(conf, MyJob.class);
job.setJobName("MyJob");
JobClient.runJob(job);
```

New API:

```java
Job job = new Job(conf, "MyJob");
job.setJarByClass(MyJob.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
```
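To make point (2) concrete, here is a hedged sketch (the class name and the counter group/name strings are invented for this example, not taken from the text) of a new-API mapper using its Context both to emit output and to do the bookkeeping that the old API routed through Reporter:

```java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: the single Context object carries output
// (context.write) and framework communication (counters) that the
// old API split between OutputCollector and Reporter.
public class CsvInvertMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        if (fields.length != 2) {
            // Old API equivalent: reporter.incrCounter("MyJob", "BadRecords", 1);
            context.getCounter("MyJob", "BadRecords").increment(1);
            return;
        }
        context.write(new Text(fields[1]), new Text(fields[0]));
    }
}
```

The full template rewritten against the new API follows.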
Code listing: The Hadoop base program template rewritten with the new API of version 0.20
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // With TextInputFormat the key is the byte offset and the value is
            // the whole line, so the record is split on the comma here.
            String[] citation = value.toString().split(",");
            context.write(new Text(citation[1]), new Text(citation[0]));
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String csv = "";
            for (Text val : values) { // the Iterable type allows a foreach loop
                if (csv.length() > 0) csv += ",";
                csv += val.toString();
            }
            context.write(key, new Text(csv));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        Job job = new Job(conf, "MyJob");
        job.setJarByClass(MyJob.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class); // InputFormat compatible with the new API
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Return the job status rather than calling System.exit() here,
        // so that ToolRunner's result reaches main() as intended.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyJob(), args);
        System.exit(res);
    }
}
```
5. Hadoop Streaming

- Using Streaming with Unix commands (see the example after this list)
- Using Streaming with scripts
- Handling key/value pairs with Streaming
- Using Streaming with the Aggregate package
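As a flavor of the first item, a minimal sketch of a Streaming job driven entirely by Unix commands (the streaming jar path varies by Hadoop version, and the input/output paths here are placeholders): `hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar -input input -output output -mapper 'cut -f 2 -d ,' -reducer 'uniq'`. Here `cut` acts as the mapper, projecting the second comma-separated field of each record, and `uniq` acts as the reducer, collapsing the sorted duplicates delivered by the shuffle.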