"Parallel.For" for Java?
I guess the closest thing would be:
    ExecutorService exec = Executors.newFixedThreadPool(SOME_NUM_OF_THREADS);
    try {
        for (final Object o : list) {
            exec.submit(new Runnable() {
                @Override
                public void run() {
                    // do stuff with o.
                }
            });
        }
    } finally {
        exec.shutdown();
    }
Based on TheLQ's comments, you would set SOME_NUM_OF_THREADS to Runtime.getRuntime().availableProcessors().
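For reference, here is the snippet above filled out as a minimal runnable sketch. The class name `PoolLoopSketch`, the `squareAll` helper, and the squaring work are made up for illustration and are not part of the original snippet. It sizes the pool with availableProcessors() and, since shutdown() by itself does not wait for submitted tasks, follows it with awaitTermination() so the caller blocks until every task has run.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolLoopSketch {

    // Hypothetical helper: squares every input on a fixed pool, returns the results sorted.
    static List<Integer> squareAll(List<Integer> inputs) {
        int threads = Runtime.getRuntime().availableProcessors();
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        // Thread-safe queue, because tasks finish in no particular order.
        final ConcurrentLinkedQueue<Integer> results = new ConcurrentLinkedQueue<Integer>();
        try {
            for (final Integer o : inputs) {
                exec.submit(new Runnable() {
                    @Override
                    public void run() {
                        results.add(o * o); // "do stuff with o"
                    }
                });
            }
        } finally {
            exec.shutdown(); // no new tasks accepted; already submitted ones still run
        }
        try {
            // shutdown() does not block, so wait here for the submitted tasks.
            if (!exec.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish within the timeout");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
            throw new IllegalStateException("interrupted while waiting", e);
        }
        List<Integer> sorted = new ArrayList<Integer>(results);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(squareAll(Arrays.asList(3, 1, 2))); // prints [1, 4, 9]
    }
}
```

The one-minute timeout is an arbitrary choice for the sketch; pick whatever bound suits your workload.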
Edit: Decided to add a basic "Parallel.For" implementation
    import java.util.Collection;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class Parallel {
        private static final int NUM_CORES = Runtime.getRuntime().availableProcessors();

        // NamedThreadFactory is not a JDK class; it is a ThreadFactory helper that is not shown here.
        private static final ExecutorService forPool =
                Executors.newFixedThreadPool(NUM_CORES * 2, new NamedThreadFactory("Parallel.For"));

        public static <T> void For(final Iterable<T> elements, final Operation<T> operation) {
            try {
                // invokeAll blocks for us until all submitted tasks in the call complete
                forPool.invokeAll(createCallables(elements, operation));
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Restore the interrupt flag so the caller can see the interruption.
                Thread.currentThread().interrupt();
            }
        }

        // Wraps each element in a Callable that applies the operation to it.
        public static <T> Collection<Callable<Void>> createCallables(final Iterable<T> elements,
                                                                     final Operation<T> operation) {
            List<Callable<Void>> callables = new LinkedList<Callable<Void>>();
            for (final T elem : elements) {
                callables.add(new Callable<Void>() {
                    @Override
                    public Void call() {
                        operation.perform(elem);
                        return null;
                    }
                });
            }
            return callables;
        }

        public static interface Operation<T> {
            public void perform(T pParameter);
        }
    }
Example Usage of Parallel.For
    // Collection of items to process in parallel
    Collection<Integer> elems = new LinkedList<Integer>();
    for (int i = 0; i < 40; ++i) {
        elems.add(i);
    }
    Parallel.For(elems,
        // The operation to perform with each item
        new Parallel.Operation<Integer>() {
            public void perform(Integer param) {
                System.out.println(param);
            }
        });
I guess this implementation is really more similar to Parallel.ForEach
Edit: I put this up on GitHub if anyone is interested. Parallel For on GitHub
MLaw's solution is a very practical Parallel.ForEach. I added a small modification to make a Parallel.For.
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.atomic.AtomicInteger;

    // Each public type below goes in its own .java file.
    public class Parallel {
        static final int iCPU = Runtime.getRuntime().availableProcessors();

        public static <T> void ForEach(Iterable<T> parameters, final LoopBody<T> loopBody) {
            ExecutorService executor = Executors.newFixedThreadPool(iCPU);
            List<Future<?>> futures = new LinkedList<Future<?>>();
            try {
                for (final T param : parameters) {
                    futures.add(executor.submit(new Runnable() {
                        public void run() { loopBody.run(param); }
                    }));
                }
                waitForAll(futures);
            } finally {
                executor.shutdown();
            }
        }

        public static void For(int start, int stop, final LoopBody<Integer> loopBody) {
            ExecutorService executor = Executors.newFixedThreadPool(iCPU);
            List<Future<?>> futures = new LinkedList<Future<?>>();
            try {
                for (int i = start; i < stop; i++) {
                    final Integer k = i;
                    futures.add(executor.submit(new Runnable() {
                        public void run() { loopBody.run(k); }
                    }));
                }
                waitForAll(futures);
            } finally {
                executor.shutdown();
            }
        }

        // Blocks until every submitted task has finished.
        private static void waitForAll(List<Future<?>> futures) {
            for (Future<?> f : futures) {
                try {
                    f.get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag set
                    return;
                } catch (ExecutionException e) {
                    throw new RuntimeException(e.getCause()); // surface failures from the loop body
                }
            }
        }
    }

    public interface LoopBody<T> {
        void run(T i);
    }

    public class ParallelTest {
        // The body runs on several threads, so a plain "int k; k += i" would be a data race.
        final AtomicInteger k = new AtomicInteger(0);

        public ParallelTest() {
            Parallel.For(0, 10, new LoopBody<Integer>() {
                public void run(Integer i) {
                    k.addAndGet(i);
                    System.out.println(i);
                }
            });
            System.out.println("Sum = " + k.get());
        }

        public static void main(String[] argv) {
            ParallelTest test = new ParallelTest();
        }
    }
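If you are on Java 8 or later, the same index-based loop can also be sketched with a parallel IntStream from the standard library, without managing an executor yourself. The class name `StreamForSketch` and the `parallelSum` helper are hypothetical, chosen to mirror the summing example above. An AtomicInteger is used for the accumulator because the body may run on several threads at once.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class StreamForSketch {

    // Hypothetical helper: adds up start..stop-1 with the loop body running in parallel.
    static int parallelSum(int start, int stop) {
        final AtomicInteger sum = new AtomicInteger();
        IntStream.range(start, stop)              // start inclusive, stop exclusive
                 .parallel()                      // split the range across worker threads
                 .forEach(i -> sum.addAndGet(i)); // body may run on any thread, in any order
        return sum.get();                         // forEach has returned, so all indices are done
    }

    public static void main(String[] args) {
        System.out.println("Sum = " + parallelSum(0, 10)); // prints Sum = 45
    }
}
```

For a plain sum, IntStream.range(start, stop).parallel().sum() would be simpler; the forEach form is shown because it maps directly onto a loop body.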
Building on mlaw's suggestion, this version adds a CountDownLatch and a chunk size to reduce the number of submit() calls.
When tested with a 4-million-item array, this one gives a 5x speedup over a sequential for() on my Core i7 2630QM CPU.
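    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class Loop {
        public interface Each {
            void run(int i);
        }

        private static final int CPUs = Runtime.getRuntime().availableProcessors();

        public static void withIndex(int start, int stop, final Each body) {
            if (stop <= start) {
                return; // empty range; also avoids dividing by a chunksize of 0
            }
            // Ceiling divisions: one chunk per CPU, then the number of chunks actually needed.
            int chunksize = (stop - start + CPUs - 1) / CPUs;
            int loops = (stop - start + chunksize - 1) / chunksize;
            ExecutorService executor = Executors.newFixedThreadPool(CPUs);
            final CountDownLatch latch = new CountDownLatch(loops);
            for (int i = start; i < stop;) {
                final int lo = i;
                i += chunksize;
                final int hi = (i < stop) ? i : stop;
                executor.submit(new Runnable() {
                    public void run() {
                        try {
                            for (int j = lo; j < hi; j++)
                                body.run(j);
                        } finally {
                            // Count down even if body throws, so await() cannot hang.
                            latch.countDown();
                        }
                    }
                });
            }
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag set
            }
            executor.shutdown();
        }

        public static void main(String[] argv) {
            Loop.withIndex(0, 9, new Loop.Each() {
                public void run(int i) {
                    System.out.println(i * 10);
                }
            });
        }
    }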
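To see what the chunk arithmetic does, here is a small sketch that computes only the chunk boundaries, using the same ceiling division as withIndex above. The class name `ChunkSketch`, the `chunks` helper, and the explicit `workers` parameter (standing in for the CPU count) are hypothetical and only there to make the numbers easy to check.

```java
import java.util.ArrayList;
import java.util.List;

class ChunkSketch {

    // Hypothetical helper: splits [start, stop) into at most `workers` chunks.
    // Each int[] holds {lo, hi} with lo inclusive and hi exclusive.
    static List<int[]> chunks(int start, int stop, int workers) {
        List<int[]> out = new ArrayList<int[]>();
        if (stop <= start) {
            return out; // empty range, nothing to split
        }
        int chunksize = (stop - start + workers - 1) / workers; // ceil(n / workers)
        for (int lo = start; lo < stop; lo += chunksize) {
            int hi = Math.min(lo + chunksize, stop); // last chunk may be shorter
            out.add(new int[] { lo, hi });
        }
        return out;
    }

    public static void main(String[] args) {
        // 9 indices over 4 workers: chunk size 3, so 3 tasks instead of 9.
        for (int[] c : chunks(0, 9, 4)) {
            System.out.println("[" + c[0] + ", " + c[1] + ")");
        }
    }
}
```

With 9 indices and 4 workers the chunk size comes out as 3, so only three tasks are submitted: [0, 3), [3, 6) and [6, 9). That is where the saving over one submit() per index comes from.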