Developing, testing and debugging Hadoop map/reduce jobs with Eclipse Developing, testing and debugging Hadoop map/reduce jobs with Eclipse hadoop hadoop

Developing, testing and debugging Hadoop map/reduce jobs with Eclipse


I develop Cassandra/Hadoop applications in Eclipse by:

  1. Using maven (m2e) to gather and configure the dependencies (Hadoop, Cassandra, Pig, etc.) for my Eclipse projects

  2. Creating test cases (classes in src/test/java) to test my mappers and reducers. The trick is to build a context object on the fly using inner classes that extend RecordWriter and StatusReporter. If you do this then after you invoke setup/map/cleanup or setup/reduce/cleanup you can assert the correct key/value pairs and context info were written by the mapper or reducer. The constructors for contexts in both mapred and mapreduce look ugly, but you'll find the classes are pretty easy to instantiate.

  3. Once you write these tests maven will invoke them automatically every time you build.

  4. You can invoke the tests manually by selecting the project and doing a Run --> Maven Test. This turns out to be really handy because the tests are invoked in debug mode and you can set breakpoints in your mappers and reducers and do all the cool things Eclipse lets you do in debug.

  5. Once you're happy with the quality of your code, use Maven to build a jar-with-dependencies for that all in one jar that hadoop likes so much.

Just as a side note, I've built a number of code generation tools based on the M2T JET project in Eclipse. They generate out the infrastructure for everything I've mentioned above and I just write the logic for my mappers, reducers and test cases. I think if you gave it some thought you could probably come up with a set of reusable classes that you could extend to do pretty much the same thing.

Here's a sample test case class:

/* *  * This source code and information are provided "AS-IS" without  * warranty of any kind, either expressed or implied, including * but not limited to the implied warranties of merchantability * and/or fitness for a particular purpose. *  * This source code was generated using an evaluation copy  * of the Cassandra/Hadoop Accelerator and may not be used for * production purposes. * */package com.creditco.countwords.ReadDocs;// Begin imports import java.io.IOException;import java.util.ArrayList;import junit.framework.TestCase;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Counter;import org.apache.hadoop.mapreduce.Counters;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.OutputCommitter;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.StatusReporter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.TaskAttemptID;import org.junit.Test;// End imports public class ParseDocsMapperTest extends TestCase {    @Test    public void testCount() {        TestRecordWriter    recordWriter    = new TestRecordWriter();        TestRecordReader    recordReader    = new TestRecordReader();        TestOutputCommitter outputCommitter = new TestOutputCommitter();        TestStatusReporter  statusReporter  = new TestStatusReporter();        TestInputSplit      inputSplit      = new TestInputSplit();        try {                // Begin test logic                // Get an instance of the mapper to be tested and a context instance            ParseDocsMapper mapper = new ParseDocsMapper();            Mapper<LongWritable,Text,Text,IntWritable>.Context context =                 mapper.testContext(new Configuration(), new TaskAttemptID(),recordReader,recordWriter,outputCommitter,statusReporter,inputSplit);                // Invoke the setup, map and cleanup methods            mapper.setup(context);            LongWritable key = new LongWritable(30);            Text value = new Text("abc def ghi");            mapper.map(key, value, context);            if (recordWriter.getKeys().length != 3) {                fail("com.creditco.countwords:ParseDocsMapperTest.testCount() - Wrong number of records written ");            }            mapper.cleanup(context);                // Validation:                //                // recordWriter.getKeys() returns the keys written to the context by the mapper                // recordWriter.getValues() returns the values written to the context by the mapper                // statusReporter returns the most recent status and any counters set by the mapper                //                // End test logic        } catch (Exception e) {            fail("com.creditco.countwords:ParseDocsMapperTest.testCount() - Exception thrown: "+e.getMessage());        }    }    final class TestRecordWriter extends RecordWriter<Text, IntWritable> {        ArrayList<Text> keys = new ArrayList<Text>();        ArrayList<IntWritable> values = new ArrayList<IntWritable>();        public void close(TaskAttemptContext arg0) throws IOException, InterruptedException { }        public void write(Text key, IntWritable value) throws IOException, InterruptedException {            keys.add(key);            values.add(value);        }        public Text[] getKeys() {            Text result[] = new Text[keys.size()];            keys.toArray(result);            return result;        }        public IntWritable[] getValues() {            IntWritable[] result = new IntWritable[values.size()];            values.toArray(result);            return result;        }    };      final class TestRecordReader extends RecordReader<LongWritable, Text> {        public void close() throws IOException { }        public LongWritable getCurrentKey() throws IOException, InterruptedException {            throw new RuntimeException("Tried to call RecordReader:getCurrentKey()");        }        public Text getCurrentValue() throws IOException, InterruptedException {            throw new RuntimeException("Tried to call RecordReader:getCurrentValue()");        }        public float getProgress() throws IOException, InterruptedException {            throw new RuntimeException("Tried to call RecordReader:getProgress()");        }        public void initialize(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { }        public boolean nextKeyValue() throws IOException, InterruptedException {            return false;        }    };    final class TestStatusReporter extends StatusReporter {        private Counters counters = new Counters();        private String status = null;        public void setStatus(String arg0) {            status = arg0;        }        public String getStatus() {            return status;        }        public void progress() { }        public Counter getCounter(String arg0, String arg1) {            return counters.getGroup(arg0).findCounter(arg1);        }        public Counter getCounter(Enum<?> arg0) {            return null;        }    };    final class TestInputSplit extends InputSplit {        public String[] getLocations() throws IOException, InterruptedException {            return null;        }        public long getLength() throws IOException, InterruptedException {            return 0;        }    };    final class TestOutputCommitter extends OutputCommitter {        public void setupTask(TaskAttemptContext arg0) throws IOException { }        public void setupJob(JobContext arg0) throws IOException { }        public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {            return false;        }        public void commitTask(TaskAttemptContext arg0) throws IOException { }        public void cleanupJob(JobContext arg0) throws IOException { }        public void abortTask(TaskAttemptContext arg0) throws IOException { }    };}

and here's a sample maven pom. Note that the referenced versions are a bit out of date, but as long as those versions are kept in a maven repository somewhere, you'll be able to build this project.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  <modelVersion>4.0.0</modelVersion>  <groupId>com.creditco</groupId>  <artifactId>wordcount.example</artifactId>  <version>0.0.1-SNAPSHOT</version>    <build>        <plugins>            <plugin>                <artifactId>maven-assembly-plugin</artifactId>                <version>2.2</version>                <configuration>                    <descriptorRefs>                        <descriptorRef>jar-with-dependencies</descriptorRef>                    </descriptorRefs>                </configuration>            </plugin>        </plugins>    </build>  <dependencies>    <dependency>        <groupId>org.apache.hadoop</groupId>        <artifactId>hadoop-core</artifactId>        <version>0.20.2</version>        <type>jar</type>        <scope>compile</scope>    </dependency>    <dependency>        <groupId>org.apache.cassandra</groupId>        <artifactId>cassandra-all</artifactId>        <version>1.0.6</version>        <type>jar</type>        <scope>compile</scope>    </dependency>    <dependency>        <groupId>org.cassandraunit</groupId>        <artifactId>cassandra-unit</artifactId>        <version>1.0.1.1</version>        <type>jar</type>        <scope>compile</scope>        <exclusions>            <exclusion>                <artifactId>hamcrest-all</artifactId>                <groupId>org.hamcrest</groupId>            </exclusion>        </exclusions>    </dependency>    <dependency>        <groupId>org.apache.pig</groupId>        <artifactId>pig</artifactId>        <version>0.9.1</version>        <type>jar</type>        <scope>compile</scope>    </dependency>    <dependency>        <groupId>org.json</groupId>        <artifactId>json</artifactId>        <version>20090211</version>        <type>jar</type>        <scope>compile</scope>    </dependency>  </dependencies></project>


I use MiniMRCluster cluster which comes with Apache. You use to start a mini Map Reduce cluster inside a unit test! HBase also has HBaseTestingUtil which is great since you start HDFS and MapReduce in roughly two lines.


@Chris Gerken - I am trying to run the Word Count job in Eclipse by running the Driver as a Java application but I get ClassNotFoundException on the Mapper. Looks to me that running as a java application, hadoop job does not get the needed Mapper and Reduce with the jar.