How to create a delayed queue in RabbitMQ?


I found this extremely useful when developing my applications, as it gives you an alternative to simply re-queuing your messages. This can easily reduce the complexity of your code, and is one of many powerful hidden features in RabbitMQ.

Steps

First we need to set up two basic channels, one for the main queue and one for the delay queue. In my example at the end, I include a few additional settings that are not required but make the code more reliable, such as confirm_delivery, delivery_mode and durable. You can find more information on these in the RabbitMQ manual.

After we have set up the channels, we bind the main queue to an exchange that can be used to send messages from the delay queue to our main queue. Because no routing key is given, Pika binds with the queue name, hello, as the routing key.

channel.queue_bind(exchange='amq.direct',
                   queue='hello')

Next we need to configure our delay channel to forward messages to the main queue once they have expired.

delay_channel.queue_declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000,
    'x-dead-letter-exchange': 'amq.direct',
    'x-dead-letter-routing-key': 'hello'
})

  • x-message-ttl (Message - Time To Live)

    This is normally used to automatically remove old messages from the queue after a specific duration. By adding the two optional dead-letter arguments we change this behaviour, so that this parameter instead determines, in milliseconds, how long messages stay in the delay queue.

  • x-dead-letter-routing-key

    This argument lets us transfer messages to a different queue once they have expired, instead of the default behaviour of removing them completely. Its value is the routing key used for the transfer, here hello.

  • x-dead-letter-exchange

    This argument determines which exchange is used to transfer the message from the hello_delay queue to the hello queue.
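The three arguments above can be wrapped in a small helper so that the delay and the target are explicit. This is only a minimal sketch: delay_queue_arguments and declare_delay_queue are hypothetical names, not part of Pika, and channel is assumed to be a Pika channel (anything with a compatible queue_declare method will do).

```python
def delay_queue_arguments(delay_ms, exchange, routing_key):
    """Build the queue arguments that turn a plain queue into a delay queue."""
    if delay_ms < 0:
        raise ValueError("delay_ms must be non-negative")
    return {
        # How long a message waits in the delay queue, in milliseconds.
        'x-message-ttl': int(delay_ms),
        # Exchange that expired messages are republished to.
        'x-dead-letter-exchange': exchange,
        # Routing key used when an expired message is republished.
        'x-dead-letter-routing-key': routing_key,
    }


def declare_delay_queue(channel, name, delay_ms, exchange, routing_key):
    """Declare a durable delay queue on the given channel."""
    return channel.queue_declare(
        queue=name,
        durable=True,
        arguments=delay_queue_arguments(delay_ms, exchange, routing_key),
    )


# Usage with the values from this answer (needs an open Pika channel):
# declare_delay_queue(delay_channel, 'hello_delay', 5000, 'amq.direct', 'hello')
```

Keep in mind that RabbitMQ refuses to redeclare an existing queue with different arguments, so changing the delay later means deleting the queue first or declaring it under a new name.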

Publishing to the delay queue

Once all the basic Pika parameters are set up, you simply send a message to the delay queue using basic_publish.

delay_channel.basic_publish(exchange='',
                            routing_key='hello_delay',
                            body="test",
                            properties=pika.BasicProperties(delivery_mode=2))

Once you have executed the script you should see the hello and hello_delay queues created in your RabbitMQ management module.
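If the management module is not at hand, the queues can also be checked from code with a passive declare, which only looks a queue up and never creates it. A minimal sketch, where queue_message_count is a hypothetical helper name and channel is assumed to be an open Pika blocking channel:

```python
def queue_message_count(channel, name):
    """Return the number of ready messages in an existing queue.

    passive=True only checks that the queue exists; if it does not,
    the broker closes the channel with an error instead of creating it.
    """
    result = channel.queue_declare(queue=name, passive=True)
    return result.method.message_count


# Usage (needs an open Pika channel):
# queue_message_count(channel, 'hello_delay')
# queue_message_count(channel, 'hello')
```

Right after publishing, the message should be counted in hello_delay; once the 5000 ms have passed it should show up in hello instead.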

Example.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_delivery()
channel.queue_declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue_bind(exchange='amq.direct',
                   queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_delivery()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue_declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000,  # Delay until the message is transferred in milliseconds.
    'x-dead-letter-exchange': 'amq.direct',  # Exchange used to transfer the message from A to B.
    'x-dead-letter-routing-key': 'hello'  # Name of the queue we want the message transferred to.
})

delay_channel.basic_publish(exchange='',
                            routing_key='hello_delay',
                            body="test",
                            properties=pika.BasicProperties(delivery_mode=2))

print(" [x] Sent")
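To watch the delay happen, a consumer on the hello queue can report how long each message waited. This is a minimal sketch that assumes the Pika 1.x argument names for basic_consume; make_on_message and consume_hello are hypothetical helper names, and published_at is a timestamp you record yourself when publishing.

```python
import time


def make_on_message(published_at):
    """Build a consumer callback that reports how long a message was delayed."""
    def on_message(channel, method, properties, body):
        elapsed = time.time() - published_at
        print(" [x] Received %r after %.1f s" % (body, elapsed))
        # Acknowledge so the broker can drop the message from 'hello'.
        channel.basic_ack(delivery_tag=method.delivery_tag)
        return elapsed
    return on_message


def consume_hello(channel, published_at):
    """Block and consume from the main queue (assumes Pika 1.x argument names)."""
    channel.basic_consume(queue='hello',
                          on_message_callback=make_on_message(published_at))
    channel.start_consuming()


# Usage (needs an open Pika channel; blocks until interrupted):
# published_at = time.time()   # record this just before basic_publish
# consume_hello(channel, published_at)
```

With the 5000 ms TTL from the example, the reported time should be a little over five seconds.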


You can use the official RabbitMQ delayed message plugin, which provides the x-delayed-message exchange type.

First, download the plugin's .ez file and copy it into Your_rabbitmq_root_path/plugins

Second, enable the plugin (the server does not need to be restarted):

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Finally, publish your message to an exchange declared with the x-delayed-message type, with an "x-delay" header that gives the delay in milliseconds, like:

headers.put("x-delay", 5000);
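The line above is Java; for the Python used elsewhere in this thread, the same idea might look like the sketch below. It assumes the plugin is enabled on the broker. The helper names and the exchange name hello_delayed are hypothetical; only the x-delayed-message exchange type, the x-delayed-type argument and the x-delay header come from the plugin.

```python
def delay_headers(delay_ms):
    """Headers that ask the delayed exchange to hold a message for delay_ms."""
    return {'x-delay': int(delay_ms)}


def declare_delayed_exchange(channel, name, delayed_type='direct'):
    """Declare an exchange of the plugin's 'x-delayed-message' type."""
    return channel.exchange_declare(
        exchange=name,
        exchange_type='x-delayed-message',
        durable=True,
        # How the exchange routes the message once the delay has elapsed.
        arguments={'x-delayed-type': delayed_type},
    )


def publish_delayed(channel, exchange, routing_key, body, delay_ms):
    """Publish a persistent message that the exchange holds for delay_ms."""
    import pika  # imported here so the helpers above work without Pika installed

    properties = pika.BasicProperties(delivery_mode=2,
                                      headers=delay_headers(delay_ms))
    channel.basic_publish(exchange=exchange, routing_key=routing_key,
                          body=body, properties=properties)


# Usage (needs an open Pika channel and the plugin enabled):
# declare_delayed_exchange(channel, 'hello_delayed')
# channel.queue_bind(exchange='hello_delayed', queue='hello', routing_key='hello')
# publish_delayed(channel, 'hello_delayed', 'hello', 'test', 5000)
```

Unlike the TTL approach, the delay here is set per message, so no separate delay queue is needed.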

Notice:

This does not guarantee your message's safety: if a message's delay expires while rabbitmq-server is down, the message is unfortunately lost. So be careful when you use this scheme.

Enjoy it. More info is available in rabbitmq-delayed-message-exchange.


FYI, here is how to do this in Spring 3.2.x.

<rabbit:queue name="delayQueue" durable="true" queue-arguments="delayQueueArguments"/>

<rabbit:queue-arguments id="delayQueueArguments">
  <entry key="x-message-ttl">
    <value type="java.lang.Long">10000</value>
  </entry>
  <entry key="x-dead-letter-exchange" value="finalDestinationTopic"/>
  <entry key="x-dead-letter-routing-key" value="finalDestinationQueue"/>
</rabbit:queue-arguments>

<rabbit:fanout-exchange name="finalDestinationTopic">
  <rabbit:bindings>
    <rabbit:binding queue="finalDestinationQueue"/>
  </rabbit:bindings>
</rabbit:fanout-exchange>