Hadoop MapReduce编程 API入门系列之挖掘气象数据版本2(九)

时间:2021-12-21 05:08:27

  下面,是版本1。

Hadoop MapReduce编程 API入门系列之挖掘气象数据版本1(一)

  这篇博文,包括了,实际生产开发非常重要的,单元测试和调试代码。这里不多赘述,直接送上代码。

MRUnit 框架

MRUnitCloudera公司专为Hadoop MapReduce写的单元测试框架,API非常简洁实用。MRUnit针对不同测试对象使用不同的Driver:

MapDriver:针对单独的Map测试

 ReduceDriver:针对单独的Reduce测试

   MapReduceDriver:将map和reduce串起来测试

PipelineMapReduceDriver:将多个MapReduce对串志来测试

  记得,将这个jar包,放到工程项目里。我这里是在工程项目的根目录下的lib下。

代码版本2

编写TemperatureMapperTest.java的代码。  编译,出现以下,则说明无误。

在test()方法中,withInput的key/value参数分别为偏移量和一行气象数据,其类型要与TemperatureMapper的输入类型一致即为LongWritable和Text。 withOutput的key/value参数分别是我们期望输出的new Text("03103")和new IntWritable(200),我们要达到的测试效果就是我们的期望输出结果与 TemperatureMapper 的实际输出结果一致。

测试方法为 test() 方法,左边的对话框里显示"Runs:1/1,Errors:0,Failures:0",说明 Mapper 测试成功了。

创建TemperatureReduceTest.java,来对Reduce进行测试。

在test()方法中,withInput的key/value参数分别为new Text(key)和List类型的集合values。withOutput 的key/value参数分别是我们所期望输出的new Text(key)和new IntWritable(150),我们要达到的测试效果就是我们的期望输出结果与TemperatureReducer实际输出结果一致。

编写TemperatureReduceTest.java的代码。  编译,出现以下,则说明无误。

Reducer 端的单元测试,鼠标放在 TemperatureReduceTest 类上右击,选择 Run As ——> JUnit test,运行结果如下所示。

测试方法为 test() 方法,左边的对话框里显示"Runs:1/1,Errors:0,Failures:0",说明 Reducer 测试成功了。

MapReduce 单元测试

把 Mapper 和 Reducer 集成起来的测试案例代码如下。

创建TemperatureTest.java,来进行测试。

在 test() 方法中,withInput添加了两行测试数据line和line2,withOutput 的key/value参数分别为我们期望的输出结果new Text("03103")和new IntWritable(150)。我们要达到的测试效果就是我们期望的输出结果与Temperature实际的输出结果一致。

编写TemperatureTest.java的代码。 编译,出现以下,则说明无误。

Reducer 端的单元测试,鼠标放在 TemperatureTest.java类上右击,选择 Run As ——> JUnit test,运行结果如下所示。

测试方法为 test() 方法,左边的对话框里显示"Runs:1/1,Errors:0,Failures:0",说明 MapReduce 测试成功了。

package zhouls.bigdata.myMapReduce.TemperatureTest;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 统计美国每个气象站30年来的平均气温
* 1、编写map()函数
* 2、编写reduce()函数
* 3、编写run()执行方法,负责运行MapReduce作业
* 4、在main()方法中运行程序
*
* @author zhouls
*
*/
//继承Configured类,实现Tool接口
public class Temperature extends Configured implements Tool
{
public static class TemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>
{ //输入的key,输入的value,输出的key,输出的value
/**
* @function Mapper 解析气象站数据
* @input key=偏移量 value=气象站数据
* @output key=weatherStationId value=temperature
*/
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{ //map()函数还提供了context实例,用于键值对的输出
//第一步,我们将每行气象站数据转换为每行的String类型
String line = value.toString(); //每行气象数据

//第二步:提取气温值
int temperature = Integer.parseInt(line.substring(14, 19).trim());//每小时气温值
//需要转换为整形,截取第14位到19位,把中间的空格去掉。
if (temperature != -9999) //过滤无效数据
{
//第三步:提取气象站编号
//获取输入分片
FileSplit fileSplit = (FileSplit) context.getInputSplit();//提取输入分片,并转换类型
//然后通过文件名称提取气象站编号
String weatherStationId = fileSplit.getPath().getName().substring(5, 10);//通过文件名称提取气象站id
//首先通过文件分片fileSplit来获取文件路径,然后再获取文件名字,然后截取第5位到第10位就可以得到气象站 编号
context.write(new Text(weatherStationId), new IntWritable(temperature));
//气象站编号,气温值
}
}
}