RabbitMQ wait for multiple queues to finish RabbitMQ wait for multiple queues to finish symfony symfony

RabbitMQ wait for multiple queues to finish


You need to implement this: http://www.eaipatterns.com/Aggregator.html but the RabbitMQBundle for Symfony doesn't support that so you would have to use the underlying php-amqplib.

A normal consumer callback from the bundle will get an AMQPMessage. From there you can access the channel and manually publish to whatever exchanges comes next in your "pipes and filters" implementation


In the RPC tutorial at RabbitMQ's site, there is a way to pass around a 'Correlation id' that can identify your messages to users in the queue.

I'd recommend using some sort of id with your messages into the first 3 queues and then have another process to dequeue messages from the 3 into buckets of some sort. When those buckets receive what I'm assuming is the completion of there 3 tasks, send the final message off to the 4th queue for processing.

If you are sending more than 1 work item to each queue for one user, you might have to do a little preprocessing to find out how many items a particular user placed into the queue so the process dequeuing before 4 knows how many to expect before queuing up.


I do my rabbitmq in C#, so sorry my pseudo code isn't in php style

// Clientbyte[] body = new byte[size];body[0] = uniqueUserId;body[1] = howManyWorkItems;body[2] = command;// Setup your body hereQueue(body)

// Server// Process queue 1, 2, 3Dequeue(message)switch(message.body[2]){    // process however you see fit}processedMessages[message.body[0]]++;if(processedMessages[message.body[0]] == message.body[1]){    // Send to queue 4    Queue(newMessage)}

Response to Update #1

Instead of thinking of your client as a terminal, it might be useful to think of the client as a process on a server. So if you setup an RPC client on a server like this one, then all you need to do is have the server handle the generation of a unique id of a user and send the messages to the appropriate queues:

    public function call($uniqueUserId, $workItem) {        $this->response = null;        $this->corr_id = uniqid();        $msg = new AMQPMessage(            serialize(array($uniqueUserId, $workItem)),            array('correlation_id' => $this->corr_id,            'reply_to' => $this->callback_queue)        );        $this->channel->basic_publish($msg, '', 'rpc_queue');        while(!$this->response) {            $this->channel->wait();        }        // We assume that in the response we will get our id back        return deserialize($this->response);    }$rpc = new Rpc();// Get unique user information and work items here// Pass even more information in here, like what queue to use or you could even loop over this to send all the work items to the queues they need.$response = rpc->call($uniqueUserId, $workItem);$responseBuckets[array[0]]++;// Just like above code that sees if a bucket is full or not


I am a little unclear on what you are trying to achieve here. But I would probably alter the design somewhat so that once all messages are cleared from the queues you publish to a separate exchange which publishes to queue 4.