Error "unknown delivery tag" occurs when I try to ack messages to RabbitMQ using pika (Python)


The problem is probably that you're setting no_ack=True, like this:

consumer_tag = channel.basic_consume(
    message_delivery_event,
    no_ack=True,
    queue=queue,
)

And then acknowledging the messages:

channel.basic_ack(delivery_tag=args.delivery_tag)

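You have to choose whether you want to acknowledge messages yourself, and set the consume parameter to match: either no_ack=True with no basic_ack calls, or no_ack=False with an explicit basic_ack for each message.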
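To make the failure mode concrete, here is a toy model of how a broker tracks outstanding delivery tags. It is only a sketch: `ToyChannel`, `deliver` and `try_ack` are hypothetical names invented for illustration, and this is neither pika nor real RabbitMQ code. The idea it illustrates is that in automatic-ack mode the delivery is treated as settled the moment it is sent, so a later ack refers to a tag the broker no longer knows about.

```python
class ToyChannel:
    """Toy stand-in for a broker channel; NOT pika and NOT real RabbitMQ."""

    def __init__(self, no_ack):
        self.no_ack = no_ack
        self._next_tag = 0
        self._unacked = set()  # delivery tags still waiting for an ack

    def deliver(self):
        """Simulate one delivery and return its delivery tag."""
        self._next_tag += 1
        if not self.no_ack:
            # Only manual-ack mode leaves the tag outstanding.
            self._unacked.add(self._next_tag)
        return self._next_tag

    def basic_ack(self, delivery_tag):
        """Settle a delivery; fail if the tag is not outstanding."""
        if delivery_tag not in self._unacked:
            raise ValueError("unknown delivery tag %d" % delivery_tag)
        self._unacked.remove(delivery_tag)


def try_ack(no_ack):
    """Deliver one message, ack it, and report what happened."""
    channel = ToyChannel(no_ack=no_ack)
    tag = channel.deliver()
    try:
        channel.basic_ack(tag)
        return "acked"
    except ValueError as exc:
        return str(exc)
```

In this model `try_ack(True)` fails and `try_ack(False)` succeeds. Acking the same tag twice fails the same way, which is another common source of this error.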


For me, it was just that I told the queue I wasn't going to ack, then I acked.

E.g. WRONG:

channel.basic_consume(callback, queue=queue_name, no_ack=True)

and then in my callback:

def callback(ch, method, properties, body):
    # do stuff
    ch.basic_ack(delivery_tag=method.delivery_tag)

RIGHT:

channel.basic_consume(callback, queue=queue_name, no_ack=False)

Bottom line: If you want to manually ack, set no_ack=False.

From the docs:

no_ack: (bool) if set to True, automatic acknowledgement mode will be used (see http://www.rabbitmq.com/confirms.html)
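Note that the snippets above use the `no_ack` keyword from older pika releases. In pika 1.0 and later the parameter is called `auto_ack`, and `basic_consume` takes the queue first and the callback as `on_message_callback`. Assuming such a version is installed, the manual-ack setup might look roughly like this; `consume_with_manual_ack` and `handle` are hypothetical names used only for this sketch.

```python
def consume_with_manual_ack(channel, queue_name, handle):
    """Register a consumer that acks each message after `handle` returns.

    `channel` is expected to behave like a pika >= 1.0 channel
    (an assumption about the installed version).
    """

    def on_message(ch, method, properties, body):
        handle(body)
        # Ack only after the work is done; this is valid because
        # auto_ack=False is passed to basic_consume below.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # pika >= 1.0 keyword names; older releases used no_ack instead.
    return channel.basic_consume(
        queue=queue_name,
        on_message_callback=on_message,
        auto_ack=False,
    )
```

With a real channel you would call `consume_with_manual_ack(channel, "test_queue", print)` and then `channel.start_consuming()`. If `handle` raises, the message is not acked and stays outstanding until the channel closes or you reject it explicitly.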


There is a bug in your code: you share a channel across threads, which pika does not support (see the FAQ). You have two options:

  1. Pass no_ack=True to basic_get(...) and do not use the channel object in the thread function doWork(...).
  2. If you need to ACK a message only after you have finished your work, let the main thread (the while True: loop) handle the ack instead of the worker thread. Below is a modified version of your code that does that.

    from __future__ import with_statement
    import pika
    import sys
    from pika.adapters.blocking_connection import BlockingConnection
    from pika import connection, credentials
    import time
    import threading
    import random
    from pika.adapters.select_connection import SelectConnection
    from pika.connection import Connection
    import traceback
    from Queue import Queue, Empty

    def doWork(body, args, channel, ack_queue):
        time.sleep(random.random())
        ack_queue.put(args.delivery_tag)

    def doAck(channel):
        while True:
            try:
                r = ack_queue.get_nowait()
            except Empty:
                r = None
            if r is None:
                break
            try:
                channel.basic_ack(delivery_tag=r)
            except:
                traceback.print_exc()

    auth = credentials.PlainCredentials(username="guest", password="guest")
    params = connection.ConnectionParameters(host="localhost", credentials=auth)
    conn = BlockingConnection(params)
    channel = conn.channel()
    # Create a queue for the messages that should be ACKed by main thread
    ack_queue = Queue()

    while True:
        time.sleep(0.03)
        try:
            doAck(channel)
            method_frame, header_frame, body = channel.basic_get(queue="test_queue")
            if method_frame.NAME == 'Basic.GetEmpty':
                continue
            t = threading.Thread(target=doWork, args=[body, method_frame, channel, ack_queue])
            t.setDaemon(True)
            t.start()
        except Exception, e:
            traceback.print_exc()
            continue
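The code above targets Python 2 and an older pika release. The same hand-off pattern, stripped of the broker connection, can be sketched in Python 3 as follows. `RecordingChannel` and `demo` are hypothetical helpers that stand in for a real channel so the example runs without RabbitMQ; only `basic_ack(delivery_tag=...)` mirrors the real channel method.

```python
import queue
import threading


def do_work(body, delivery_tag, ack_queue):
    """Worker thread: process the message, then hand the tag back."""
    # ... real processing of `body` would go here ...
    ack_queue.put(delivery_tag)  # the worker never touches the channel


def drain_acks(channel, ack_queue):
    """Main thread: ack every tag the workers have finished so far."""
    acked = []
    while True:
        try:
            tag = ack_queue.get_nowait()
        except queue.Empty:
            return acked
        channel.basic_ack(delivery_tag=tag)
        acked.append(tag)


class RecordingChannel:
    """Hypothetical stub standing in for a pika channel in this demo."""

    def __init__(self):
        self.acked = []
        self.ack_threads = set()

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)
        self.ack_threads.add(threading.current_thread().name)


def demo():
    """Run three workers, then ack their tags from the calling thread."""
    channel = RecordingChannel()
    ack_queue = queue.Queue()
    workers = [
        threading.Thread(target=do_work, args=(b"msg", tag, ack_queue))
        for tag in (1, 2, 3)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    drain_acks(channel, ack_queue)
    return channel
```

The point of the design is that `queue.Queue` is thread-safe while the channel is not, so the queue is the only object the threads share, and every `basic_ack` call happens on the thread that owns the channel.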