
How to sort numerically in Hadoop's shuffle/sort phase?


Assuming you are using Hadoop Streaming, you need to use the KeyFieldBasedComparator class.

  1. Add -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator to the streaming command.

  2. Specify the type of sort you need with mapred.text.key.comparator.options. Useful options include -n (numeric sort) and -r (reverse sort).

EXAMPLE:

Create an identity mapper and reducer with the following code. The same script serves as both mapper.py and reducer.py:

    #!/usr/bin/env python
    import sys

    # Identity mapper/reducer: echo every input line unchanged
    for line in sys.stdin:
        print(line.strip())

This is input.txt (one number per line):

    1
    11
    2
    20
    7
    3
    40

This is the streaming command:

    $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
        -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
        -D mapred.text.key.comparator.options=-n \
        -input /user/input.txt \
        -output /user/output.txt \
        -file ~/mapper.py -mapper ~/mapper.py \
        -file ~/reducer.py -reducer ~/reducer.py

And you will get the required, numerically sorted output:

    1
    2
    3
    7
    11
    20
    40
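To see why the -n option matters, here is a small local emulation of the job above in plain Python (no Hadoop involved). The helper names `identity` and `run_job` are hypothetical; the default ordering is approximated as a byte-wise sort of the key, and -n as a sort on the parsed numeric value.

```python
# Local emulation (assumption: not Hadoop code) of the streaming job:
# identity map -> shuffle/sort on the whole line as key -> identity reduce.

def identity(lines):
    """Identity mapper/reducer: emit each stripped line unchanged."""
    return [line.strip() for line in lines]


def run_job(lines, numeric):
    """Emulate map -> sort -> reduce for single-field keys.

    numeric=False approximates the default Text ordering (byte-wise);
    numeric=True approximates KeyFieldBasedComparator with -n.
    """
    mapped = identity(lines)
    if numeric:
        shuffled = sorted(mapped, key=float)
    else:
        shuffled = sorted(mapped, key=lambda s: s.encode("utf-8"))
    return identity(shuffled)


input_lines = ["1\n", "11\n", "2\n", "20\n", "7\n", "3\n", "40\n"]
print(run_job(input_lines, numeric=False))  # ['1', '11', '2', '20', '3', '40', '7']
print(run_job(input_lines, numeric=True))   # ['1', '2', '3', '7', '11', '20', '40']
```

Without -n the keys come out in byte order; with it they come out in the order shown above.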

NOTE:

  1. I have used a simple single-key input. If, however, you have multiple keys and/or partitions, you will have to adjust mapred.text.key.comparator.options as needed. Since I do not know your use case, my example is limited to this.

  2. An identity mapper is needed because a MapReduce job needs at least one mapper to run.

  3. An identity reducer is needed because the shuffle/sort phase does not happen in a map-only job.


Hadoop's default comparator compares your keys based on the Writable type (more precisely WritableComparable) you use. If you are dealing with IntWritable or LongWritable then it will sort them numerically.

I assume you are using Text keys in your example, so you end up with Text's natural sort order, which is lexicographic (byte-wise) rather than numeric.
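As a rough illustration (Python, not Hadoop code), the two orderings can be mimicked as follows, assuming Text keys compare by their UTF-8 bytes and IntWritable keys compare as signed integers. The function names are hypothetical.

```python
def text_order(keys):
    """Approximate Text ordering: compare the UTF-8 bytes of each key."""
    return sorted(keys, key=lambda k: str(k).encode("utf-8"))


def int_writable_order(keys):
    """Approximate IntWritable ordering: compare the signed integer values."""
    return sorted(keys, key=int)


keys = [10, 9, 100, -5, 2]
print(text_order(keys))          # [-5, 10, 100, 2, 9]
print(int_writable_order(keys))  # [-5, 2, 9, 10, 100]
```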

In special cases, however, you can also write your own comparator.
For example (for testing purposes only), here is a quick sample of how to change the sort order of Text keys: it treats them as integers and therefore produces a numerical sort order:

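    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.io.WritableUtils;

    public class MyComparator extends WritableComparator {
        public MyComparator() {
            super(Text.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            try {
                // A serialized Text starts with a vint length prefix; skip it
                int n1 = WritableUtils.decodeVIntSize(b1[s1]);
                int n2 = WritableUtils.decodeVIntSize(b2[s2]);
                String v1 = Text.decode(b1, s1 + n1, l1 - n1);
                String v2 = Text.decode(b2, s2 + n2, l2 - n2);
                // Compare the decoded keys as integers
                int v1Int = Integer.parseInt(v1.trim());
                int v2Int = Integer.parseInt(v2.trim());
                return Integer.compare(v1Int, v2Int);
            } catch (IOException e) {
                throw new IllegalArgumentException(e);
            }
        }
    }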

In the job driver class, set:

    Job job = new Job();
    ...
    job.setSortComparatorClass(MyComparator.class);
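A serialized Text key starts with a vint length prefix followed by its UTF-8 bytes, so a raw-byte comparator has to look past that prefix before parsing the number. The sketch below mimics this in Python under simplifying assumptions: `serialize_text` and `raw_compare` are hypothetical helpers, and the prefix is written as a single byte, which matches Hadoop's vint encoding only for keys shorter than 128 bytes.

```python
import functools


def serialize_text(s):
    """Hypothetical Text-like serialization: 1-byte length prefix + UTF-8.

    Assumption: only payloads under 128 bytes, where the vint is one byte.
    """
    payload = s.encode("utf-8")
    if len(payload) > 127:
        raise ValueError("this sketch only handles short keys")
    return bytes([len(payload)]) + payload


def raw_compare(b1, b2):
    """Skip the 1-byte prefix, decode, and compare the keys as integers."""
    v1 = int(b1[1:].decode("utf-8").strip())
    v2 = int(b2[1:].decode("utf-8").strip())
    return (v1 > v2) - (v1 < v2)


records = [serialize_text(k) for k in ["20", "3", "100", "7"]]
ordered = sorted(records, key=functools.cmp_to_key(raw_compare))
print([r[1:].decode("utf-8") for r in ordered])  # ['3', '7', '20', '100']
```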


For streaming with older Hadoop versions (which may use -jobconf instead of -D for configuration), you can sort by key:

    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf mapreduce.partition.keycomparator.options="-k2,2nr" \
    -jobconf mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator

stream.num.map.output.key.fields=2 makes the first two (tab-separated) fields of each map output line the key, i.e. key field 1 and key field 2.

mapreduce.partition.keycomparator.options="-k2,2nr" means: sort on the 2nd key field only (-k2,2 means from field 2 to field 2), treat it as a numeric value (n), and sort in reverse order (r).

It works pretty much like the Linux sort command!
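The combination of two key fields and -k2,2nr can be emulated in Python to check what order to expect. This is a sketch, not Hadoop's implementation: the helper name `sort_by_key_field_2` is hypothetical, lines are assumed to be tab-separated (the streaming default), and ties are simply left in input order.

```python
def sort_by_key_field_2(lines, num_key_fields=2):
    """Emulate stream.num.map.output.key.fields=2 with "-k2,2nr".

    The first num_key_fields tab-separated fields form the key; lines are
    ordered by key field 2 parsed as a number, largest first.
    """
    def key_field_2(line):
        key = line.split("\t")[:num_key_fields]
        return float(key[1])

    return sorted(lines, key=key_field_2, reverse=True)


lines = ["a\t5\tx", "b\t40\ty", "c\t7\tz"]
print(sort_by_key_field_2(lines))  # ['b\t40\ty', 'c\t7\tz', 'a\t5\tx']
```

The same ordering is what `sort -t$'\t' -k2,2nr` would produce on these lines in a shell.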