How to reconnect to RabbitMQ?


RabbitMQ uses heartbeats to detect and close "dead" connections and to prevent network devices (firewalls etc.) from terminating "idle" connections. From version 3.5.5 on, the default timeout is set to 60 seconds (previously it was ~10 minutes). From the docs:

Heartbeat frames are sent about every timeout / 2 seconds. After two missed heartbeats, the peer is considered to be unreachable.
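To make the timing concrete, here is a small sketch of the arithmetic implied by the quote above. The function name `heartbeat_timing` is made up for illustration and is not part of RabbitMQ or Pika.

```python
def heartbeat_timing(timeout_seconds):
    """Return (send_interval, detection_time) in seconds for a heartbeat timeout.

    Based only on the rule quoted above: frames go out roughly every
    timeout / 2 seconds, and two missed frames mark the peer unreachable.
    """
    send_interval = timeout_seconds / 2
    detection_time = 2 * send_interval  # two missed heartbeats
    return send_interval, detection_time


interval, detection = heartbeat_timing(60)
print(interval, detection)
```

With the 60-second default, that works out to a frame about every 30 seconds and roughly 60 seconds of silence before the peer is given up on.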

The problem with Pika's BlockingConnection is that it is unable to respond to heartbeats until some API call is made (for example, channel.basic_publish(), connection.sleep(), etc).
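One mitigation that follows from this is to hand control back to Pika between units of work in a long-running job. The sketch below assumes `connection` is a `pika.BlockingConnection` and calls its `process_data_events()` method; `run_with_heartbeats` and `steps` are hypothetical names used only for illustration. It only helps if each individual step finishes well within the heartbeat timeout.

```python
def run_with_heartbeats(connection, steps):
    """Run each callable in `steps`, letting Pika do pending I/O in between.

    `connection` is assumed to be a pika.BlockingConnection (any object with
    a compatible process_data_events() method works). A single step that
    runs longer than the heartbeat timeout can still get the connection
    closed by the broker.
    """
    results = []
    for step in steps:
        results.append(step())
        # Give the blocking connection a chance to read and answer
        # heartbeat frames; time_limit=0 asks it to return as soon as
        # possible instead of waiting for new events.
        connection.process_data_events(time_limit=0)
    return results
```

Calling `connection.sleep()` between steps, as mentioned above, would serve the same purpose when the job also needs to pause.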

The approaches I found so far:

Increase or deactivate the timeout

RabbitMQ negotiates the timeout with the client when establishing the connection. In theory, it should be possible to override the server default value with a bigger one using the heartbeat_interval argument, but the current Pika version (0.10.0) uses the min value between those offered by the server and the client. This issue is fixed on current master.

On the other hand, it is possible to deactivate the heartbeat functionality completely by setting the heartbeat_interval argument to 0, which may well lead to new issues (firewalls dropping connections, etc.).

Reconnecting

Expanding on @itsafire's answer, you can write your own publisher class, letting you reconnect when required. An example naive implementation:

```python
import json
import logging

import pika


class Publisher:
    EXCHANGE = 'my_exchange'
    TYPE = 'topic'
    ROUTING_KEY = 'some_routing_key'

    def __init__(self, host, virtual_host, username, password):
        self._params = pika.connection.ConnectionParameters(
            host=host,
            virtual_host=virtual_host,
            credentials=pika.credentials.PlainCredentials(username, password))
        self._conn = None
        self._channel = None

    def connect(self):
        """Open a connection and channel unless one is already open."""
        if not self._conn or self._conn.is_closed:
            self._conn = pika.BlockingConnection(self._params)
            self._channel = self._conn.channel()
            # Pika 0.10.0 takes the exchange type as type=; releases from
            # 1.0 on name this argument exchange_type=.
            self._channel.exchange_declare(exchange=self.EXCHANGE,
                                           type=self.TYPE)

    def _publish(self, msg):
        self._channel.basic_publish(exchange=self.EXCHANGE,
                                    routing_key=self.ROUTING_KEY,
                                    body=json.dumps(msg).encode())
        logging.debug('message sent: %s', msg)

    def publish(self, msg):
        """Publish msg, reconnecting if necessary.

        Call connect() once before the first publish().
        """
        try:
            self._publish(msg)
        except pika.exceptions.ConnectionClosed:
            logging.debug('reconnecting to queue')
            self.connect()
            self._publish(msg)

    def close(self):
        if self._conn and self._conn.is_open:
            logging.debug('closing queue connection')
            self._conn.close()
```

Other possibilities

Other possibilities that I have not explored yet:


Dead simple: some pattern like this.

```python
import time

import pika

# connect_pika() and do_your_stuff() are placeholders for your own code.
while True:
    try:
        communication_handles = connect_pika()
        do_your_stuff(communication_handles)
    except pika.exceptions.ConnectionClosed:
        print('oops. lost connection. trying to reconnect.')
        # avoid rapid reconnection on longer RMQ server outage
        time.sleep(0.5)
```

You will probably have to refactor your code, but basically it is about catching the exception, mitigating the problem, and carrying on with your work. The communication_handles contain all the pika elements (channels, queues, and whatever else your code needs) to communicate with RabbitMQ via pika.
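The fixed half-second pause can also be replaced with a growing delay, so that a broker that stays down for a while is not hit with a reconnect attempt twice a second. The following is a minimal sketch of that idea and not part of pika: `run_with_reconnect`, `connect`, `work`, `retry_on` and the delay values are all hypothetical names and numbers chosen for illustration.

```python
import time


def run_with_reconnect(connect, work, retry_on, max_attempts=None,
                       base_delay=0.5, max_delay=30.0, sleep=time.sleep):
    """Call work(connect()) and start over whenever `retry_on` is raised.

    connect      -- hypothetical callable returning the communication handles
    work         -- hypothetical callable that uses them; returns when done
    retry_on     -- exception class (or tuple of classes) meaning
                    "connection lost"
    max_attempts -- total number of failures tolerated before the exception
                    is re-raised; None retries forever
    Returns whatever work() returns.
    """
    attempt = 0
    delay = base_delay
    while True:
        try:
            handles = connect()
            return work(handles)
        except retry_on:
            attempt += 1
            if max_attempts is not None and attempt >= max_attempts:
                raise
            sleep(delay)
            # Double the pause after each failure, capped at max_delay.
            delay = min(delay * 2, max_delay)
```

With the placeholder functions from the loop above, a call might look like `run_with_reconnect(connect_pika, do_your_stuff, pika.exceptions.ConnectionClosed)`. The `sleep` parameter exists only so the waiting can be swapped out, for example in tests.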