Need help implementing this algorithm with Hadoop MapReduce
MapReduce-style processing is easy to implement on a single machine using some nice Java 6 concurrency features, especially Future, Callable and ExecutorService.
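To make the map/reduce split concrete: each Callable acts as a "map" task, and combining the values from the Futures is the "reduce" step. This is a minimal sketch that assumes Java 8+ (for lambdas) and uses in-memory chunks of lines instead of files. The class name `MapReduceSketch`, the pool size of 4 and the "line contains the term" rule are illustrative choices, not part of the original.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration class; assumes Java 8+ and in-memory input chunks.
class MapReduceSketch {
    static int countMatchingLines(List<List<String>> chunks, String term)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (List<String> chunk : chunks) {
                // "map": one Callable per chunk counts the lines containing the term
                futures.add(executor.submit(() -> {
                    int count = 0;
                    for (String line : chunk) {
                        if (line.contains(term)) count++;
                    }
                    return count;
                }));
            }
            // "reduce": sum the partial counts; get() blocks until each task is done
            int total = 0;
            for (Future<Integer> f : futures) {
                total += f.get();
            }
            return total;
        } finally {
            executor.shutdown(); // let the pool threads exit
        }
    }

    public static void main(String[] args) throws Exception {
        List<List<String>> chunks = Arrays.asList(
                Arrays.asList("alpha terma", "beta"),
                Arrays.asList("terma gamma", "delta terma"));
        System.out.println(countMatchingLines(chunks, "terma")); // prints 3
    }
}
```

With files, each chunk would be replaced by a per-file Callable such as FileAnalyser, and the sum by whatever combination of results you actually need.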
I created a Callable that analyses a file in the way you specified:
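```java
import java.io.File;
import java.io.FileNotFoundException;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.Callable;

public class FileAnalyser implements Callable<String> {

    private final Scanner scanner;
    private final List<String> termList;

    public FileAnalyser(String filename, List<String> termList) throws FileNotFoundException {
        this.termList = termList;
        scanner = new Scanner(new File(filename));
    }

    @Override
    public String call() throws Exception {
        StringBuilder buffer = new StringBuilder();
        try {
            while (scanner.hasNextLine()) {
                String line = scanner.nextLine();
                String[] tokens = line.split(" ");
                // Keep the line if its third space-separated token is one of the terms
                if (tokens.length >= 3 && inTermList(tokens[2])) {
                    buffer.append(line).append('\n'); // keep lines separated
                }
            }
        } finally {
            scanner.close(); // release the file handle once the file has been read
        }
        return buffer.toString();
    }

    private boolean inTermList(String term) {
        return termList.contains(term);
    }
}
```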
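FileAnalyser keeps a line when its third space-separated token is in `termList`. `List.contains` is a linear scan, so with a large term list a `HashSet` is a reasonable swap. A minimal sketch of just the matching rule, assuming Java 7+; `TermFilter` and `matches` are hypothetical names:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper isolating FileAnalyser's matching rule, using a HashSet.
class TermFilter {
    private final Set<String> terms;

    TermFilter(List<String> termList) {
        // HashSet gives O(1) average lookups instead of List.contains' linear scan
        this.terms = new HashSet<>(termList);
    }

    // Same rule as FileAnalyser: keep the line if its third token is a known term
    boolean matches(String line) {
        String[] tokens = line.split(" ");
        return tokens.length >= 3 && terms.contains(tokens[2]);
    }

    public static void main(String[] args) {
        TermFilter filter = new TermFilter(Arrays.asList("terma", "termb"));
        System.out.println(filter.matches("1 2 terma 4")); // true
        System.out.println(filter.matches("1 terma"));     // false: fewer than 3 tokens
    }
}
```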
We need to create a new callable for each file found and submit it to the executor service. Each submission returns a Future, which we can use later to obtain the result of parsing that file.
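The submit/Future round trip in isolation: `submit` returns immediately, and `get()` blocks until the task has finished. If `call()` throws, `get()` rethrows it wrapped in an `ExecutionException`. A minimal sketch assuming Java 8+; `SubmitSketch` and `runOne` are hypothetical names:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration class; assumes Java 8+.
class SubmitSketch {
    // Submits one Callable and later blocks on its Future for the result.
    static String runOne(Callable<String> task) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(task); // returns immediately
            // ... other work could happen here while the task runs ...
            return future.get(); // blocks until the task finishes; failures arrive as ExecutionException
        } finally {
            executor.shutdown(); // let the worker thread exit
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOne(() -> "parsed")); // prints parsed
    }
}
```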
```java
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Analyser {
    private static final int THREAD_COUNT = 10;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // All callables will be submitted to this executor service.
        // Play around with THREAD_COUNT for optimum performance.
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);

        // Store all futures in this list so we can refer to them easily
        List<Future<String>> futureList = new ArrayList<Future<String>>();

        // Some random term list, I don't know what you're using.
        List<String> termList = new ArrayList<String>();
        termList.add("terma");
        termList.add("termb");

        // For each file you find, create a new FileAnalyser callable and submit
        // it to the executor service. Add the future to the list so we can
        // check back on the result later. Assumption: file names are passed as
        // command-line arguments; substitute your own file listing here.
        for (String filename : args) {
            try {
                Callable<String> worker = new FileAnalyser(filename, termList);
                Future<String> future = executor.submit(worker);
                futureList.add(future);
            } catch (FileNotFoundException fnfe) {
                // If the file doesn't exist at this point we can probably ignore it,
                // but I'll leave that for you to decide.
                System.err.println("Unable to create future for " + filename);
                fnfe.printStackTrace(System.err);
            }
        }

        // No separate wait is needed: Future.get() blocks until that task has
        // finished (isDone() lets you check without blocking).
        // Loop over all futures and do something with the result from each.
        for (Future<String> current : futureList) {
            String result = current.get();
            // Do something with the result from this future
        }

        // Stop accepting work and let the pool threads exit so the JVM can terminate
        executor.shutdown();
    }
}
```
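If you would rather wait for every file to be processed before looking at any result, `ExecutorService.invokeAll` submits a whole collection of Callables and returns only once each has completed, with the Futures in the same order as the tasks. A minimal sketch assuming Java 8+, with trivial in-memory Callables standing in for FileAnalyser instances; `InvokeAllSketch` and `runAll` are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration class; assumes Java 8+.
class InvokeAllSketch {
    static List<String> runAll(List<Callable<String>> tasks, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            // invokeAll blocks until every task is done; futures keep task order
            List<Future<String>> futures = executor.invokeAll(tasks);
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                results.add(f.get()); // does not block: the task has already finished
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> tasks = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            final int id = i;
            tasks.add(() -> "file" + id);
        }
        System.out.println(runAll(tasks, 2)); // prints [file0, file1, file2]
    }
}
```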
My example here is far from complete and far from efficient. I haven't considered the sample size: if it's really huge, you could keep looping over the futureList, handling and removing each future as soon as it has finished, something similar to:
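```java
// Needs java.util.Iterator. Polls the futures until all have been handled;
// note that this spins (busy-waits) while no task has finished yet.
while (!futureList.isEmpty()) {
    Iterator<Future<String>> it = futureList.iterator();
    while (it.hasNext()) {
        Future<String> current = it.next();
        if (current.isDone()) {
            String result = current.get();
            // Do something with result
            it.remove(); // removing via the iterator is safe during iteration
        }
    }
}
```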
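Polling `isDone()` in a loop keeps a thread busy. `ExecutorCompletionService` is a standard alternative: its `take()` blocks until the next task finishes and hands back that task's Future, so results arrive in completion order without spinning. A minimal sketch assuming Java 8+; `CompletionSketch` and `inCompletionOrder` are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class; assumes Java 8+.
class CompletionSketch {
    // Returns results in the order the tasks finish, without polling isDone().
    static List<String> inCompletionOrder(List<Callable<String>> tasks, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            CompletionService<String> completion = new ExecutorCompletionService<>(executor);
            for (Callable<String> task : tasks) {
                completion.submit(task);
            }
            List<String> results = new ArrayList<>();
            for (int i = 0; i < tasks.size(); i++) {
                // take() blocks until the next task completes, so no busy-waiting
                results.add(completion.take().get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> { Thread.sleep(200); return "slow"; });
        tasks.add(() -> "fast");
        System.out.println(inCompletionOrder(tasks, 2)); // most likely [fast, slow]
    }
}
```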
Alternatively, you could implement a producer-consumer setup, where the producer submits callables to the executor service and passes on the resulting futures, and the consumer takes the result of each future and then discards the future.

This would probably require the producer and consumer to be threads themselves, sharing a thread-safe collection (a synchronized list, or a BlockingQueue) for adding and removing futures.
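A minimal sketch of that producer-consumer idea, assuming Java 8+. It swaps in a `LinkedBlockingQueue` for the synchronized list, since `take()` blocks until a future is available. The sentinel "poison pill" used to stop the consumer, and the names `ProducerConsumerSketch`, `run` and `POISON`, are hypothetical choices:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration class; assumes Java 8+.
class ProducerConsumerSketch {
    // Hypothetical sentinel telling the consumer no more futures will arrive;
    // it is never run and is only compared by reference.
    private static final Future<String> POISON = new FutureTask<String>(() -> null);

    static List<String> run(List<Callable<String>> tasks) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        BlockingQueue<Future<String>> queue = new LinkedBlockingQueue<>();
        List<String> results = new ArrayList<>(); // touched only by the consumer until join()

        // Producer: submits each Callable and hands the resulting Future to the consumer
        Thread producer = new Thread(() -> {
            try {
                for (Callable<String> task : tasks) {
                    queue.put(executor.submit(task));
                }
                queue.put(POISON);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumer: takes each Future in submission order, uses its result, then drops it
        Thread consumer = new Thread(() -> {
            try {
                Future<String> future;
                while ((future = queue.take()) != POISON) {
                    try {
                        results.add(future.get());
                    } catch (ExecutionException e) {
                        // The task itself failed: report it and keep consuming
                        System.err.println("Task failed: " + e.getCause());
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        try {
            producer.start();
            consumer.start();
            producer.join();
            consumer.join();
        } finally {
            executor.shutdown();
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> "first");
        tasks.add(() -> "second");
        System.out.println(run(tasks)); // prints [first, second]
    }
}
```

Because the consumer takes futures in the order they were queued, results come back in submission order. Combining this with a CompletionService would instead hand them over as they finish.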
Any questions please ask.