How would you suggest performing "Join" with Hadoop streaming?



Hadoop ships with a partitioner class called KeyFieldBasedPartitioner: http://hadoop.apache.org/mapreduce/docs/r0.21.0/api/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.html

Passing this class as the partitioner for your streaming job lets you split your mapper output into key/value pairs on a chosen field separator. Keys that hash together go to the same reducer, and within each reducer the records arrive sorted by the full key, including the secondary fields: http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html#More+Usage+Examples

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -D stream.map.output.field.separator=. \
    -D stream.num.map.output.key.fields=4 \
    -D mapreduce.map.output.key.field.separator=. \
    -D mapreduce.partition.keypartitioner.options=-k1,2 \
    -D mapreduce.job.reduces=12 \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

Here, -D stream.map.output.field.separator=. and -D stream.num.map.output.key.fields=4 are explained at http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html#Customizing+How+Lines+are+Split+into+Key%2FValue+Pairs. In short, they tell streaming how the fields your mapper emits are split into the key and the value.
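As a rough illustration (plain Python, not Hadoop code; the function name is made up), this is how those two options carve a mapper output line into its key and value:

```python
# Sketch of how Hadoop streaming splits a mapper output line into
# key and value, given stream.map.output.field.separator=. and
# stream.num.map.output.key.fields=4.
def split_key_value(line, separator=".", num_key_fields=4):
    parts = line.rstrip("\n").split(separator)
    key = separator.join(parts[:num_key_fields])
    value = separator.join(parts[num_key_fields:])
    return key, value

# e.g. "11.12.1.2.some.value" yields key "11.12.1.2", value "some.value"
```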

The map output keys of the above MapReduce job normally have four fields separated by ".". However, the MapReduce framework will partition the map outputs by the first two fields of the keys using the -D mapreduce.partition.keypartitioner.options=-k1,2 option. Here, -D mapreduce.map.output.key.field.separator=. specifies the separator for the partition. This guarantees that all the key/value pairs with the same first two fields in the keys will be partitioned into the same reducer.

This is effectively equivalent to specifying the first two fields as the primary key and the next two fields as the secondary. The primary key is used for partitioning, and the combination of the primary and secondary keys is used for sorting.
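A small sketch of that primary/secondary split (Python's built-in hash() here is only a stand-in for the hash Hadoop's partitioner actually uses):

```python
# Partition on the first two key fields (as with -k1,2); sort on the
# whole key. hash() is an illustrative stand-in for Hadoop's hash.
def partition(key, num_reducers, separator="."):
    primary = separator.join(key.split(separator)[:2])
    return hash(primary) % num_reducers

keys = ["11.12.2.2", "11.12.1.1", "11.14.1.1"]
# Same primary key -> same reducer, regardless of the secondary fields:
same_reducer = partition(keys[0], 12) == partition(keys[1], 12)
# Within a reducer, records arrive ordered by the full key:
ordered = sorted(keys)
```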

To do a join, simply output the join fields from your mapper and set the job options so those fields form the key; the reducer will then receive all of the values for a given key together. If you want to take data from multiple sources, just keep adding more -input arguments on the command line. If the sources have different record formats, your mapper can recognize which source a record came from and normalize it to a standard output format.
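A minimal reduce-side join sketch along those lines. Everything here is an assumption for illustration: tab-separated records, the join key in the first field, and the source tags "A" and "B".

```python
# Mapper side: normalize every record from either source to
# "join_key <TAB> source_tag <TAB> remaining fields".
def map_line(line, tag):
    fields = line.rstrip("\n").split("\t")
    return "%s\t%s\t%s" % (fields[0], tag, "\t".join(fields[1:]))

# Reducer side: streaming delivers lines grouped and sorted by key,
# so buffer each side per key and emit the cross product (inner join).
def reduce_lines(lines):
    current_key, left, right, out = None, [], [], []
    def flush():
        for l in left:
            for r in right:
                out.append("%s\t%s\t%s" % (current_key, l, r))
    for line in lines:
        key, tag, rest = line.rstrip("\n").split("\t", 2)
        if current_key is not None and key != current_key:
            flush()
            left, right = [], []
        current_key = key
        (left if tag == "A" else right).append(rest)
    if current_key is not None:
        flush()
    return out
```

In a real job you would wrap map_line in a mapper.py reading sys.stdin and reduce_lines in a reducer.py, and point -mapper and -reducer at those scripts.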