Multi threaded file processing with .NET


Since there's curiosity on how .NET 4 works with this in comments, here's that approach. Sorry, it's likely not an option for the OP. Disclaimer: This is not a highly scientific analysis, just showing that there's a clear performance benefit. Based on hardware, your mileage may vary widely.

Here's a quick test (if you see a big mistake in this simple test, it's just an example. Please comment, and we can fix it to be more useful/accurate). For this, I just dropped 12,000 ~60 KB files into a directory as a sample (fire up LINQPad; you can play with it yourself, for free! - be sure to get LINQPad 4 though):

    var files = Directory.GetFiles("C:\\temp", "*.*", SearchOption.AllDirectories).ToList();

    var sw = Stopwatch.StartNew(); // start timer
    files.ForEach(f => File.ReadAllBytes(f).GetHashCode()); // do work - serial
    sw.Stop(); // stop
    sw.ElapsedMilliseconds.Dump("Run MS - Serial"); // display the duration

    sw.Restart();
    files.AsParallel().ForAll(f => File.ReadAllBytes(f).GetHashCode()); // parallel
    sw.Stop();
    sw.ElapsedMilliseconds.Dump("Run MS - Parallel");

Slightly changing your loop to parallelize the query is all that's needed in most simple situations. By "simple" I mostly mean that the result of one action doesn't affect the next. The thing to keep in mind most often is that some collections, our handy List<T> for example, are not thread safe, so writing to one from a parallel loop isn't a good idea :) Luckily, .NET 4 added concurrent collections that are thread safe. Also keep in mind that a collection that locks internally can become a bottleneck of its own, depending on the situation.
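To make the collection point concrete, here's a minimal sketch of gathering results from a parallel loop into a ConcurrentBag<T> instead of a List<T>. The class name, method name and scratch directory are made up for illustration; only AsParallel, ForAll and ConcurrentBag<T> come from the framework.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;

public static class ParallelSizes
{
    // Reads every file in parallel and records its length in a thread-safe bag.
    // A List<long> here could be corrupted by concurrent Add calls.
    public static ConcurrentBag<long> ReadLengths(string[] files)
    {
        var lengths = new ConcurrentBag<long>();
        files.AsParallel().ForAll(f => lengths.Add(File.ReadAllBytes(f).LongLength));
        return lengths;
    }

    public static void Main()
    {
        // Hypothetical scratch directory filled with a few sample files.
        string dir = Path.Combine(Path.GetTempPath(), "parallel-sizes-demo");
        Directory.CreateDirectory(dir);
        for (int i = 0; i < 8; i++)
            File.WriteAllBytes(Path.Combine(dir, i + ".bin"), new byte[1024 * (i + 1)]);

        ConcurrentBag<long> lengths = ReadLengths(Directory.GetFiles(dir));
        Console.WriteLine("{0} files, {1} bytes in total", lengths.Count, lengths.Sum());
    }
}
```

The bag doesn't preserve order, which is fine here because the lengths are only counted and summed. If order matters, a PLINQ Select followed by ToList() on the calling thread may be the simpler route.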

This uses the .AsParallel<T>(IEnumerable<T>) and .ForAll<T>(ParallelQuery<T>) extensions available in .NET 4.0. The .AsParallel() call wraps the IEnumerable<T> in a ParallelEnumerableWrapper<T> (an internal class), which derives from ParallelQuery<T>. This lets you use the parallel extension methods; in this case we're using .ForAll().

.ForAll() internally creates a ForAllOperator<T>(query, action) and runs it synchronously. This handles the threading and the merging of the threads once it's running... There's quite a bit going on in there, I'd suggest starting here if you want to learn more, including additional options.
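As one example of those additional options, here's a rough sketch that caps the number of concurrent workers with WithDegreeOfParallelism, which can be worth trying when the disk is the bottleneck. ThrottledRead, ReadAll and the demo directory are hypothetical names; the cap of 2 is an arbitrary value to experiment with, not a recommendation.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Threading;

public static class ThrottledRead
{
    // Reads each file using at most maxWorkers concurrent workers
    // and returns the total number of bytes read.
    public static long ReadAll(string[] files, int maxWorkers)
    {
        long total = 0;
        files.AsParallel()
             .WithDegreeOfParallelism(maxWorkers)
             // Interlocked.Add keeps the shared counter correct across threads.
             .ForAll(f => Interlocked.Add(ref total, File.ReadAllBytes(f).LongLength));
        return total; // ForAll is synchronous, so every file has been read by now
    }

    public static void Main()
    {
        // Hypothetical scratch directory filled with a few sample files.
        string dir = Path.Combine(Path.GetTempPath(), "throttled-read-demo");
        Directory.CreateDirectory(dir);
        for (int i = 0; i < 8; i++)
            File.WriteAllBytes(Path.Combine(dir, i + ".bin"), new byte[4096]);

        Console.WriteLine("Read {0} bytes", ReadAll(Directory.GetFiles(dir), 2));
    }
}
```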


The results (Computer 1 - Physical Hard Disk):

  • Serial: 1288 - 1333 ms
  • Parallel: 461 - 503 ms

Computer specs - for comparison:

The results (Computer 2 - Solid State Drive):

  • Serial: 545 - 601 ms
  • Parallel: 248 - 278 ms

Computer specifications - for comparison:

  • Quad Core 2 Quad Q9100 @ 2.26 GHz
  • 8 GB RAM (DDR 1333)
  • 120 GB OCZ Vertex SSD (Standard Version - 1.4 Firmware)

I don't have links for the CPU/RAM this time, these came installed. This is a Dell M6400 Laptop (here's a link to the M6500... Dell's own links to the 6400 are broken).


These numbers are from 10 runs, taking the min/max of the inner 8 results (removing the original min/max for each as possible outliers). We hit an I/O bottleneck here, especially on the physical drive, but think about what the serial method does. It reads, processes, reads, processes, rinse and repeat. With the parallel approach, you are (even with an I/O bottleneck) reading and processing simultaneously. In the worst bottleneck situation, you're processing one file while reading the next. That alone (on any current computer!) should result in some performance gain. The results above show that we get a bit more than one file going at a time, giving us a healthy boost.
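For anyone who wants to repeat the "inner 8 of 10" measurement, a small helper along these lines would do it. This is a sketch of the method described above, not the exact code behind the numbers; TrimmedTiming, Measure and InnerRange are hypothetical names, and the Thread.Sleep is only a stand-in workload.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;

public static class TrimmedTiming
{
    // Runs the action the given number of times and returns each duration in ms.
    public static long[] Measure(Action action, int runs)
    {
        var samples = new long[runs];
        for (int i = 0; i < runs; i++)
        {
            var sw = Stopwatch.StartNew();
            action();
            sw.Stop();
            samples[i] = sw.ElapsedMilliseconds;
        }
        return samples;
    }

    // Drops the single fastest and single slowest sample,
    // then returns { min, max } of what is left.
    public static long[] InnerRange(long[] samples)
    {
        if (samples.Length < 3)
            throw new ArgumentException("Need at least 3 samples to trim both ends.");
        long[] inner = samples.OrderBy(ms => ms).Skip(1).Take(samples.Length - 2).ToArray();
        return new[] { inner.First(), inner.Last() };
    }

    public static void Main()
    {
        // Stand-in workload; swap in the serial or parallel file loop to compare them.
        long[] range = InnerRange(Measure(() => Thread.Sleep(20), 10));
        Console.WriteLine("{0} - {1} ms", range[0], range[1]);
    }
}
```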

Another disclaimer: Quad core + .NET 4 parallel isn't going to give you four times the performance, it doesn't scale linearly... There are other considerations and bottlenecks in play.

I hope this was of interest in showing the approach and possible benefits. Feel free to criticize or improve... This answer exists solely for those curious as indicated in the comments :)


Design

The Producer/Consumer pattern will probably be the most useful for this situation. You should create enough threads to maximize the throughput.

Here are some questions about the Producer/Consumer pattern to give you an idea of how it works:

You should use a blocking queue: the producer adds files to the queue while the consumers process files from it. A blocking queue handles its own synchronization, so your code needs no explicit locking, which makes it about the most efficient way to solve your problem.

If you're using .NET 4.0 there are several concurrent collections that you can use out of the box:

Threading

A single producer thread will probably be the most efficient way to load the files from disk and push them onto the queue; multiple consumers then pop items off the queue and process them. I would suggest that you try 2-4 consumer threads per core and take some performance measurements to determine which count works best (i.e. the number of threads that gives you the maximum throughput). I would not recommend using the ThreadPool for this specific example.
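The layout described above (one producer, several dedicated consumer threads, a blocking queue in between) could look roughly like this with .NET 4's BlockingCollection<T>. Treat it as a sketch under assumptions: FilePipeline, Run, the queue capacity of 16 and the demo directory are illustrative choices, and summing byte counts stands in for the real processing.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading;

public static class FilePipeline
{
    // The calling thread is the single producer; consumerCount dedicated threads
    // (not the ThreadPool) drain the queue. Returns the total bytes processed.
    public static long Run(string[] files, int consumerCount)
    {
        long total = 0;
        using (var queue = new BlockingCollection<byte[]>(boundedCapacity: 16))
        {
            var consumers = Enumerable.Range(0, consumerCount).Select(_ =>
            {
                var t = new Thread(() =>
                {
                    // Blocks while the queue is empty; the loop ends once
                    // CompleteAdding has been called and the queue is drained.
                    foreach (byte[] data in queue.GetConsumingEnumerable())
                        Interlocked.Add(ref total, data.LongLength); // stand-in for real work
                });
                t.Start();
                return t;
            }).ToList();

            try
            {
                foreach (string file in files)
                    queue.Add(File.ReadAllBytes(file)); // blocks while the queue is full
            }
            finally
            {
                queue.CompleteAdding(); // always release the consumers, even on error
            }
            consumers.ForEach(t => t.Join());
        }
        return total;
    }

    public static void Main()
    {
        // Hypothetical scratch directory filled with a few sample files.
        string dir = Path.Combine(Path.GetTempPath(), "file-pipeline-demo");
        Directory.CreateDirectory(dir);
        for (int i = 0; i < 8; i++)
            File.WriteAllBytes(Path.Combine(dir, i + ".bin"), new byte[4096]);

        Console.WriteLine("Processed {0} bytes", Run(Directory.GetFiles(dir), 4));
    }
}
```

The bounded capacity keeps the producer from reading the whole directory into memory when the consumers fall behind. Both it and the consumer count are knobs to measure, not fixed answers.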

P.S. I don't understand what's the concern with a single point of failure and the use of distributed hash tables? I know DHTs sound like a really cool thing to use, but I would try the conventional methods first unless you have a specific problem in mind that you're trying to solve.


I recommend that you queue a thread for each file and keep track of the running threads in a dictionary, launching a new thread when a thread completes, up to a maximum limit. I prefer to create my own threads when they can be long-running, and use callbacks to signal when they're done or encountered an exception. In the sample below I use a dictionary to keep track of the running worker instances. This way I can call into an instance if I want to stop work early. Callbacks can also be used to update a UI with progress and throughput. You can also dynamically throttle the running thread limit for added points.

The example code is an abbreviated demonstrator, but it does run.

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Threading;

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor super = new Supervisor();
            super.LaunchWaitingThreads();
            while (!super.Done) { Thread.Sleep(200); }
            Console.WriteLine("\nDone");
            Console.ReadKey();
        }
    }

    public delegate void StartCallbackDelegate(int idArg, Worker workerArg);
    public delegate void DoneCallbackDelegate(int idArg);

    public class Supervisor
    {
        Queue<Thread> waitingThreads = new Queue<Thread>();
        Dictionary<int, Worker> runningThreads = new Dictionary<int, Worker>();
        int maxThreads = 20;
        object locker = new object();

        // true once nothing is waiting and nothing is running
        public bool Done
        {
            get
            {
                lock (locker)
                {
                    return ((waitingThreads.Count == 0) && (runningThreads.Count == 0));
                }
            }
        }

        public Supervisor()
        {
            // queue up a thread for each file
            Directory.GetFiles("C:\\folder").ToList().ForEach(n => waitingThreads.Enqueue(CreateThread(n)));
        }

        Thread CreateThread(string fileNameArg)
        {
            Thread thread = new Thread(new Worker(fileNameArg, WorkerStart, WorkerDone).ProcessFile);
            thread.IsBackground = true;
            return thread;
        }

        // called when a worker starts
        public void WorkerStart(int threadIdArg, Worker workerArg)
        {
            lock (locker)
            {
                // update with worker instance
                runningThreads[threadIdArg] = workerArg;
            }
        }

        // called when a worker finishes
        public void WorkerDone(int threadIdArg)
        {
            lock (locker)
            {
                runningThreads.Remove(threadIdArg);
            }
            Console.WriteLine(string.Format("  Thread {0} done", threadIdArg.ToString()));
            LaunchWaitingThreads();
        }

        // launches workers until max is reached
        public void LaunchWaitingThreads()
        {
            lock (locker)
            {
                while ((runningThreads.Count < maxThreads) && (waitingThreads.Count > 0))
                {
                    Thread thread = waitingThreads.Dequeue();
                    runningThreads.Add(thread.ManagedThreadId, null); // place holder so count is accurate
                    thread.Start();
                }
            }
        }
    }

    public class Worker
    {
        string fileName;
        StartCallbackDelegate startCallback;
        DoneCallbackDelegate doneCallback;

        public Worker(string fileNameArg, StartCallbackDelegate startCallbackArg, DoneCallbackDelegate doneCallbackArg)
        {
            fileName = fileNameArg;
            startCallback = startCallbackArg;
            doneCallback = doneCallbackArg;
        }

        // thread entry point: report start, read the file, report completion
        public void ProcessFile()
        {
            startCallback(Thread.CurrentThread.ManagedThreadId, this);
            Console.WriteLine(string.Format("Reading file {0} on thread {1}", fileName, Thread.CurrentThread.ManagedThreadId.ToString()));
            File.ReadAllBytes(fileName);
            doneCallback(Thread.CurrentThread.ManagedThreadId);
        }
    }