
multi-threading based RabbitMQ consumer


I like how you wrote your question - it started out very broad and narrowed in to specifics. I have successfully implemented something very similar, and am currently working on an open-source project to take my lessons learned and give them back to the community. Unfortunately, though, I have yet to package my code up neatly, which doesn't help you much! Anyway, to answer your questions:

1. Is it possible to use threading for multiple queues?

A: Yes, but it can be full of pitfalls. Namely, the RabbitMQ .NET library is not the best-written piece of code out there, and I have found it to be a relatively cumbersome implementation of the AMQP protocol. One of the most pernicious caveats is how it handles the "receiving" or "consuming" behavior, which can cause deadlocks quite easily if you aren't careful. Fortunately, it is well illustrated in the API documentation. Advice: if you can, use a singleton connection object. Then, in each thread, use that connection to create a new IModel and corresponding consumers (see the sketch after this list).

2. How to gracefully handle exceptions in threads? - I believe this is another topic, and I will not address it here, as there are several methods you can use.

3. Any open-source projects? - I liked the thinking behind EasyNetQ, although I ended up rolling my own anyway. I'll hopefully remember to follow back when my open-source project is completed, as I believe it improves on EasyNetQ.
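To make the advice in item 1 concrete, here is a minimal sketch of the "singleton connection, one channel per thread" pattern: one shared IConnection for the process, and each thread creating its own IModel and consumer. It assumes the classic synchronous RabbitMQ.Client API (CreateModel, EventingBasicConsumer, roughly versions 3.x-6.x); the queue names and thread count are placeholders, not anything from the question.

using System;
using System.Linq;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class PerThreadConsumers
{
    static void Main()
    {
        var factory = new ConnectionFactory { HostName = "localhost" };

        // one shared (singleton) connection for the whole process
        using (var connection = factory.CreateConnection())
        {
            var threads = new Thread[4];
            for (int i = 0; i < threads.Length; i++)
            {
                var queueName = "queue-" + i; // placeholder queue name
                threads[i] = new Thread(() =>
                {
                    // each thread gets its own channel (IModel) and consumer;
                    // channels should not be shared between threads
                    using (var channel = connection.CreateModel())
                    {
                        // queue, durable, exclusive, autoDelete, arguments
                        channel.QueueDeclare(queueName, false, false, false, null);

                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (sender, ea) =>
                        {
                            // ea.Body is byte[] or ReadOnlyMemory<byte> depending on client version
                            var body = Encoding.UTF8.GetString(ea.Body.ToArray());
                            Console.WriteLine("{0}: {1}", queueName, body);
                            channel.BasicAck(ea.DeliveryTag, false);
                        };

                        // queue, autoAck, consumer
                        channel.BasicConsume(queueName, false, consumer);

                        // keep the thread alive while the consumer receives messages
                        Thread.Sleep(Timeout.Infinite);
                    }
                });
                threads[i].IsBackground = true;
                threads[i].Start();
            }

            Console.WriteLine("Consuming; press Enter to exit");
            Console.ReadLine();
        }
    }
}

This is only a sketch of the threading shape, not a full consumer - error handling, reconnection, and shutdown are left out on purpose.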


You may find this answer very helpful. I have a very basic understanding of how RabbitMQ works, but I'd probably go with one subscriber per channel per thread, as suggested there.

There is certainly more than one option to organize the threading model for this. The actual implementation will depend on how you need to process messages from multiple queues: either in parallel, or by aggregating them and serializing the processing. The following code is a console app which implements a simulation of the latter case. It uses the Task Parallel Library and the BlockingCollection class (which comes in very handy for this kind of task).

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Console_21842880
{
    class Program
    {
        BlockingCollection<object> _commonQueue;

        // process an individual queue
        void ProcessQueue(int id, BlockingCollection<object> queue, CancellationToken token)
        {
            while (true)
            {
                // observe cancellation
                token.ThrowIfCancellationRequested();
                // get a message, this blocks and waits
                var message = queue.Take(token);
                // process this message:
                // just place it to the common queue
                var wrapperMessage = "queue " + id + ", message: " + message;
                _commonQueue.Add(wrapperMessage);
            }
        }

        // process the common aggregated queue
        void ProcessCommonQueue(CancellationToken token)
        {
            while (true)
            {
                // observe cancellation
                token.ThrowIfCancellationRequested();
                // get a message, this blocks and waits
                var message = _commonQueue.Take(token);
                // process this message
                Console.WriteLine(message.ToString());
            }
        }

        // run the whole process
        async Task RunAsync(CancellationToken token)
        {
            var queues = new List<BlockingCollection<object>>();
            _commonQueue = new BlockingCollection<object>();

            // start individual queue processors
            var tasks = Enumerable.Range(0, 4).Select((i) =>
            {
                var queue = new BlockingCollection<object>();
                queues.Add(queue);
                return Task.Factory.StartNew(
                    () => ProcessQueue(i, queue, token),
                    TaskCreationOptions.LongRunning);
            }).ToList();

            // start the common queue processor
            tasks.Add(Task.Factory.StartNew(
                () => ProcessCommonQueue(token),
                TaskCreationOptions.LongRunning));

            // start the simulators
            tasks.AddRange(Enumerable.Range(0, 4).Select((i) =>
                SimulateMessagesAsync(queues, token)));

            // wait for all started tasks to complete
            await Task.WhenAll(tasks);
        }

        // simulate a message source
        async Task SimulateMessagesAsync(List<BlockingCollection<object>> queues, CancellationToken token)
        {
            var random = new Random(Environment.TickCount);
            while (true)
            {
                token.ThrowIfCancellationRequested();
                await Task.Delay(random.Next(100, 1000));
                var queue = queues[random.Next(0, queues.Count)];
                var message = Guid.NewGuid().ToString() + " " + DateTime.Now.ToString();
                queue.Add(message);
            }
        }

        // entry point
        static void Main(string[] args)
        {
            Console.WriteLine("Ctrl+C to stop...");
            var cts = new CancellationTokenSource();
            Console.CancelKeyPress += (s, e) =>
            {
                // cancel upon Ctrl+C
                e.Cancel = true;
                cts.Cancel();
            };

            try
            {
                new Program().RunAsync(cts.Token).Wait();
            }
            catch (Exception ex)
            {
                if (ex is AggregateException)
                    ex = ex.InnerException;
                Console.WriteLine(ex.Message);
            }

            Console.WriteLine("Press Enter to exit");
            Console.ReadLine();
        }
    }
}

Another idea may be to use Reactive Extensions (Rx). If you can think of the arriving messages as events, Rx can help aggregate them into a single stream.
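As a rough sketch of that idea (using the System.Reactive NuGet package; the Subject<string> instances standing in for per-queue message sources are my own placeholders, not part of the answer above), the per-queue streams can be merged into one observable:

using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class RxAggregation
{
    static void Main()
    {
        // one subject per queue, to be fed from that queue's consumer callback
        var queueStreams = Enumerable.Range(0, 4)
            .Select(_ => new Subject<string>())
            .ToArray();

        // aggregate all per-queue streams into a single stream of messages
        var allMessages = Observable.Merge<string>(queueStreams);

        using (allMessages.Subscribe(m => Console.WriteLine("got: " + m)))
        {
            // simulate messages arriving on different queues
            queueStreams[0].OnNext("message from queue 0");
            queueStreams[3].OnNext("message from queue 3");
        }
    }
}

The subscriber sees everything in one serialized stream, which mirrors the aggregation done with BlockingCollection above, just expressed as observables.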