
Hadoop Streaming : Chaining Jobs


This answer is what the asker actually put in the question. I would normally quote it but will abstain from this as it is so large.


This is documentation on how to chain two or more streaming jobs, using Hadoop Streaming (currently 1.0.3).

In order to understand the final code that will do the chaining, and to be able to write any other chain job, some preliminary but practical theory is required.

First of all, what is a job in Hadoop? A Hadoop job is

hadoopJob = Configuration + Execution

where,

Configuration : all the setup that makes Execution possible.

Execution : the set of executable or script files that accomplish the desired task. In other words, the map and reduce steps of our task.

Configuration = hadoopEnvironment + userEnvironment

where,

hadoopEnvironment : is the setup of Hadoop's general environment. This general environment is defined by resources, i.e. xml files that lie in the $HADOOP_HOME/conf directory. For example, some resources are core-site.xml, mapred-site.xml and hadoop-site.xml, which define the hdfs temporary directory, the job tracker and the number of cluster nodes respectively.

userEnvironment : is the set of user-specified arguments given when running a job. In Hadoop these arguments are called options.

userEnvironment = genericOptions + streamingOptions

where,

genericOptions : these are general in the sense that they apply to every streaming job, independently of the specific job. They are handled by GenericOptionsParser.

streamingOptions : these are job-specific in the sense that they apply to a certain job. For example, every job has its own input and output directories or files. They are handled by StreamJob.

Schematically,

                            hadoopJob
                               /\
                              /  \
                             /    \
                            /      \
                           /        \
                Configuration       Execution
                     /\                 |
                    /  \                |
                   /    \   executable or script files
                  /      \
                 /        \
                /          \
  hadoopEnvironment     userEnvironment
           |                   /\
           |                  /  \
           |                 /    \
     $HADOOP_HOME/conf      /      \
                           /        \
                   genericOptions   streamingOptions
                      |                 |
                      |                 |
            GenericOptionsParser    StreamJob

As anyone can see, all of the above is a series of configurations. Part of it is for the administrator of the cluster (hadoopEnvironment) and the other part is for the user (userEnvironment) of the cluster. To conclude, at an abstract level a job is mainly a configuration, if we forget for the moment the execution part.
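
To make the genericOptions / streamingOptions split concrete, here is a minimal sketch; the class name OptionsSplitExample and the sample arguments are made up for illustration only, but it shows how GenericOptionsParser separates the two kinds of options.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class OptionsSplitExample
{
    public static void main( String[] args) throws Exception
    {
        // e.g. args = { "-D", "mapred.reduce.tasks=1", "-mapper", "m.py", "-reducer", "r.py" }
        Configuration conf = new Configuration();
        GenericOptionsParser parser = new GenericOptionsParser( conf, args);

        // generic options such as -D have been absorbed into conf ...
        System.out.println( "reduce tasks = " + conf.get( "mapred.reduce.tasks"));

        // ... while the job-specific (streaming) options are left over for StreamJob
        for( String leftover : parser.getRemainingArgs())
            System.out.println( "streaming option/value: " + leftover);
    }//end main
}//end OptionsSplitExample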

Our code should take care of all the above. Now we are ready to write code.

First of all, what is a Hadoop job at code level? It is a jar file. Whenever we submit a job we submit a jar file with some command line arguments. For example, when we run a single streaming job, we execute the command

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar -D mapred.reduce.tasks=1 -mapper m.py -reducer r.py -input /in.txt -output /out/

where,

our job is the hadoop-streaming-1.0.3.jar

with command line arguments -D mapred.reduce.tasks=1 -mapper m.py -reducer r.py -input /in.txt -output /out/

Inside this jar there is our class which takes care of everything properly.

So, we open a new java file, say TestChain.java,

// import everything needed

public class TestChain
{
    //code here

    public static void main( String[] args) throws Exception
    {
        //code here
    }//end main

}//end TestChain

To handle the hadoopEnvironment, our class should extend class Configured. Class Configured gives us access to Hadoop's environment and parameters, i.e. to the resources mentioned before. The resources are xml files that contain data in the form of name / value pairs.

Moving forward, every interface is more or less a medium between the external world and the task the world wants to accomplish. That said, an interface is the tool we use to accomplish our task. Our class consequently is a tool. For this, our class must implement the Tool interface, which declares a method run(). This method defines our tool's behaviour, when the interface is implemented of course. Finally, in order to use our tool, we use class ToolRunner. ToolRunner, through class GenericOptionsParser, also helps to handle the genericOptions from userEnvironment.

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
// import everything else needed

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {
        //code here
        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain

To complete the picture, method run(), also called the driver, sets up the job, including its initialization and configuration. Notice above that we delegated dealing with the hadoopEnvironment to ToolRunner through the first parameter, 'new Configuration()', of method ToolRunner.run().
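
As a quick illustration of what extending Configured buys us, here is a hypothetical fragment of run() (the property names are only examples of entries from the xml resources) that reads name / value pairs through the inherited getConf() method:

    public int run( String[] args) throws Exception
    {
        // getConf() comes from Configured; ToolRunner has already loaded the
        // hadoopEnvironment resources into it, so any name / value pair is available
        Configuration conf = getConf();
        String jobTracker = conf.get( "mapred.job.tracker"); // e.g. from mapred-site.xml
        String tmpDir     = conf.get( "hadoop.tmp.dir");     // e.g. from core-site.xml
        System.out.println( "job tracker: " + jobTracker + ", tmp dir: " + tmpDir);
        return 0;
    }//end run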

What have we done so far? We just set the environment in which our tool will operate. Now we have to define our tool, i.e. to do the chaining.

Since every chain job is a streaming job, we create each of them as such. We do this using the StreamJob.createJob( String[] args) method of class StreamJob. The args array of Strings contains the "command line" arguments of each job. These command line arguments refer to the streamingOptions (job-specific) of userEnvironment. Moreover, these arguments come in parameter / value pairs. For example, if our job has the file in.txt as input, /out/ as output directory, m.py as mapper and r.py as reducer, then

String[] example = new String[]
{
    "-mapper"   , "m.py",
    "-reducer"  , "r.py",
    "-input"    , "in.txt",
    "-output"   , "/out/"
};

You have to be careful about two things. First, the "-" is necessary. It is that little thing that distinguishes parameters from values. Here, mapper is a parameter and m.py is its value. The difference is understood from the "-". Secondly, if you add a space between the opening quote and the "-" of a parameter, then this parameter is ignored. If we have " -mapper", then " -mapper" is not considered a parameter. When StreamJob parses the args array, it looks for parameter / value pairs. One last thing: recall that a job is roughly a configuration. We therefore expect StreamJob.createJob() to return a configuration or something similar. Indeed, StreamJob.createJob() returns a JobConf object. A JobConf object, in short, is a description of a specific MapReduce job that Hadoop understands and can, of course, execute.
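
For instance, a hypothetical pair of args arrays showing the spacing pitfall mentioned above:

// parsed correctly: "-mapper" is a parameter, "m.py" its value
String[] ok  = new String[]{ "-mapper" , "m.py", "-reducer", "r.py" };
// " -mapper" starts with a space, so it is not recognised as a parameter and is ignored
String[] bad = new String[]{ " -mapper", "m.py", "-reducer", "r.py" };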

Assuming that we have three jobs to chain,

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;
// import everything else needed

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {
        String[] job1 = new String[]
        {
            "-mapper"   , "m1.py",
            "-reducer"  , "r1.py",
            "-input"    , "in1.txt",
            "-output"   , "/out1/"
        };
        JobConf job1Conf = StreamJob.createJob( job1);
        //code here

        String[] job2 = new String[]
        {
            "-mapper"   , "m2.py",
            "-reducer"  , "r2.py",
            "-input"    , "in2.txt",
            "-output"   , "/out2/"
        };
        JobConf job2Conf = StreamJob.createJob( job2);
        //code here

        String[] job3 = new String[]
        {
            "-mapper"   , "m3.py",
            "-reducer"  , "r3.py",
            "-input"    , "in3.txt",
            "-output"   , "/out3/"
        };
        JobConf job3Conf = StreamJob.createJob( job3);
        //code here

        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain

At this point we have set the environment in which our tool is going to operate and we have defined its behaviour. We haven't put it into action however. ToolRunner is not enough. ToolRunner runs our tool as a whole. It doesn't run the individual chain jobs. We have to do this.

There are two ways for doing this. The first way is to use JobClient and the second is to use JobControl.

First Way - JobClient

With JobClient we run the chain jobs as a sequence; one job runs after another by calling a JobClient for each job. The method that runs each individual job is JobClient.runJob( jobtorun), where jobtorun is a JobConf object.

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {
        String[] job1 = new String[]
        {
            "-mapper"   , "m1.py",
            "-reducer"  , "r1.py",
            "-input"    , "in1.txt",
            "-output"   , "/out1/"
        };
        JobConf job1Conf = StreamJob.createJob( job1);
        JobClient.runJob( job1Conf);

        String[] job2 = new String[]
        {
            "-mapper"   , "m2.py",
            "-reducer"  , "r2.py",
            "-input"    , "in2.txt",
            "-output"   , "/out2/"
        };
        JobConf job2Conf = StreamJob.createJob( job2);
        JobClient.runJob( job2Conf);

        String[] job3 = new String[]
        {
            "-mapper"   , "m3.py",
            "-reducer"  , "r3.py",
            "-input"    , "in3.txt",
            "-output"   , "/out3/"
        };
        JobConf job3Conf = StreamJob.createJob( job3);
        JobClient.runJob( job3Conf);

        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain

An advantage of this way, using JobClient, is that job progress is printed on the standard output.

A disadvantage of JobClient is that it can't take care of dependencies between jobs.

Second Way - JobControl

With JobControl, all chain jobs are part of a group of jobs. Here, every job is executed within that group. This implies that every chain job has to be added to the group first, and then the group is the one that runs. The group is a FIFO; the execution of each job in the group follows the FCFS (First Come, First Served) scheme. Each job is added to the group with the method JobControl.addJob( jobtoadd).

JobControl can handle dependencies through the method x.addDependingJob( y), where job x depends on job y. That means job x can't run until job y is finished. If job x is dependent on both jobs y and z, and z is independent of y, then with x.addDependingJob( y) and x.addDependingJob( z) we can express these dependencies.

JobControl, in contrast to JobClient, "works" with Job objects. When we call, for example, the x.addDependingJob( y) method, x and y are Job objects. The same holds for JobControl.addJob( jobtoadd): jobtoadd is a Job object. Each Job object is created from a JobConf object. Going back to code we have,

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {
        //TestChain below is an arbitrary name for the group
        JobControl jobc = new JobControl( "TestChain");

        String[] job1Args = new String[]
        {
            "-mapper"   , "m1.py",
            "-reducer"  , "r1.py",
            "-input"    , "in1.txt",
            "-output"   , "/out1/"
        };
        JobConf job1Conf = StreamJob.createJob( job1Args);
        Job job1 = new Job( job1Conf);
        jobc.addJob( job1);

        String[] job2Args = new String[]
        {
            "-mapper"   , "m2.py",
            "-reducer"  , "r2.py",
            "-input"    , "in2.txt",
            "-output"   , "/out2/"
        };
        JobConf job2Conf = StreamJob.createJob( job2Args);
        Job job2 = new Job( job2Conf);
        jobc.addJob( job2);

        String[] job3Args = new String[]
        {
            "-mapper"   , "m3.py",
            "-reducer"  , "r3.py",
            "-input"    , "/out2/par*",
            "-output"   , "/out3/"
        };
        JobConf job3Conf = StreamJob.createJob( job3Args);
        Job job3 = new Job( job3Conf);
        job3.addDependingJob( job2);
        jobc.addJob( job3);

        //code here

        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain

In the code above, notice that job3 depends on job2. As you can see, job3's input is job2's output. This fact is a dependency. job3 waits until job2 is finished.

Until now we have just added the chain jobs to the group and described their dependency. We need one last thing to run this group of jobs.

The brute-force approach is to just call the method JobControl.run(). Although this approach works, it is problematic. When the chain jobs are finished, the whole job still runs forever. An approach that works properly is to define a new Thread of execution from our job Thread that already exists (when the job runs). Then we can wait until the chain jobs are done and then exit. While the chain jobs are executing we can ask for job execution information, for example how many jobs have finished, or we can check whether a job is in an invalid state and which one it is.

An advantage of this way, using JobControl, is that it can take care of the many dependencies that may exist between jobs.

A disadvantage of JobControl is that job progress is not printed on the standard output; it is not presented straightforwardly. Whether a job fails or succeeds, nothing useful is printed. You have to check through Hadoop's web UI or add some code in the while loop below to track each job's state or whatever is needed. Finally,

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {
        //TestChain below is an arbitrary name for the group
        JobControl jobc = new JobControl( "TestChain");

        String[] job1Args = new String[]
        {
            "-mapper"   , "m1.py",
            "-reducer"  , "r1.py",
            "-input"    , "in1.txt",
            "-output"   , "/out1/"
        };
        JobConf job1Conf = StreamJob.createJob( job1Args);
        Job job1 = new Job( job1Conf);
        jobc.addJob( job1);

        String[] job2Args = new String[]
        {
            "-mapper"   , "m2.py",
            "-reducer"  , "r2.py",
            "-input"    , "in2.txt",
            "-output"   , "/out2/"
        };
        JobConf job2Conf = StreamJob.createJob( job2Args);
        Job job2 = new Job( job2Conf);
        jobc.addJob( job2);

        String[] job3Args = new String[]
        {
            "-mapper"   , "m3.py",
            "-reducer"  , "r3.py",
            "-input"    , "/out2/par*",
            "-output"   , "/out3/"
        };
        JobConf job3Conf = StreamJob.createJob( job3Args);
        Job job3 = new Job( job3Conf);
        job3.addDependingJob( job2);
        jobc.addJob( job3);

        Thread runjobc = new Thread( jobc);
        runjobc.start();

        while( !jobc.allFinished())
        {
            //do whatever you want; just wait or ask for job information
        }

        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain
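
As mentioned, the while loop above can be enriched to track progress. A minimal sketch, assuming the getSuccessfulJobs() and getFailedJobs() methods of the old mapred JobControl API, could be:

        while( !jobc.allFinished())
        {
            // poll the group and print a small progress report
            System.out.println( jobc.getSuccessfulJobs().size() + " chain job(s) finished, "
                              + jobc.getFailedJobs().size()     + " failed so far");
            Thread.sleep( 5000); // check again every 5 seconds
        }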

ERRORS

This section discusses some errors that may occur. In the error messages below there is a class OptimizingJoins. This class only serves to demonstrate the various errors and has no other relation to this discussion.

A package does not exist while trying to compile.

This is a matter of classpath. Compile like this (adding the hadoop-streaming-1.0.3.jar package, for example),

javac -classpath /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar:/usr/local/hadoop/hadoop-core-1.0.3.jar TestChain.java

and add any missing package.

java.lang.NoClassDefFoundError: org/apache/hadoop/streaming/StreamJob

The full error is,

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/streaming/StreamJob
    at OptimizingJoins.run(OptimizingJoins.java:135)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
    at OptimizingJoins.main(OptimizingJoins.java:248)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.streaming.StreamJob
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
    ... 8 more

This is a matter of the manifest file of our jar. When we compile our job in the way above, everything is alright. The Java compiler finds whatever it needs. But when we run our job in Hadoop through the command

$HADOOP_HOME/bin/hadoop jar /home/hduser/TestChain.jar TestChain

then the JVM that runs our jar cannot find StreamJob. To solve this, when we create the jar file, we put in the jar a manifest file that contains the class-path of StreamJob. Practically,

MANIFEST.MF

Manifest-Version: 1.0
Class-Path: /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar
Created-By: 1.7.0_07 (Oracle Corporation)

Notice that a MANIFEST.MF file always ends with a blank line. Our MANIFEST.MF file has 4 lines, not 3. Then we create the jar file, like,

jar cmf META-INF/MANIFEST.MF TestChain.jar TestChain.class 

ERROR streaming.StreamJob: Unrecognized option: -D

This error occurs because StreamJob cannot parse the -D option. StreamJob can parse only streaming, job-specific options; -D is a generic option.

There are two solutions to this problem. The first solution is to use the -jobconf option instead of -D. The second solution is to parse the -D option through a GenericOptionsParser object. In the second solution, of course, you have to remove the -D options from the StreamJob.createJob() args.
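
For the first solution, a minimal sketch (assuming -jobconf accepts the same name=value pairs that -D does) is to fold the setting into the streaming args themselves:

String[] job1 = new String[]
{
    "-jobconf"  , "mapred.reduce.tasks=1",
    "-mapper"   , "m1.py",
    "-reducer"  , "r1.py",
    "-input"    , "in1.txt",
    "-output"   , "/out1/"
};
JobConf job1Conf = StreamJob.createJob( job1);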

To give an example, a "clean" code implementation of the second solution is,

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;

public class TestChain
{
    public class Job1 extends Configured implements Tool
    {
        public int run( String[] args) throws Exception
        {
            String[] job1 = new String[]
            {
                "-mapper"   , "m1.py",
                "-reducer"  , "r1.py",
                "-input"    , "in1.txt",
                "-output"   , "/out1/"
            };
            JobConf job1Conf = StreamJob.createJob( job1);
            JobClient.runJob( job1Conf);
            return 0;
        }//end run
    }//end Job1

    public class Job2 extends Configured implements Tool
    {
        public int run( String[] args) throws Exception
        {
            String[] job2 = new String[]
            {
                "-mapper"   , "m2.py",
                "-reducer"  , "r2.py",
                "-input"    , "in2.txt",
                "-output"   , "/out2/"
            };
            JobConf job2Conf = StreamJob.createJob( job2);
            JobClient.runJob( job2Conf);
            return 0;
        }//end run
    }//end Job2

    public class Job3 extends Configured implements Tool
    {
        public int run( String[] args) throws Exception
        {
            String[] job3 = new String[]
            {
                "-mapper"   , "m3.py",
                "-reducer"  , "r3.py",
                "-input"    , "in3.txt",
                "-output"   , "/out3/"
            };
            JobConf job3Conf = StreamJob.createJob( job3);
            JobClient.runJob( job3Conf);
            return 0;
        }//end run
    }//end Job3

    public static void main( String[] args) throws Exception
    {
        TestChain tc = new TestChain();

        //Domination
        String[] j1args = new String[]
        {
            "-D", "mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator",
            "-D", "mapred.text.key.comparator.options=-k1,1",
            "-D", "mapred.reduce.tasks=1"
        };
        // Let ToolRunner handle generic command-line options
        int j1res = ToolRunner.run( new Configuration(), tc.new Job1(), j1args);

        //Cost evaluation
        String[] j2args = new String[]
        {
            "-D", "mapred.reduce.tasks=12",
            "-D", "mapred.text.key.partitioner.options=-k1,1"
        };
        // Let ToolRunner handle generic command-line options
        int j2res = ToolRunner.run( new Configuration(), tc.new Job2(), j2args);

        //Minimum Cost
        String[] j3args = new String[]
        {
            "-D", "mapred.reduce.tasks=1"
        };
        // Let ToolRunner handle generic command-line options
        int j3res = ToolRunner.run( new Configuration(), tc.new Job3(), j3args);

        System.exit( j3res);
    }//end main

}//end TestChain

In the code above, we define an outer class TestChain which encapsulates the chain jobs. Then we define each separate chain job, i.e. we define its run method. Every chain job is a class that extends Configured and implements Tool. Finally, from TestChain's main method we run each job sequentially. Notice that before running any chain job we define its generic options.

Compile

javac -classpath /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar:/usr/local/hadoop/hadoop-core-1.0.3.jar TestChain.java 

Jar

jar cmf META-INF/MANIFEST.MF TestChain.jar TestChain.class TestChain\$Job1.class TestChain\$Job2.class TestChain\$Job3.class

ERROR security.UserGroupInformation: PriviledgedActionException as:hduser cause:org.apache.hadoop.mapred.InvalidInputException: Input Pattern hdfs://localhost:54310/user/hduser/whateverFile matches 0 files

This error happens when we use JobControl. For example, if a job has as input the output of a previous job, and this input/output file does not already exist, this error occurs. JobControl runs all independent jobs in "parallel", and not one by one as JobClient does. So, JobControl tries to run a job whose input files do not exist and fails for that reason.

To avoid this situation, we declare that there is a dependency between these two jobs using x.addDependingJob( y) (job x depends on job y). Now, JobControl does not try to run dependent jobs in parallel.