Parallel version of Files.walkFileTree (java or scala) Parallel version of Files.walkFileTree (java or scala) multithreading multithreading

Parallel version of Files.walkFileTree (java or scala)


As others have pointed out, walking a file tree is almost certainly IO bound instead of CPU bound so the benefits of doing a multithreaded file tree walk are questionable. But if you really wanted to, you could probably roll your own with a ForkJoinPool or similar.

import java.io.IOException;import java.nio.file.FileVisitResult;import java.nio.file.Files;import java.nio.file.Path;import java.nio.file.Paths;import java.nio.file.SimpleFileVisitor;import java.nio.file.attribute.BasicFileAttributes;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveAction;public class MultiThreadedFileTreeWalk {    private static class RecursiveWalk extends RecursiveAction {        private static final long serialVersionUID = 6913234076030245489L;        private final Path dir;        public RecursiveWalk(Path dir) {            this.dir = dir;        }        @Override        protected void compute() {            final List<RecursiveWalk> walks = new ArrayList<>();            try {                Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {                    @Override                    public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {                        if (!dir.equals(RecursiveWalk.this.dir)) {                            RecursiveWalk w = new RecursiveWalk(dir);                            w.fork();                            walks.add(w);                            return FileVisitResult.SKIP_SUBTREE;                        } else {                            return FileVisitResult.CONTINUE;                        }                    }                    @Override                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {                        System.out.println(file + "\t" + Thread.currentThread());                        return FileVisitResult.CONTINUE;                    }                });            } catch (IOException e) {                e.printStackTrace();            }            for (RecursiveWalk w : walks) {                w.join();            }        }    }    public static void main(String[] args) throws IOException {        RecursiveWalk w = new RecursiveWalk(Paths.get(".").toRealPath());        ForkJoinPool p = new ForkJoinPool();        p.invoke(w);    }}

This example walks each directory on a separate thread. Here's the tutorial for Java 7's fork/join library.


This exercise is neither as brief as the Scala answer nor as Java-like as the Java answer.

The idea here was to start parallel walks with something like a thread per device.

The walkers are on ForkJoinPool threads, so when they kick off a future for each path test, those are forked tasks on the pool. The directory test uses managed blocking when it reads the directory, looking for files.

The result is returned by completing a promise depending on the future path test. (No mechanism here for detecting empty-handed completion.)

A more interesting test would include reading zip files, since the decompression will eat some CPU.

I wonder if paulp will do something clever with deep listing.

import util._import collection.JavaConverters._import concurrent.{ TimeoutException => Timeout, _ }import concurrent.duration._import ExecutionContext.Implicits._import java.io.IOExceptionimport java.nio.file.{ FileVisitResult => Result, _ }import Result.{ CONTINUE => Go, SKIP_SUBTREE => Prune, TERMINATE => Stop }import java.nio.file.attribute.{ BasicFileAttributes => BFA }object Test extends App {  val fileSystem = FileSystems.getDefault  val starts = (if (args.nonEmpty) args.toList else mounts) map (s => (fileSystem getPath s))  val p = Promise[(Path, BFA)]  def pathTest(path: Path, attrs: BFA) =    if (attrs.isDirectory ) {      val entries = blocking {        val res = Files newDirectoryStream path        try res.asScala.toList finally res.close()      }      List("hello","world") forall (n => entries exists (_.getFileName.toString == n))    } else {      path.getFileName.toString == "enough"    }  def visitor(root: Path) = new SimpleFileVisitor[Path] {    def stopOrGo = if (p.isCompleted) Stop else Go    def visiting(path: Path, attrs: BFA) = {      future { pathTest(path, attrs) } onComplete {        case Success(true) => p trySuccess (path, attrs)        case Failure(e)    => p tryFailure e        case _             =>      }      stopOrGo    }    override def preVisitDirectory(dir: Path, attrs: BFA) = (      if ((starts contains dir) && dir != root) Prune      else visiting(dir, attrs)    )    override def postVisitDirectory(dir: Path, e: IOException) = {      if (e != null) p tryFailure e      stopOrGo    }    override def visitFile(file: Path, attrs: BFA) = visiting(file, attrs)  }  //def walk(p: Path): Path = Files walkFileTree (p, Set().asJava, 10, visitor(p))  def walk(p: Path): Path = Files walkFileTree (p, visitor(p))  def show(store: FileStore) = {    val ttl   = store.getTotalSpace / 1024    val used  = (store.getTotalSpace - store.getUnallocatedSpace) / 1024    val avail = store.getUsableSpace / 1024    Console println f"$store%-40s $ttl%12d $used%12d $avail%12d"    store  }  def mounts = {    val devs = for {      store <- fileSystem.getFileStores.asScala      if store.name startsWith "/dev/"      if List("ext4","fuseblk") contains store.`type`    } yield show(store)    val devstr = """(\S+) \((.*)\)""".r    (devs.toList map (_.toString match {      case devstr(name, dev) if devs.toList exists (_.name == dev) => Some(name)      case s => Console println s"Bad dev str '$s', skipping" ; None    })).flatten  }  starts foreach (f => future (walk(f)))  Try (Await result (p.future, 20.seconds)) match {    case Success((name, attrs)) => Console println s"Result: ${if (attrs.isDirectory) "dir" else "file"} $name"    case Failure(e: Timeout)    => Console println s"No result: timed out."    case Failure(t)             => Console println s"No result: $t."  }}


Let's assume that executing a callback on each file is enough.

This code will not handle loops in the file system--you'd need a registry of where you've been for that (e.g. java.util.concurrent.ConcurrentHashMap). There are all sorts of improvements you could add, like reporting exceptions instead of silently ignoring them.

import java.io.Fileimport scala.util._def walk(f: File, callback: File => Unit, pick: File => Boolean = _ => true) {  Try {    val (dirs, fs) = f.listFiles.partition(_.isDirectory)    fs.filter(pick).foreach(callback)    dirs.par.foreach(f => walk(f, callback, pick))  }}

Collecting the files using a fold instead of a foreach is not drastically harder, but I leave that as an exercise to the reader. (A ConcurrentLinkedQueue is probably fast enough to accept them all in a callback anyway unless you have really slow threads and a awesome filesystem.)