How to sort numerically in Hadoop's shuffle/sort phase?
Assuming you are using Hadoop Streaming, you need to use the KeyFieldBasedComparator class.
Add the following to the streaming command:
    -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
You also need to specify the type of sorting with mapred.text.key.comparator.options. Some useful options are -n (numeric sort) and -r (reverse sort).
EXAMPLE :
Create an identity mapper and reducer with the following code
This is the mapper.py & reducer.py
    #!/usr/bin/env python
    import sys

    # Identity step: echo every input line unchanged (minus surrounding whitespace).
    for line in sys.stdin:
        print(line.strip())
This is the input.txt
    1
    11
    2
    20
    7
    3
    40
This is the Streaming command
    $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
        -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
        -D mapred.text.key.comparator.options=-n \
        -input /user/input.txt \
        -output /user/output.txt \
        -file ~/mapper.py \
        -mapper ~/mapper.py \
        -file ~/reducer.py \
        -reducer ~/reducer.py
And you will get the required output
    1
    2
    3
    7
    11
    20
    40
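To sanity-check the expected ordering without a cluster, the effect of the -n and -r options can be imitated locally. This is only a rough sketch: simulate_sort is a hypothetical helper, not part of Hadoop, and it assumes single-field keys that all parse as numbers.

```python
def simulate_sort(keys, options="-n"):
    """Order keys roughly the way the comparator options would (hypothetical helper)."""
    flags = options.lstrip("-")
    numeric = "n" in flags   # -n: compare keys as numbers
    reverse = "r" in flags   # -r: reverse the resulting order
    key_func = float if numeric else str
    return sorted(keys, key=key_func, reverse=reverse)


keys = ["1", "11", "2", "20", "7", "3", "40"]
print(simulate_sort(keys, "-n"))   # ascending numeric order
print(simulate_sort(keys, "-nr"))  # descending numeric order
```

It mirrors only the ordering of keys, not partitioning or anything else the framework does during the shuffle.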
NOTE :
I have used a simple single-key input. If you have multiple keys and/or partitions, you will have to adjust mapred.text.key.comparator.options accordingly. Since I do not know your use case, my example is limited to this.
An identity mapper is needed since you need at least one mapper for an MR job to run.
An identity reducer is needed since the shuffle/sort phase does not run in a pure map-only job.
Hadoop's default comparator compares your keys based on the Writable type (more precisely, the WritableComparable) you use. If you are dealing with IntWritable or LongWritable, it will sort them numerically. I assume you are using Text in your example, so you end up with lexicographic (text) sort order, where "11" comes before "2".
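The difference between the two orderings is easy to reproduce outside Hadoop. The snippet below is just an illustration in plain Python, assuming text keys are compared by their UTF-8 bytes and integer keys by their numeric value; it does not call any Hadoop API.

```python
keys = ["1", "11", "2", "20", "7", "3", "40"]

# Text-like ordering: compare the UTF-8 bytes of each key.
as_text = sorted(keys, key=lambda k: k.encode("utf-8"))

# IntWritable-like ordering: compare the numeric values.
as_ints = sorted(int(k) for k in keys)

print(as_text)  # ['1', '11', '2', '20', '3', '40', '7']
print(as_ints)  # [1, 2, 3, 7, 11, 20, 40]
```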
In special cases, however, you can also write your own comparator.
For example (for testing purposes only), here is a quick sample of how to change the sort order of Text keys. It treats them as integers and produces numerical sort order:
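    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.io.WritableUtils;

    public class MyComparator extends WritableComparator {

        public MyComparator() {
            super(Text.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            try {
                // A serialized Text starts with a variable-length size prefix; skip it.
                int n1 = WritableUtils.decodeVIntSize(b1[s1]);
                int n2 = WritableUtils.decodeVIntSize(b2[s2]);
                String v1 = Text.decode(b1, s1 + n1, l1 - n1);
                String v2 = Text.decode(b2, s2 + n2, l2 - n2);
                // Parse both keys as integers and compare them numerically.
                int v1Int = Integer.parseInt(v1.trim());
                int v2Int = Integer.parseInt(v2.trim());
                return Integer.compare(v1Int, v2Int);
            } catch (IOException e) {
                throw new IllegalArgumentException(e);
            }
        }
    }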
In the jobrunner class set:
    Job job = new Job();
    ...
    job.setSortComparatorClass(MyComparator.class);
For streaming with older Hadoop (which may use -jobconf instead of -D for configuration), you can sort by key:
    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf mapreduce.partition.keycomparator.options="-k2,2nr" \
    -jobconf mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
With stream.num.map.output.key.fields=2, the 1st and 2nd columns of the map output are treated as key 1 and key 2.
mapreduce.partition.keycomparator.options="-k2,2nr" means sorting in reverse order on the 2nd key (the field range from 2 to 2), treated as a numeric value. It is pretty much like the Linux sort command!
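As a rough local illustration of what -k2,2nr asks for, the following sketch orders tab-separated records by their second field, numerically and descending. The function name and the sample records are made up for this example, and the tab separator is an assumption about the map output format.

```python
def sort_k2_numeric_reverse(lines, sep="\t"):
    """Imitate -k2,2nr: order records by the 2nd field, numerically, descending."""
    return sorted(lines, key=lambda line: float(line.split(sep)[1]), reverse=True)


records = ["a\t3", "b\t20", "c\t7"]  # made-up sample records
for record in sort_k2_numeric_reverse(records):
    print(record)
```

With these sample records the line for b (20) comes first, then c (7), then a (3).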