
Mapper and Reducer for K means algorithm in Hadoop in Java


Ok, I'll give it a go and tell you what I thought about when implementing k-means in MapReduce. This implementation differs from that of Mahout, mainly because it is meant to show how the algorithm could work in a distributed setup (and not for real production usage). I also assume that you already know how k-means works.

That having been said, we have to divide the whole algorithm into three main stages:

  1. Job level
  2. Map level
  3. Reduce level

The Job Level

The job level is fairly simple: it writes the input (key = the class called ClusterCenter, value = the class called VectorWritable), handles the iteration over the Hadoop jobs and reads the output of the whole run.

VectorWritable is a serializable implementation of a vector, in this case from my own math library, but actually nothing more than a simple double array.

The ClusterCenter is mainly a VectorWritable, but with convenience functions that a center usually needs (averaging for example).
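
To make that a bit more concrete, here is a minimal sketch of what these two classes could look like. The real ones live in the math library mentioned above, so the field names, the copy constructor and the compareTo logic here are only illustrative assumptions:

    // VectorWritable.java -- a barebones serializable vector (just a double array)
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    public class VectorWritable implements Writable {
      private double[] vector;

      public VectorWritable() {}
      public VectorWritable(double[] vector) { this.vector = vector; }

      public double[] getVector() { return vector; }

      @Override
      public void write(DataOutput out) throws IOException {
        out.writeInt(vector.length);
        for (double d : vector) out.writeDouble(d);
      }

      @Override
      public void readFields(DataInput in) throws IOException {
        vector = new double[in.readInt()];
        for (int i = 0; i < vector.length; i++) vector[i] = in.readDouble();
      }
    }

    // ClusterCenter.java -- a center is used as the key, so it has to be comparable
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    public class ClusterCenter implements WritableComparable<ClusterCenter> {
      private VectorWritable center;

      public ClusterCenter() { this.center = new VectorWritable(); }
      public ClusterCenter(VectorWritable center) { this.center = center; }
      public ClusterCenter(ClusterCenter toCopy) {
        this.center = new VectorWritable(toCopy.getCenterVector().getVector().clone());
      }

      public VectorWritable getCenterVector() { return center; }

      @Override
      public void write(DataOutput out) throws IOException { center.write(out); }

      @Override
      public void readFields(DataInput in) throws IOException { center.readFields(in); }

      @Override
      public int compareTo(ClusterCenter o) {
        // compare component-wise so identical centers group together in the reducer
        double[] a = center.getVector();
        double[] b = o.center.getVector();
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
          int c = Double.compare(a[i], b[i]);
          if (c != 0) return c;
        }
        return Integer.compare(a.length, b.length);
      }
    }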

In k-means you have some seed set of k vectors that are your initial centers and some input vectors that you want to cluster. That is exactly the same in MapReduce, but I am writing them to two different files. The first file contains only the vectors with some dummy center as the key, and the other file contains the real initial centers (namely cen.seq).
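
A hypothetical bit of job-level code for writing those two SequenceFiles could look like this. The paths, the dummy IntWritable value in the centers file and the sample vectors are all made up for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;

    public class KMeansSeedWriter {

      public static void writeSeedFiles(Configuration conf) throws Exception {
        FileSystem fs = FileSystem.get(conf);

        // the vectors to cluster, keyed by a dummy center
        Path dataPath = new Path("clustering/data.seq");
        SequenceFile.Writer dataWriter = SequenceFile.createWriter(
            fs, conf, dataPath, ClusterCenter.class, VectorWritable.class);
        ClusterCenter dummy = new ClusterCenter(new VectorWritable(new double[] { 0, 0 }));
        dataWriter.append(dummy, new VectorWritable(new double[] { 1, 2 }));
        dataWriter.append(dummy, new VectorWritable(new double[] { 16, 3 }));
        dataWriter.close();

        // the real initial centers (cen.seq); the IntWritable value is just a filler
        Path centerPath = new Path("clustering/cen.seq");
        SequenceFile.Writer centerWriter = SequenceFile.createWriter(
            fs, conf, centerPath, ClusterCenter.class, IntWritable.class);
        centerWriter.append(new ClusterCenter(new VectorWritable(new double[] { 1, 1 })),
            new IntWritable(0));
        centerWriter.append(new ClusterCenter(new VectorWritable(new double[] { 20, 5 })),
            new IntWritable(1));
        centerWriter.close();
      }
    }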

After all that is written to disk you can start your first job. This will of course first launch a Mapper which is the next topic.

The Map Level

In MapReduce it is always smart to know what is coming in and what is going out (in terms of objects). From the job level we know that we have ClusterCenter and VectorWritable as input, where the ClusterCenter is currently just a dummy. We want to have the same types as output, because the map stage is the famous assignment step from normal k-means.

You read the real centers file you created at the job level into memory, so you can compare the input vectors with the centers. For that you need a distance metric; in the mapper it is hardcoded to the Manhattan distance. To be a bit more specific, you get a part of your input in the map stage and then iterate over each input key/value pair, comparing the value with each of the centers. You track which center is the nearest and then assign the vector to it by writing the nearest ClusterCenter object along with the input vector itself to disk.
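
Sketched as code, the mapper could look roughly like this. It is not the exact original code; in particular the "centroid.path" configuration property pointing to the current centers file is an assumption:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.mapreduce.Mapper;

    public class KMeansMapper extends
        Mapper<ClusterCenter, VectorWritable, ClusterCenter, VectorWritable> {

      private final List<ClusterCenter> centers = new ArrayList<>();

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        // read the current centers (cen.seq) into memory once per map task
        Configuration conf = context.getConfiguration();
        Path centerPath = new Path(conf.get("centroid.path"));
        FileSystem fs = FileSystem.get(conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, centerPath, conf);
        ClusterCenter key = new ClusterCenter();
        IntWritable value = new IntWritable();
        while (reader.next(key, value)) {
          centers.add(new ClusterCenter(key)); // copy, Hadoop reuses the key object
        }
        reader.close();
      }

      @Override
      protected void map(ClusterCenter dummy, VectorWritable vector, Context context)
          throws IOException, InterruptedException {
        ClusterCenter nearest = null;
        double nearestDistance = Double.MAX_VALUE;

        // the assignment step: find the closest center for this input vector
        for (ClusterCenter c : centers) {
          double dist = manhattanDistance(c.getCenterVector().getVector(), vector.getVector());
          if (dist < nearestDistance) {
            nearest = c;
            nearestDistance = dist;
          }
        }
        // emit the nearest center as the key and the vector itself as the value
        context.write(nearest, vector);
      }

      private static double manhattanDistance(double[] a, double[] b) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) sum += Math.abs(a[i] - b[i]);
        return sum;
      }
    }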

Your output is then: n vectors along with their assigned center (as the key). Hadoop now sorts and groups by your key, so you get every vector assigned to a single center in the reduce task.

The Reduce Level

As mentioned above, in the reduce stage you will have a ClusterCenter and its assigned VectorWritables. This is the usual update step you have in normal k-means, so you simply iterate over all vectors, sum them up and average them.

Now you have a new "mean", which you can compare to the center the vectors were assigned to before. Here you can measure the difference between the two centers, which tells you how much the center moved. Ideally it wouldn't have moved at all, meaning it has converged.

A Hadoop counter is used to track this convergence. The name is a bit misleading, because it actually tracks how many centers have not converged to a final point yet, but I hope you can live with it.
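
Putting the last three paragraphs together, a sketch of such a reducer could look like this. The counter enum, the epsilon threshold and the helper method are assumptions, not the exact original code:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.mapreduce.Reducer;

    public class KMeansReducer extends
        Reducer<ClusterCenter, VectorWritable, ClusterCenter, VectorWritable> {

      // as said above, the name is misleading: it counts centers that have NOT converged yet
      public enum Counter {
        CONVERGED
      }

      // the newly computed centers, written back out in the cleanup step (see below)
      private final List<ClusterCenter> newCenters = new ArrayList<>();

      @Override
      protected void reduce(ClusterCenter oldCenter, Iterable<VectorWritable> vectors,
          Context context) throws IOException, InterruptedException {
        List<double[]> assigned = new ArrayList<>();
        double[] sum = null;

        // sum up all vectors assigned to this center (copy, Hadoop reuses the objects)
        for (VectorWritable v : vectors) {
          double[] values = v.getVector().clone();
          assigned.add(values);
          if (sum == null) {
            sum = values.clone();
          } else {
            for (int i = 0; i < sum.length; i++) sum[i] += values[i];
          }
        }

        // average them to get the new center
        for (int i = 0; i < sum.length; i++) sum[i] /= assigned.size();
        ClusterCenter newCenter = new ClusterCenter(new VectorWritable(sum));
        newCenters.add(newCenter);

        // write every vector with its updated center for the next iteration
        for (double[] values : assigned) {
          context.write(newCenter, new VectorWritable(values));
        }

        // if the center moved more than a tiny epsilon, it has not converged yet
        if (manhattanDistance(oldCenter.getCenterVector().getVector(), sum) > 1e-9) {
          context.getCounter(Counter.CONVERGED).increment(1);
        }
      }

      private static double manhattanDistance(double[] a, double[] b) {
        double dist = 0;
        for (int i = 0; i < a.length; i++) dist += Math.abs(a[i] - b[i]);
        return dist;
      }
    }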

Basically you now write the new center and all its vectors to disk again for the next iteration. In addition, in the cleanup step you write all the newly gathered centers to the path used in the map step, so the next iteration has the new centers.
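
Continuing the reducer sketch from above, the cleanup step could look roughly like this; it reuses the assumed "centroid.path" property and needs the same HDFS/SequenceFile imports as the mapper sketch:

    // this method belongs inside the KMeansReducer sketch above
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      Path centerPath = new Path(conf.get("centroid.path"));
      FileSystem fs = FileSystem.get(conf);
      fs.delete(centerPath, true); // drop the old centers

      // overwrite the centers file so the next iteration's mappers read the new ones
      SequenceFile.Writer writer = SequenceFile.createWriter(
          fs, conf, centerPath, ClusterCenter.class, IntWritable.class);
      int id = 0;
      for (ClusterCenter center : newCenters) {
        writer.append(center, new IntWritable(id++));
      }
      writer.close();
    }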


Back at the job level, the MapReduce job should be done by now. We inspect the counter of that job to get the number of centers that haven't converged yet. This counter is used in the while loop to determine whether the whole algorithm can come to an end or not. If not, go back to the Map Level paragraph, but use the output of the previous job as the input.
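
As a rough sketch, that driver loop could look like this; the paths, the "depth_N" naming per iteration and the counter lookup tie into the assumptions made in the sketches above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class KMeansDriver {

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // the mappers read and the reducers rewrite the centers under this path
        conf.set("centroid.path", "clustering/cen.seq");

        long notConverged = 1;
        int iteration = 1;
        while (notConverged > 0) {
          Job job = Job.getInstance(conf, "k-means iteration " + iteration);
          job.setJarByClass(KMeansDriver.class);
          job.setMapperClass(KMeansMapper.class);
          job.setReducerClass(KMeansReducer.class);
          job.setOutputKeyClass(ClusterCenter.class);
          job.setOutputValueClass(VectorWritable.class);
          job.setInputFormatClass(SequenceFileInputFormat.class);
          job.setOutputFormatClass(SequenceFileOutputFormat.class);

          // the first iteration reads the initial data file, later ones the previous output
          Path in = (iteration == 1) ? new Path("clustering/data.seq")
              : new Path("clustering/depth_" + (iteration - 1));
          Path out = new Path("clustering/depth_" + iteration);
          FileInputFormat.addInputPath(job, in);
          FileOutputFormat.setOutputPath(job, out);

          job.waitForCompletion(true);
          // how many centers still moved in this iteration?
          notConverged = job.getCounters()
              .findCounter(KMeansReducer.Counter.CONVERGED).getValue();
          iteration++;
        }
      }
    }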

Actually this was the whole VooDoo.

For obvious reasons this shouldn't be used in production, because its performance is horrible. Better use the more tuned version of Mahout. But for educational purposes this algorithm is fine ;)

If you have any more questions, feel free to write me a mail or comment.