
Hadoop / MapReduce - Optimizing "Top N" Word Count MapReduce Job


This is a very good question, because you have hit the inefficiency of Hadoop's word count example.

The tricks to optimize your problem are the following:

Do a HashMap-based grouping in your local map stage; you can also use a combiner for that. This can look like the following: I'm using Guava's HashMultiset, which facilitates a nice counting mechanism.

    public static class WordFrequencyMapper extends
        Mapper<LongWritable, Text, Text, LongWritable> {

      private final HashMultiset<String> wordCountSet = HashMultiset.create();

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        String[] tokens = value.toString().split("\\s+");
        for (String token : tokens) {
          wordCountSet.add(token);
        }
      }

And you emit the result in your cleanup stage:

    @Override
    protected void cleanup(Context context) throws IOException,
        InterruptedException {
      Text key = new Text();
      LongWritable value = new LongWritable();
      for (Entry<String> entry : wordCountSet.entrySet()) {
        key.set(entry.getElement());
        value.set(entry.getCount());
        context.write(key, value);
      }
    }

So you have grouped the words in a local block of work, thus reducing network usage by using a bit of RAM. You can do the same with a Combiner, but a combiner sorts to group the keys, so it would be slower (especially for strings!) than using a HashMultiset.
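If you do want to compare against the combiner route, a plain sum combiner is enough. The sketch below is mine, not from the answer itself: the class name WordCountCombiner and the job wiring are assumptions, and it presumes the mapper emits Text/LongWritable pairs.

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Sum combiner: merges the partial counts for a word on the map side
    // before anything is sent over the network.
    public class WordCountCombiner
        extends Reducer<Text, LongWritable, Text, LongWritable> {

      private final LongWritable sum = new LongWritable();

      @Override
      protected void reduce(Text word, Iterable<LongWritable> counts, Context context)
          throws IOException, InterruptedException {
        long total = 0;
        for (LongWritable count : counts) {
          total += count.get();
        }
        sum.set(total);
        context.write(word, sum);
      }
    }

    // wired into the job with: job.setCombinerClass(WordCountCombiner.class);

Note that the combiner only runs over map output that has already been sorted by key, which is exactly the sorting overhead the HashMultiset approach avoids.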

To just get the Top N, you will only have to write the Top N of that local HashMultiset to the output collector and aggregate the results in your normal way on the reduce side. This saves you a lot of network bandwidth as well; the only drawback is that you need to sort the word-count tuples in your cleanup method.

A part of the code might look like this:

    Set<String> elementSet = wordCountSet.elementSet();
    String[] array = elementSet.toArray(new String[elementSet.size()]);
    Arrays.sort(array, new Comparator<String>() {
      @Override
      public int compare(String o1, String o2) {
        // sort descending by count
        return Long.compare(wordCountSet.count(o2), wordCountSet.count(o1));
      }
    });
    Text key = new Text();
    LongWritable value = new LongWritable();
    // just emit the first N records
    for (int i = 0; i < N && i < array.length; i++) {
      key.set(array[i]);
      value.set(wordCountSet.count(array[i]));
      context.write(key, value);
    }
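For the "aggregate the results in your normal way on the reduce side" part, one possible sketch is below. It is not from the answer above: the class name TopNReducer and the constant N are my own, and it assumes a single reducer so the global top N can be computed in one place.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class TopNReducer
        extends Reducer<Text, LongWritable, Text, LongWritable> {

      private static final int N = 10; // assumption: the N you are after

      // running totals per word; stays small because every mapper already
      // emitted only its local top N
      private final Map<String, Long> totals = new HashMap<>();

      @Override
      protected void reduce(Text word, Iterable<LongWritable> counts, Context context)
          throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable count : counts) {
          sum += count.get();
        }
        totals.put(word.toString(), sum);
      }

      @Override
      protected void cleanup(Context context)
          throws IOException, InterruptedException {
        // sort the aggregated totals descending and emit the first N
        List<Map.Entry<String, Long>> entries = new ArrayList<>(totals.entrySet());
        entries.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        Text key = new Text();
        LongWritable value = new LongWritable();
        for (int i = 0; i < N && i < entries.size(); i++) {
          key.set(entries.get(i).getKey());
          value.set(entries.get(i).getValue());
          context.write(key, value);
        }
      }
    }

With job.setNumReduceTasks(1) this yields the global top N directly; with several reducers each one produces its own partial top N that still has to be merged afterwards.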

Hope you get the gist: do as much of the word counting locally as possible, and then just aggregate the top N of the top N's ;)


Quoting Thomas

To just get the Top N, you will only have to write the Top N of that local HashMultiset to the output collector and aggregate the results in your normal way on the reduce side. This saves you a lot of network bandwidth as well; the only drawback is that you need to sort the word-count tuples in your cleanup method.

If you write only the top N from the local HashMultiset, there is a possibility that you will miss the count of an element that, if passed on from this local HashMultiset, could become one of the overall top N elements.

For example, consider the following three maps, written in the format MapName : elementName,elementCount:

Map A : Ele1,4 : Ele2,5 : Ele3,5 : Ele4,2

Map B : Ele1,1 : Ele5,7 : Ele6, 3 : Ele7,6

Map C : Ele5,4 : Ele8,3 : Ele1,1 : Ele9,3

Now, if we consider only the top 3 of each mapper, we lose part of the count of the element "Ele1", whose total count should have been 6: it makes Map A's top 3 with a count of 4, but the counts of 1 from Map B and Map C are dropped, so the reducer sees "Ele1" with a total count of only 4.
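To make that concrete, here is a small standalone simulation (plain Java, not MapReduce; the class and variable names are just for illustration) of the three maps above, each forwarding only its local top 3:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class LocalTopNUndercount {
      public static void main(String[] args) {
        // the three mappers' local counts from the example above
        List<Map<String, Long>> mappers = List.of(
            Map.of("Ele1", 4L, "Ele2", 5L, "Ele3", 5L, "Ele4", 2L),   // Map A
            Map.of("Ele1", 1L, "Ele5", 7L, "Ele6", 3L, "Ele7", 6L),   // Map B
            Map.of("Ele5", 4L, "Ele8", 3L, "Ele1", 1L, "Ele9", 3L));  // Map C

        Map<String, Long> seenByReducer = new HashMap<>();
        for (Map<String, Long> mapper : mappers) {
          // each mapper forwards only its local top 3 word-count pairs
          mapper.entrySet().stream()
              .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
              .limit(3)
              .forEach(e -> seenByReducer.merge(e.getKey(), e.getValue(), Long::sum));
        }

        // prints 4, although Ele1 occurs 6 times across all mappers
        System.out.println("Ele1 count at the reducer: " + seenByReducer.get("Ele1"));
      }
    }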

I hope that makes sense. Please let me know what you think about it.