MRUnit, a Debugging Tool for MapReduce Programs: Introduction and Usage

Date: 2021-11-04 02:06:09

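Introduction to MRUnit:

When a Hadoop MapReduce job is submitted to a cluster, locating a problem is cumbersome: you may have to modify the code and print logs over and over to track down a very small bug, and with large data volumes each debugging round takes a long time. It is therefore worth using solid unit tests to eliminate obvious bugs as early as possible. Unit testing MapReduce code has one obstacle, though: some of the objects passed to Map and Reduce, such as OutputCollector, Reporter, and InputSplit, are supplied by the Hadoop framework at run time, so some other means is needed to provide them. MRUnit is a unit testing framework written specifically for Hadoop MapReduce; its API is simple, clear, and practical. It does have weak spots: for example, it does not support MultipleOutputs (we often use MultipleOutputs to write several output files; how to extend MRUnit to support MultipleOutputs will be covered later).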
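To make the obstacle concrete, here is a minimal plain-Java sketch of the idea behind a test driver: the map logic writes to an output object, and the test supplies an in-memory implementation of that object instead of the one the framework would pass in. The Emitter interface and the class name are hypothetical stand-ins, not Hadoop or MRUnit types, and this is not a description of how MRUnit is implemented internally.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class InMemoryHarnessSketch {
    // Hypothetical stand-in for the output object the framework supplies at run time.
    interface Emitter {
        void write(String key, int value);
    }

    // Logic under test: emit (word, 1) for every space-separated token.
    static void map(String line, Emitter out) {
        for (String word : line.split(" ")) {
            out.write(word, 1);
        }
    }

    // Runs the map logic against a collecting Emitter, so no cluster is involved.
    static List<Map.Entry<String, Integer>> run(String line) {
        final List<Map.Entry<String, Integer>> collected =
                new ArrayList<Map.Entry<String, Integer>>();
        map(line, (key, value) -> collected.add(new SimpleEntry<String, Integer>(key, value)));
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(run("hello world hello")); // prints [hello=1, world=1, hello=1]
    }
}
```

Because the collected pairs are an ordinary list, a test can compare them with the expected pairs directly.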

Installing MRUnit:

To use MRUnit in an existing Hadoop project, follow these steps:

(1) Download the latest MRUnit from http://mrunit.apache.org/. I use hadoop 1.0.4, so the file I downloaded is apache-mrunit-1.0.0-hadoop1-bin.tar.gz

(2) Extract the downloaded file to get hamcrest-core-1.1.jar, junit-4.10.jar, mockito-all-1.8.5.jar, and mrunit-1.0.0-hadoop1.jar

(3) Add these four jars to the project's build path. In Eclipse: select the project --> right-click Build Path --> Configure Build Path --> Add External JARs.

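MRUnit example:

Just as ordinary JUnit testing uses different test modules for different kinds of test objects, MRUnit provides a different Driver for each kind of test object:

MapDriver: tests a Map on its own.
ReduceDriver: tests a Reduce on its own.
MapReduceDriver: tests a Map and a Reduce chained together.
PipelineMapReduceDriver: tests several Map-Reduce pairs chained in sequence.

We start with how to use MRUnit to test a custom Mapper.

The classic introductory program WordCount serves as the example for trying out MRUnit.

Map program: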

package com.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TxtMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Emits (word, 1) for every space-separated token in the input line.
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] strs = value.toString().split(" ");
        for (String str : strs) {
            context.write(new Text(str), new IntWritable(1));
        }
    }
}

Reduce program:

package com.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TxtReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Sums the counts collected for one word and emits (word, total).
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

Test program:

package com.hadoop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Before;
import org.junit.Test;

public class MapTest {
    private Mapper<LongWritable, Text, Text, IntWritable> mapper;
    private MapDriver<LongWritable, Text, Text, IntWritable> driver;

    @Before
    public void init() {
        mapper = new TxtMapper();
        driver = new MapDriver<LongWritable, Text, Text, IntWritable>(mapper);
    }

    @Test
    public void testMap() throws Exception {
        String text = "hello world goodbye world hello hadoop goodbye hadoop";
        // Expected outputs are listed in the order the Mapper emits them.
        driver.withInput(new LongWritable(), new Text(text))
              .withOutput(new Text("hello"), new IntWritable(1))
              .withOutput(new Text("world"), new IntWritable(1))
              .withOutput(new Text("goodbye"), new IntWritable(1))
              .withOutput(new Text("world"), new IntWritable(1))
              .withOutput(new Text("hello"), new IntWritable(1))
              .withOutput(new Text("hadoop"), new IntWritable(1))
              .withOutput(new Text("goodbye"), new IntWritable(1))
              .withOutput(new Text("hadoop"), new IntWritable(1))
              .runTest();
    }
}

Select the method and choose Run As > JUnit Test. The progress bar turns green, which shows that the JUnit test passed.



If the last line is instead written as withOutput(new Text("hadoop"),new IntWritable(2)).runTest(), the test fails with the following errors:

13/09/26 15:58:16 ERROR mrunit.TestDriver: Received unexpected output (hadoop, 1) at position 7.
13/09/26 15:58:16 ERROR mrunit.TestDriver: Missing expected output (hadoop, 2) at position 7.

This shows that MRUnit is working.
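The two error lines suggest that expected and actual outputs are compared position by position. The following plain-Java sketch illustrates that kind of ordered comparison. The message wording is copied from the log above; the comparison logic and the class name are assumptions made for illustration, not MRUnit's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OrderedOutputCheckSketch {
    // Compares two ordered lists position by position and describes each mismatch.
    static List<String> diff(List<String> expected, List<String> actual) {
        List<String> errors = new ArrayList<String>();
        int n = Math.max(expected.size(), actual.size());
        for (int i = 0; i < n; i++) {
            String e = i < expected.size() ? expected.get(i) : null;
            String a = i < actual.size() ? actual.get(i) : null;
            if (e != null && e.equals(a)) {
                continue; // this position matches
            }
            if (a != null) {
                errors.add("Received unexpected output " + a + " at position " + i + ".");
            }
            if (e != null) {
                errors.add("Missing expected output " + e + " at position " + i + ".");
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        List<String> expected = Arrays.asList("(goodbye, 1)", "(hadoop, 2)");
        List<String> actual = Arrays.asList("(goodbye, 1)", "(hadoop, 1)");
        for (String error : diff(expected, actual)) {
            System.out.println(error);
        }
    }
}
```

An empty result means every position matched, which corresponds to the green bar in the passing run.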


You can also refer to the example on the official MRUnit website:

The following example uses MRUnit to unit test a MapReduce program that analyzes SMS CDRs (call detail records).

The records look like

    CDRID;CDRType;Phone1;Phone2;SMS Status Code
    655209;1;796764372490213;804422938115889;6
    353415;0;356857119806206;287572231184798;4
    835699;1;252280313968413;889717902341635;0

The MapReduce program analyzes these records, finds all records whose CDRType is 1, and notes the corresponding SMS Status Code. For example, the Mapper outputs are

6, 1
0, 1

The Reducer takes these as input and outputs the number of times each status code appears in the CDR records.
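Before looking at the Hadoop classes, the same filter-and-count logic can be sketched in plain Java, with the map and reduce steps folded into one loop. This is only an illustration of the expected results for the three sample records; the class and method names are hypothetical, and no Hadoop types are involved.

```java
import java.util.Map;
import java.util.TreeMap;

class SmsCdrCountSketch {
    // Counts how often each SMS status code occurs among records with CDRType 1.
    static Map<String, Integer> countStatusCodes(String[] records) {
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (String record : records) {
            String[] fields = record.split(";");
            // "Map" step: keep only records whose CDRType (second field) is 1.
            if (Integer.parseInt(fields[1]) == 1) {
                // "Reduce" step: add 1 to the running total for this status code.
                counts.merge(fields[4], 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] records = {
            "655209;1;796764372490213;804422938115889;6",
            "353415;0;356857119806206;287572231184798;4",
            "835699;1;252280313968413;889717902341635;0"
        };
        System.out.println(countStatusCodes(records)); // prints {0=1, 6=1}
    }
}
```

The record with CDRType 0 contributes nothing, which is why status code 4 is absent from the result.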

The corresponding Mapper is

public class SMSCDRMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  private Text status = new Text();
  private final static IntWritable addOne = new IntWritable(1);

  /**
   * Returns the SMS status code and its count
   */
  protected void map(LongWritable key, Text value, Context context)
      throws java.io.IOException, InterruptedException {

    //655209;1;796764372490213;804422938115889;6 is the Sample record format
    String[] line = value.toString().split(";");
    // If record is of SMS CDR
    if (Integer.parseInt(line[1]) == 1) {
      status.set(line[4]);
      context.write(status, addOne);
    }
  }
}

The corresponding Reducer code is

public class SMSCDRReducer extends
    Reducer<Text, IntWritable, Text, IntWritable> {

  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws java.io.IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    context.write(key, new IntWritable(sum));
  }
}

The MRUnit test class for the Mapper and Reducer is

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

public class SMSCDRMapperReducerTest {

  MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
  ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
  MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

  @Before
  public void setUp() {
    SMSCDRMapper mapper = new SMSCDRMapper();
    SMSCDRReducer reducer = new SMSCDRReducer();
    mapDriver = MapDriver.newMapDriver(mapper);
    reduceDriver = ReduceDriver.newReduceDriver(reducer);
    mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
  }

  @Test
  public void testMapper() throws Exception {
    mapDriver.withInput(new LongWritable(), new Text(
        "655209;1;796764372490213;804422938115889;6"));
    mapDriver.withOutput(new Text("6"), new IntWritable(1));
    mapDriver.runTest();
  }

  @Test
  public void testReducer() throws Exception {
    List<IntWritable> values = new ArrayList<IntWritable>();
    values.add(new IntWritable(1));
    values.add(new IntWritable(1));
    reduceDriver.withInput(new Text("6"), values);
    reduceDriver.withOutput(new Text("6"), new IntWritable(2));
    reduceDriver.runTest();
  }
}

Run the test class as a JUnit test; it passes or fails depending on whether the Mapper is written correctly.

Testing Counters

One common use of a custom Counter is to track malformed records in the input.

For example, when the input CDR record is not of SMS type, the Mapper can ignore that record and increment the counter.

The revised Mapper with Counter is shown below.

public class SMSCDRMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  private Text status = new Text();
  private final static IntWritable addOne = new IntWritable(1);

  static enum CDRCounter {
    NonSMSCDR;
  };

  /**
   * Returns the SMS status code and its count
   */
  protected void map(LongWritable key, Text value, Context context)
      throws java.io.IOException, InterruptedException {

    String[] line = value.toString().split(";");
    // If record is of SMS CDR
    if (Integer.parseInt(line[1]) == 1) {
      status.set(line[4]);
      context.write(status, addOne);
    } else { // CDR record is not of type SMS so increment the counter
      context.getCounter(CDRCounter.NonSMSCDR).increment(1);
    }
  }
}

The revised testMapper() method:

@Test
public void testMapper() throws Exception {
    mapDriver.withInput(new LongWritable(), new Text(
        "655209;0;796764372490213;804422938115889;6"));
    //mapDriver.withOutput(new Text("6"), new IntWritable(1));
    mapDriver.runTest();
    // assertEquals needs: import static org.junit.Assert.assertEquals;
    assertEquals("Expected 1 counter increment", 1, mapDriver.getCounters()
        .findCounter(SMSCDRMapper.CDRCounter.NonSMSCDR).getValue());
}

When the CDR record is not of SMS type, the Mapper should increment our counter; the assertion checks that it was incremented by exactly one.

Similarly, you can test counters in a Reducer.
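As a plain-Java illustration of what such counters track, the sketch below tallies non-SMS records and, as an extra assumption, malformed records. The MALFORMED counter, the validity check, and all names here are hypothetical; the Mapper above defines only NonSMSCDR and does not validate its input.

```java
import java.util.EnumMap;
import java.util.Map;

class CdrCounterSketch {
    // Hypothetical counters; only the non-SMS one has a counterpart in the Mapper above.
    enum CdrCounter { NON_SMS_CDR, MALFORMED }

    static Map<CdrCounter, Long> scan(String[] records) {
        Map<CdrCounter, Long> counters = new EnumMap<CdrCounter, Long>(CdrCounter.class);
        for (String record : records) {
            String[] fields = record.split(";");
            if (fields.length != 5 || !fields[1].matches("\\d+")) {
                // Assumed rule: a valid record has five fields and a numeric CDRType.
                counters.merge(CdrCounter.MALFORMED, 1L, Long::sum);
            } else if (Integer.parseInt(fields[1]) != 1) {
                counters.merge(CdrCounter.NON_SMS_CDR, 1L, Long::sum);
            }
        }
        return counters;
    }

    public static void main(String[] args) {
        String[] records = {
            "655209;1;796764372490213;804422938115889;6",
            "353415;0;356857119806206;287572231184798;4",
            "not a cdr record"
        };
        System.out.println(scan(records)); // prints {NON_SMS_CDR=1, MALFORMED=1}
    }
}
```

A test can then assert on each counter value separately, just as the assertEquals call above does for NonSMSCDR.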

Passing arguments for Testing

Next we see how to pass arguments to the test class using the Configuration class.

Configuration parameters are fetched with the Configuration.get() methods in the Mapper and Reducer classes.

Declare a new Configuration object in your test class

Configuration conf = new Configuration();

In the setUp() method add the following

mapDriver.setConfiguration(conf);
conf.set("myParameter1", "20");
conf.set("myParameter2", "23");

Your test class would pass on these parameters to the mappers.

Happy Testing