Need help implementing this algorithm with Hadoop MapReduce



You can just use an empty reducer, and partition your job to run a single mapper per file. Each mapper will create its own output file in your output folder.
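To make that concrete, here is a minimal driver sketch for a map-only job. This assumes the newer `org.apache.hadoop.mapreduce` API; `MyMapper`, the output key/value types, and the input/output paths are placeholders you would replace with your own. It is a configuration sketch, not runnable outside a Hadoop installation.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapOnlyDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "map-only filter");
    job.setJarByClass(MapOnlyDriver.class);
    job.setMapperClass(MyMapper.class); // placeholder: your mapper with the filtering logic
    job.setNumReduceTasks(0);           // no reduce phase: each mapper's output is written directly
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));   // input directory
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // output directory
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```

With zero reduce tasks there is no shuffle or sort; each mapper writes a `part-m-NNNNN` file of its own into the output directory.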


A map/reduce-style job is easily implemented using the Java concurrency utilities (in java.util.concurrent since Java 5), especially Future, Callable and ExecutorService.

I created a Callable that will analyse a file in the way you specified:

import java.io.File;
import java.io.FileNotFoundException;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.Callable;

public class FileAnalyser implements Callable<String> {

  private final Scanner scanner;
  private final List<String> termList;

  public FileAnalyser(String filename, List<String> termList) throws FileNotFoundException {
    this.termList = termList;
    scanner = new Scanner(new File(filename));
  }

  @Override
  public String call() throws Exception {
    StringBuilder buffer = new StringBuilder();
    while (scanner.hasNextLine()) {
      String line = scanner.nextLine();
      String[] tokens = line.split(" ");
      if (tokens.length >= 3 && inTermList(tokens[2])) {
        buffer.append(line).append('\n'); // keep matches on separate lines
      }
    }
    return buffer.toString();
  }

  private boolean inTermList(String term) {
    return termList.contains(term);
  }
}

We need to create a new callable for each file found and submit this to the executor service. The result of the submission is a Future which we can use later to obtain the result of the file parse.

import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Analyser {

  private static final int THREAD_COUNT = 10;

  public static void main(String[] args) throws Exception {
    // All callables will be submitted to this executor service.
    // Play around with THREAD_COUNT for optimum performance.
    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);

    // Store all futures in this list so we can refer to them easily.
    List<Future<String>> futureList = new ArrayList<Future<String>>();

    // Some random term list, I don't know what you're using.
    List<String> termList = new ArrayList<String>();
    termList.add("terma");
    termList.add("termb");

    // Replace this with however you enumerate your input files.
    String[] filenames = { /* your input files */ };

    // For each file you find, create a new FileAnalyser callable and submit
    // it to the executor service. Add the future to the list so we can
    // check back on the result later.
    for (String filename : filenames) {
      try {
        Callable<String> worker = new FileAnalyser(filename, termList);
        Future<String> future = executor.submit(worker);
        futureList.add(future);
      } catch (FileNotFoundException fnfe) {
        // If the file doesn't exist at this point we can probably ignore it,
        // but I'll leave that for you to decide.
        System.err.println("Unable to create future for " + filename);
        fnfe.printStackTrace(System.err);
      }
    }

    // You may want to wait at this point until all threads have finished,
    // e.g. by looping through the futures until isDone() returns true
    // for each of them.

    // Loop over all finished futures and do something with the result from each.
    for (Future<String> current : futureList) {
      String result = current.get(); // blocks until this particular future is done
      // Do something with the result from this future.
    }

    executor.shutdown();
  }
}

My example here is far from complete, and far from efficient. I haven't considered the sample size; if it's really huge you could keep looping over the futureList, removing elements that have finished, along these lines:

while (!futureList.isEmpty()) {
  for (Future<String> current : futureList) {
    if (current.isDone()) {
      String result = current.get();
      // Do something with result
      futureList.remove(current);
      break; // We have modified the list during iteration, so break out of the for-loop
    }
  }
}
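That busy-wait can be avoided entirely with ExecutorCompletionService, which hands futures back in the order the tasks finish. The sketch below is self-contained, so the Callable just returns a dummy string where your FileAnalyser would do the real work:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionExample {

  // Submit one task per name and collect results as the tasks finish.
  static List<String> collectResults(List<String> names) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    // The completion service returns futures in completion order,
    // so there is no need to poll isDone() in a loop.
    CompletionService<String> completion =
        new ExecutorCompletionService<String>(executor);

    for (final String name : names) {
      completion.submit(new Callable<String>() {
        public String call() {
          return "parsed " + name; // stand-in for FileAnalyser's real work
        }
      });
    }

    List<String> results = new ArrayList<String>();
    for (int i = 0; i < names.size(); i++) {
      // take() blocks until the NEXT task finishes, whichever one that is.
      results.add(completion.take().get());
    }
    executor.shutdown();
    return results;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(collectResults(Arrays.asList("a.txt", "b.txt")));
  }
}
```

Note that results arrive in completion order, not submission order, so don't rely on their sequence.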

Alternatively you could implement a producer-consumer setup, where the producer submits callables to the executor service and produces futures, and the consumer takes the result of each future and then discards the future.

This would maybe require the producer and consumer to be threads themselves, with a synchronized collection for adding and removing futures.
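A rough sketch of that setup, assuming a BlockingQueue as the synchronized hand-off and a sentinel future as the stop signal (again with a dummy Callable standing in for FileAnalyser):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

public class PipelineExample {

  // Sentinel ("poison pill") telling the consumer no more futures are coming.
  private static final Future<String> DONE = new FutureTask<String>(
      new Callable<String>() { public String call() { return null; } });

  static List<String> run(final List<String> names) throws Exception {
    final ExecutorService executor = Executors.newFixedThreadPool(4);
    // The blocking queue is the synchronized hand-off between the threads.
    final BlockingQueue<Future<String>> queue =
        new LinkedBlockingQueue<Future<String>>();

    // Producer: submits one callable per name and queues the resulting future.
    Thread producer = new Thread(new Runnable() {
      public void run() {
        try {
          for (final String name : names) {
            queue.put(executor.submit(new Callable<String>() {
              public String call() {
                return "parsed " + name; // stand-in for FileAnalyser
              }
            }));
          }
          queue.put(DONE); // signal that production is finished
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });

    producer.start();
    // Consumer (here the main thread): takes each future, waits for its result.
    List<String> results = new ArrayList<String>();
    while (true) {
      Future<String> f = queue.take();
      if (f == DONE) {
        break;
      }
      results.add(f.get());
    }
    producer.join();
    executor.shutdown();
    return results;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(run(Arrays.asList("a.txt", "b.txt")));
  }
}
```

Running the consumer on its own thread instead would let results be processed while the producer is still scanning for files.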

Any questions please ask.