Thursday, 22 March 2012

Hadoop Testing: MRUnit

MRUnit: Hadoop Testing tool
The distributed nature of MapReduce programs makes debugging a challenge. Attaching a debugger to a remote process is cumbersome, and the lack of a single console makes it difficult to inspect what is occurring when several distributed copies of a mapper or reducer are running concurrently.
MRUnit helps bridge the gap between MapReduce programs and JUnit by providing a set of interfaces and test harnesses, which allow MapReduce programs to be more easily tested using standard tools and practices.
MRUnit is a testing framework for MapReduce programs written to run on Hadoop. It makes testing Mapper and Reducer classes easier.


Setup of development environment:
1. Download junit-4.10.jar.
2. Get hadoop-mrunit-0.20.2-cdh3u1.jar. In a CDH3 installation it is under /usr/lib/hadoop-0.20/contrib/mrunit/
*** If you are using a different Hadoop version, use the matching MRUnit JAR instead. For example, with Hadoop 0.23.x use mrunit-x.x.x-incubating-hadoop023.jar
3. Add both JARs to the classpath.
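To confirm that both JARs actually ended up on the classpath before writing any tests, a small JDK-only check like the one below may help. It is a sketch: it simply tries to load one class from each JAR by name (`org.junit.Test` from JUnit and `org.apache.hadoop.mrunit.mapreduce.MapDriver`, the MRUnit class the test case below imports). The class name `ClasspathCheck` is hypothetical.

```java
// Hypothetical helper: reports whether the JUnit and MRUnit classes are visible on the classpath.
public class ClasspathCheck {

    // Returns true if the class can be located and linked by the application class loader.
    static boolean isOnClasspath(String className) {
        try {
            // initialize = false: load the class without running its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false; // the JAR containing the class is not on the classpath
        } catch (LinkageError e) {
            return false; // the class was found but one of its dependencies is missing
        }
    }

    public static void main(String[] args) {
        String[] required = {
            "org.junit.Test",                              // from junit-4.10.jar
            "org.apache.hadoop.mrunit.mapreduce.MapDriver" // from the MRUnit JAR
        };
        for (String name : required) {
            System.out.println((isOnClasspath(name) ? "found:   " : "MISSING: ") + name);
        }
    }
}
```

Run it with the same classpath as your tests; any `MISSING:` line points at a JAR that still needs to be added.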
Writing a test case:
In Eclipse:
1.       Create a source folder named ‘test’ in your project.
2.       Right-click the class for which you want to create a test case.
3.       Select New -> JUnit Test Case, set the source folder to ‘test’, and click Finish.


CODE:
The Word Count program under test is shown below:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class WordCount extends Configured implements Tool {

   // Emits (word, 1) for every whitespace-separated token in a line of input.
   public static class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
      private static final LongWritable ONE = new LongWritable(1);
      private final Text tokenValue = new Text();

      @Override
      protected void map(LongWritable offset, Text text, Context context) throws IOException, InterruptedException {
         for (String token : text.toString().split("\\s+")) {
            if (token.isEmpty()) {
               continue; // split() yields an empty first token when a line starts with whitespace
            }
            tokenValue.set(token);
            context.write(tokenValue, ONE);
         }
      }
   }

   // Sums the counts for each word; the same class is also used as the combiner.
   public static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
      private final LongWritable total = new LongWritable();

      @Override
      protected void reduce(Text token, Iterable<LongWritable> counts, Context context)
            throws IOException, InterruptedException {
         long n = 0;
         for (LongWritable count : counts) {
            n += count.get();
         }
         total.set(n);
         context.write(token, total);
      }
   }

   public int run(String[] args) throws Exception {
      if (args.length != 2) {
         System.err.println("Usage: WordCount <input path> <output path>");
         return -1;
      }
      Configuration configuration = getConf();

      Job job = new Job(configuration, "Word Count");
      job.setJarByClass(WordCount.class);

      job.setMapperClass(WordCountMapper.class);
      job.setCombinerClass(WordCountReducer.class);
      job.setReducerClass(WordCountReducer.class);

      job.setInputFormatClass(TextInputFormat.class);
      job.setOutputFormatClass(TextOutputFormat.class);

      // Input and output locations come from the command line; without them the job fails at submission.
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(LongWritable.class);

      return job.waitForCompletion(true) ? 0 : -1;
   }

   public static void main(String[] args) throws Exception {
      System.exit(ToolRunner.run(new WordCount(), args));
   }
}
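Since no Hadoop classes are needed to reason about what this job computes, its data flow can be sketched in plain Java. This is an in-memory illustration only, not how Hadoop executes the job: `map` emits a (word, 1) pair for each non-empty token, much like `WordCountMapper`; a `TreeMap` stands in for the shuffle that groups values by key (Hadoop also sorts map output by key); and `reduce` sums each group like `WordCountReducer`. The class `WordCountFlow` and its method names are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Hadoop-free illustration of the Word Count data flow (not how Hadoop runs the job).
public class WordCountFlow {

    // Map step: one (token, 1) pair per non-empty whitespace-separated token, in input order.
    static List<Map.Entry<String, Long>> map(String line) {
        List<Map.Entry<String, Long>> pairs = new ArrayList<>();
        for (String token : line.split("\\s+")) {
            if (!token.isEmpty()) {
                pairs.add(new SimpleEntry<>(token, 1L));
            }
        }
        return pairs;
    }

    // Shuffle step (simulated): group the values by key; TreeMap keeps the keys sorted.
    static Map<String, List<Long>> shuffle(List<Map.Entry<String, Long>> pairs) {
        Map<String, List<Long>> groups = new TreeMap<>();
        for (Map.Entry<String, Long> pair : pairs) {
            List<Long> values = groups.get(pair.getKey());
            if (values == null) {
                values = new ArrayList<>();
                groups.put(pair.getKey(), values);
            }
            values.add(pair.getValue());
        }
        return groups;
    }

    // Reduce step: sum the counts collected for each key.
    static Map<String, Long> reduce(Map<String, List<Long>> groups) {
        Map<String, Long> totals = new TreeMap<>();
        for (Map.Entry<String, List<Long>> group : groups.entrySet()) {
            long n = 0;
            for (long count : group.getValue()) {
                n += count;
            }
            totals.put(group.getKey(), n);
        }
        return totals;
    }

    public static void main(String[] args) {
        String line = "sky sky sky oh my beautiful sky";
        // prints [sky=1, sky=1, sky=1, oh=1, my=1, beautiful=1, sky=1]
        System.out.println(map(line));
        // prints {beautiful=1, my=1, oh=1, sky=4}
        System.out.println(reduce(shuffle(map(line))));
    }
}
```

The seven pairs printed first correspond to the seven expected mapper outputs in `testMapper`, and feeding `"sky sky sky"` through all three steps gives `sky=3`, the result `testMapReduce` expects.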



Test Case Code:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

public class WordCountTest {

   /* We declare three variables: a MapDriver, a ReduceDriver and a MapReduceDriver.
      Their generic parameters are worth noting. The MapDriver generics match
      the generics of the Mapper under test:

         WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>

      Similarly, the ReduceDriver generics match the Reducer under test:

         WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> */
   MapReduceDriver<LongWritable, Text, Text, LongWritable, Text, LongWritable> mapReduceDriver;
   MapDriver<LongWritable, Text, Text, LongWritable> mapDriver;
   ReduceDriver<Text, LongWritable, Text, LongWritable> reduceDriver;

   // Create instances of our Mapper and Reducer,
   // then hand them to the drivers using the setXXX() methods.
   @Before
   public void setUp() {
      WordCount.WordCountMapper mapper = new WordCount.WordCountMapper();
      WordCount.WordCountReducer reducer = new WordCount.WordCountReducer();
      mapDriver = new MapDriver<LongWritable, Text, Text, LongWritable>();
      mapDriver.setMapper(mapper);
      reduceDriver = new ReduceDriver<Text, LongWritable, Text, LongWritable>();
      reduceDriver.setReducer(reducer);
      mapReduceDriver = new MapReduceDriver<LongWritable, Text, Text, LongWritable, Text, LongWritable>();
      mapReduceDriver.setMapper(mapper);
      mapReduceDriver.setReducer(reducer);
   }

   // runTest() declares IOException in newer MRUnit releases;
   // declaring it on the test methods keeps them compiling with those versions too.
   @Test
   public void testMapper() throws IOException {
      // one sample line of input for the mapper
      mapDriver.withInput(new LongWritable(1), new Text("sky sky sky oh my beautiful sky"));
      // expected mapper output: one (word, 1) pair per word, in input order
      mapDriver.withOutput(new Text("sky"), new LongWritable(1));
      mapDriver.withOutput(new Text("sky"), new LongWritable(1));
      mapDriver.withOutput(new Text("sky"), new LongWritable(1));
      mapDriver.withOutput(new Text("oh"), new LongWritable(1));
      mapDriver.withOutput(new Text("my"), new LongWritable(1));
      mapDriver.withOutput(new Text("beautiful"), new LongWritable(1));
      mapDriver.withOutput(new Text("sky"), new LongWritable(1));
      // runTest() runs the mapper on the input and compares its output with the expected output
      mapDriver.runTest();
   }

   @Test
   public void testReducer() throws IOException {
      // the reducer receives one key together with all of its values
      List<LongWritable> values = new ArrayList<LongWritable>();
      values.add(new LongWritable(1));
      values.add(new LongWritable(1));
      reduceDriver.withInput(new Text("sky"), values);
      reduceDriver.withOutput(new Text("sky"), new LongWritable(2));
      reduceDriver.runTest();
   }

   @Test
   public void testMapReduce() throws IOException {
      // the whole pipeline: map, shuffle/sort, then reduce
      mapReduceDriver.withInput(new LongWritable(1), new Text("sky sky sky"));
      mapReduceDriver.withOutput(new Text("sky"), new LongWritable(3));
      mapReduceDriver.runTest();
   }
}


Explanation of the above code:
MapReduceDriver<LongWritable, Text, Text, LongWritable, Text, LongWritable> mapReduceDriver;
MapDriver<LongWritable, Text, Text, LongWritable> mapDriver;
ReduceDriver<Text, LongWritable, Text, LongWritable> reduceDriver;

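We declare three variables: a MapDriver, a ReduceDriver and a MapReduceDriver. Their generic parameters are worth noting.
The MapDriver generics match the generics of the Mapper under test (Mapper<LongWritable, Text, Text, LongWritable>), and the ReduceDriver generics match the Reducer under test (Reducer<Text, LongWritable, Text, LongWritable>).
The MapReduceDriver takes six generic parameters: the Mapper's input key and value, the Mapper's output key and value (which are also the Reducer's input key and value), and the Reducer's output key and value. We provide the input key and value that should be sent to the Mapper, and the outputs we expect the Reducer to emit for those inputs.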
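How the six type parameters chain together can be shown with two hypothetical interfaces, `MapFn` and `ReduceFn`, in plain Java 8 or later. They are not MRUnit or Hadoop types; `String` and `Long` stand in for `Text` and `LongWritable`. The generic method `run` has the same shape as `MapReduceDriver<K1, V1, K2, V2, K3, V3>`, so the compiler only accepts a mapper/reducer pair whose intermediate types agree.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for a mapper and a reducer (NOT Hadoop or MRUnit types).
interface MapFn<K1, V1, K2, V2> {
    List<Map.Entry<K2, V2>> map(K1 key, V1 value);
}

interface ReduceFn<K2, V2, K3, V3> {
    List<Map.Entry<K3, V3>> reduce(K2 key, List<V2> values);
}

public class DriverGenerics {

    // Same six-parameter shape as MapReduceDriver<K1, V1, K2, V2, K3, V3>: this call only
    // compiles when the mapper's output types (K2, V2) are the reducer's input types.
    static <K1, V1, K2, V2, K3, V3> List<Map.Entry<K3, V3>> run(
            MapFn<K1, V1, K2, V2> mapper, ReduceFn<K2, V2, K3, V3> reducer, K1 key, V1 value) {
        // Group the intermediate values by key (first-seen order here; Hadoop would sort the keys).
        Map<K2, List<V2>> groups = new LinkedHashMap<>();
        for (Map.Entry<K2, V2> pair : mapper.map(key, value)) {
            groups.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        List<Map.Entry<K3, V3>> out = new ArrayList<>();
        for (Map.Entry<K2, List<V2>> group : groups.entrySet()) {
            out.addAll(reducer.reduce(group.getKey(), group.getValue()));
        }
        return out;
    }

    // Word Count instantiation, with String/Long standing in for Text/LongWritable:
    // K1 = Long (offset), V1 = String (line), K2/V2 = String/Long, K3/V3 = String/Long.
    static final MapFn<Long, String, String, Long> MAPPER = (offset, line) -> {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (String token : line.split("\\s+")) {
            if (!token.isEmpty()) {
                out.add(new SimpleEntry<>(token, 1L));
            }
        }
        return out;
    };

    static final ReduceFn<String, Long, String, Long> REDUCER = (word, counts) -> {
        long n = 0;
        for (long c : counts) {
            n += c;
        }
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        out.add(new SimpleEntry<>(word, n));
        return out;
    };

    public static void main(String[] args) {
        // prints [sky=3]
        System.out.println(run(MAPPER, REDUCER, 1L, "sky sky sky"));
    }
}
```

Passing a reducer whose input types differ from `MAPPER`'s output types (for example one declared over `Integer` values) would be a compile-time error, which is the same protection the matching generics on the MRUnit drivers provide.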

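In the setUp() method we create instances of our Mapper and Reducer and hand them to the drivers using the setXXX() methods (setMapper() and setReducer()). Because it is annotated with @Before, JUnit runs it before every test method, so each test starts with fresh drivers.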

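In each test method:
We pass a sample input to the driver using the withInput() method. For the ReduceDriver, withInput() takes a key and the list of values grouped under it.
We tell the driver which output we expect using the withOutput() method.
The runTest() method runs the Mapper (or the Reducer, or the whole map/reduce pipeline) on the input and fails the test if the actual output differs from the expected output.
Run the test class as a JUnit test (Run As -> JUnit Test).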
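To make the withInput() / withOutput() / runTest() pattern concrete, here is a much-simplified imitation in plain Java 8 or later. It is not MRUnit and does not reproduce its API; the class `MiniMapDriver` and the helper `wordPairs` are hypothetical. It only shows the idea: record one input, record the expected pairs in order, then run the mapper and fail at the first mismatch. Because this sketch compares position by position, the expected pairs must be listed in the order the mapper emits them, which is how `testMapper` lists them.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, much-simplified imitation of the withInput()/withOutput()/runTest() pattern.
// This is NOT MRUnit; it only illustrates what a driver does with the values you give it.
public class MiniMapDriver<K, V> {

    private final Function<String, List<Map.Entry<K, V>>> mapper;
    private final List<Map.Entry<K, V>> expected = new ArrayList<>();
    private String input;

    MiniMapDriver(Function<String, List<Map.Entry<K, V>>> mapper) {
        this.mapper = mapper;
    }

    // Records the input line that will be fed to the mapper.
    MiniMapDriver<K, V> withInput(String line) {
        this.input = line;
        return this;
    }

    // Appends one expected output pair, in the order the mapper should emit it.
    MiniMapDriver<K, V> withOutput(K key, V value) {
        expected.add(new SimpleEntry<>(key, value));
        return this;
    }

    // Runs the mapper on the input and throws AssertionError at the first mismatch,
    // including a missing or an unexpected extra output pair.
    void runTest() {
        List<Map.Entry<K, V>> actual = mapper.apply(input);
        int n = Math.max(actual.size(), expected.size());
        for (int i = 0; i < n; i++) {
            Map.Entry<K, V> got = i < actual.size() ? actual.get(i) : null;
            Map.Entry<K, V> want = i < expected.size() ? expected.get(i) : null;
            if (got == null || !got.equals(want)) {
                throw new AssertionError("position " + i + ": expected " + want + " but got " + got);
            }
        }
    }

    // A word-count style mapper: one (word, 1) pair per non-empty token.
    static List<Map.Entry<String, Long>> wordPairs(String line) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (String token : line.split("\\s+")) {
            if (!token.isEmpty()) {
                out.add(new SimpleEntry<>(token, 1L));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        new MiniMapDriver<String, Long>(MiniMapDriver::wordPairs)
                .withInput("sky oh sky")
                .withOutput("sky", 1L)
                .withOutput("oh", 1L)
                .withOutput("sky", 1L)
                .runTest(); // same pairs in the same order, so no AssertionError is thrown
        System.out.println("passed");
    }
}
```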

Thank You.