Executing N number of threads in parallel and in a sequential manner


One candidate for this is TPL Dataflow. The demonstration below takes a stream of integers and prints them to the console. Set MaxDegreeOfParallelism to the number of items you want processed in parallel:

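    // Requires: using System; using System.Linq; using System.Threading.Tasks.Dataflow;
    void Main()
    {
        var actionBlock = new ActionBlock<int>(
            i => Console.WriteLine(i),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 16 });

        foreach (var i in Enumerable.Range(0, 200))
        {
            actionBlock.Post(i);
        }

        // Signal that no more items are coming, then wait for the queued ones to finish.
        actionBlock.Complete();
        actionBlock.Completion.Wait();
    }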

This approach also scales well if you have multiple producers or consumers.
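To make the multiple-producer case concrete, here is a minimal sketch in which several tasks post into one shared ActionBlock. The class name MultiProducerDemo, the RunAsync helper and the ConcurrentBag used to count results are illustrative assumptions, not part of the answer above. Depending on the target framework, the System.Threading.Tasks.Dataflow NuGet package may be required.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class MultiProducerDemo
{
    // Hypothetical helper: starts `producerCount` producers that each post
    // `itemsPerProducer` integers into one shared ActionBlock and returns
    // how many items the block processed.
    public static async Task<int> RunAsync(int producerCount, int itemsPerProducer)
    {
        var seen = new ConcurrentBag<int>();

        var block = new ActionBlock<int>(
            i => seen.Add(i),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 16 });

        // Post is thread-safe, so several producers can feed the same block.
        var producers = Enumerable.Range(0, producerCount)
            .Select(p => Task.Run(() =>
            {
                for (var n = 0; n < itemsPerProducer; n++)
                {
                    block.Post(p * itemsPerProducer + n);
                }
            }))
            .ToArray();

        await Task.WhenAll(producers); // every producer has finished posting
        block.Complete();              // tell the block no more input will arrive
        await block.Completion;        // wait for the queued items to be processed

        return seen.Count;
    }

    public static void Main()
    {
        var processed = RunAsync(4, 50).GetAwaiter().GetResult();
        Console.WriteLine($"Processed {processed} items");
    }
}
```

Note that Complete is called only after all producers have finished. Calling it earlier would make the block reject later Post calls.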


Here is the manual way of doing this.

You need a queue that holds the pending items. Dequeue an item, start a task for it, and add that task to a list of working tasks. Whenever a task finishes, remove it from the working list and take the next item from the queue. The main thread controls this process. Here is a sample of how to do this.

For the test I used a list of integers, but it should work for other types because it uses generics.

    // Requires: using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks;
    private static void Main()
    {
        Random r = new Random();
        var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList();
        ParallelQueue(items, DoWork);
    }

    private static void ParallelQueue<T>(List<T> items, Action<T> action)
    {
        Queue<T> pending = new Queue<T>(items);
        List<Task> working = new List<Task>();

        while (pending.Count + working.Count != 0)
        {
            if (pending.Count != 0 && working.Count < 16) // maximum number of concurrent tasks
            {
                var item = pending.Dequeue(); // take the next item from the queue
                working.Add(Task.Run(() => action(item))); // start a task for it
            }
            else
            {
                Task.WaitAny(working.ToArray()); // block until at least one task finishes
                working.RemoveAll(x => x.IsCompleted); // remove finished tasks
            }
        }
    }

    private static void DoWork(int i) // do your work here
    {
        // This is just an example.
        Task.Delay(i).Wait();
        Console.WriteLine(i);
    }

Please let me know if you run into problems implementing DoWork yourself, because if you change the method signature you may need to make some changes to the calling code.
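As one possible way to handle a different DoWork signature, a lambda can adapt it to the Action<T> that ParallelQueue expects. The sketch below is an assumption-laden illustration: the two-parameter DoWork, the SignatureDemo class, the Run helper and the ConcurrentBag for collecting results are all hypothetical, and ParallelQueue is repeated in compact form only so the example compiles on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class SignatureDemo
{
    // Compact copy of the ParallelQueue helper from the answer.
    public static void ParallelQueue<T>(List<T> items, Action<T> action)
    {
        var pending = new Queue<T>(items);
        var working = new List<Task>();
        while (pending.Count + working.Count != 0)
        {
            if (pending.Count != 0 && working.Count < 16)
            {
                var item = pending.Dequeue();
                working.Add(Task.Run(() => action(item)));
            }
            else
            {
                Task.WaitAny(working.ToArray());
                working.RemoveAll(t => t.IsCompleted);
            }
        }
    }

    // Hypothetical worker with an extra parameter and a return value.
    public static string DoWork(int i, string prefix)
    {
        Task.Delay(i).Wait();
        return prefix + i;
    }

    // Hypothetical helper: runs DoWork for every item and returns the sorted results.
    public static List<string> Run(List<int> items, string prefix)
    {
        // ConcurrentBag is thread-safe, so tasks can add results concurrently.
        var results = new ConcurrentBag<string>();

        // The lambda adapts DoWork(int, string) to the Action<int> that ParallelQueue expects.
        ParallelQueue(items, i => results.Add(DoWork(i, prefix)));

        return results.OrderBy(s => s, StringComparer.Ordinal).ToList();
    }

    public static void Main()
    {
        foreach (var line in Run(new List<int> { 30, 10, 20 }, "item-"))
        {
            Console.WriteLine(line);
        }
    }
}
```

The extra argument is captured by the lambda, so ParallelQueue itself does not need to change. Return values have to be collected in a thread-safe container because the tasks complete in an unpredictable order.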

Update

You can also do this with async/await, without blocking the main thread while the work runs.

    // Requires: using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks;
    private static void Main()
    {
        Random r = new Random();
        var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList();
        Task t = ParallelQueue(items, DoWork);
        // The main thread is free to do other things here.
        t.Wait();
    }

    private static async Task ParallelQueue<T>(List<T> items, Func<T, Task> func)
    {
        Queue<T> pending = new Queue<T>(items);
        List<Task> working = new List<Task>();

        while (pending.Count + working.Count != 0)
        {
            if (working.Count < 16 && pending.Count != 0) // maximum number of concurrent tasks
            {
                var item = pending.Dequeue();
                working.Add(Task.Run(() => func(item))); // Task.Run unwraps the inner task
            }
            else
            {
                await Task.WhenAny(working); // resume once at least one task finishes
                working.RemoveAll(x => x.IsCompleted); // remove finished tasks
            }
        }
    }

    private static async Task DoWork(int i)
    {
        await Task.Delay(i);
    }


    var workitems = ... /*e.g. Enumerable.Range(0, 1000000)*/;

    SingleItemPartitioner.Create(workitems)
        .AsParallel()
        .AsOrdered()
        .WithDegreeOfParallelism(16)
        .WithMergeOptions(ParallelMergeOptions.NotBuffered)
        .ForAll(i =>
        {
            Thread.Sleep(1000);
            Console.WriteLine(i);
        });

This should be all you need. I don't remember the exact method names, so check the documentation.

Test this by printing to the console after sleeping for 1 second (which this sample code does).
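Since the exact method names above are uncertain, here is a hedged variant that uses only types I know to be in the base class library. SingleItemPartitioner appears to come from the ParallelExtensionsExtras samples rather than the framework itself, so this sketch swaps in Partitioner.Create with EnumerablePartitionerOptions.NoBuffering, which likewise hands out items one at a time. The PlinqDemo class, the Run helper and the shortened sleep are illustrative assumptions.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

public static class PlinqDemo
{
    // Hypothetical helper: processes `count` integers with at most 16 workers
    // and returns how many were processed.
    public static int Run(int count, int sleepMs)
    {
        var processed = new ConcurrentBag<int>();

        // NoBuffering makes the partitioner hand out one item at a time
        // instead of chunks, so no worker holds a backlog of items.
        Partitioner.Create(Enumerable.Range(0, count), EnumerablePartitionerOptions.NoBuffering)
            .AsParallel()
            .WithDegreeOfParallelism(16)
            .ForAll(i =>
            {
                Thread.Sleep(sleepMs); // stand-in for real work
                processed.Add(i);
            });

        // ForAll returns only after every item has been processed.
        return processed.Count;
    }

    public static void Main()
    {
        Console.WriteLine(Run(64, 100));
    }
}
```

ForAll runs the delegate on whichever worker picks up the item, so the order of the side effects is not guaranteed. If output order matters, collect the results and sort them afterwards.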