mqtt client does not receive message in case of thread and rest-api


There are a couple of problems with this.

  1. Both the client.connect() and client.subscribe() calls need the client network loop to run for several iterations before they complete properly.
  2. Once the connection has been made, the network loop needs to run at least once every keep-alive period, otherwise the broker treats the client as dead and disconnects it. This means that if there is a long enough delay between starting the code and the first REST request, the client will be disconnected.
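To put numbers on point 2: the MQTT 3.1.1 specification has the broker drop a client once one and a half keep-alive periods pass without any packet from it. The sketch below is only that arithmetic, not paho code. The function name `broker_would_disconnect` is made up for illustration, and 60 seconds is assumed because it is the default `keepalive` of paho's `connect()`.

```python
def broker_would_disconnect(idle_seconds, keepalive=60):
    """Return True if a broker following the MQTT 3.1.1 rule would drop the client.

    idle_seconds: time since the client last sent any packet (including PINGREQ).
    keepalive: the keep-alive value the client announced in CONNECT, in seconds.
    """
    if keepalive == 0:
        # A keep-alive of 0 switches the mechanism off.
        return False
    return idle_seconds > 1.5 * keepalive


# Network loop never runs, first REST request arrives after 2 minutes:
print(broker_would_disconnect(120))  # True
# Network loop runs in the background and pings in time:
print(broker_would_disconnect(45))   # False
```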

It is better to use the client.loop_start() function, which runs the MQTT client network loop continuously in the background on its own thread.

You should also remove the call to client.subscribe() that is outside the on_connect() callback.
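Taken together, these two suggestions could be sketched roughly as follows. The helper name `start_mqtt` is hypothetical. It takes an already constructed paho client and assumes the paho-mqtt 1.x callback signature `on_connect(client, userdata, flags, rc)`; the only calls made on the client are `connect()`, `subscribe()` and `loop_start()`.

```python
def start_mqtt(client, host, port, topic):
    """Connect, subscribe from on_connect, and run the network loop in the background.

    `client` is expected to be a paho.mqtt.client.Client instance.
    """

    def on_connect(client, userdata, flags, rc):
        # Subscribing here means the subscription is re-issued after every
        # reconnect, not only once at startup.
        if rc == 0:
            client.subscribe(topic)

    client.on_connect = on_connect
    client.connect(host, port=port)
    # loop_start() spawns a thread that handles the connection handshake,
    # keep-alive pings, reconnects and incoming messages.
    client.loop_start()
    return client


# Usage (assumes paho-mqtt 1.x and a broker on localhost:1883):
#   import paho.mqtt.client as mqtt
#   client = start_mqtt(mqtt.Client(client_id="foo"), "localhost", 1883, "mytopic")
```

Passing the client in keeps the helper free of any module-level state, which also makes it easy to try out with a stand-in object before pointing it at a real broker.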

EDIT: As hashed out in the comments/chat, the following works. It looks like running the Flask app in debug mode does some strange things and creates multiple MQTT clients over and over again with the same client id. This leads to the broker constantly kicking the old ones off, so messages never get delivered.

    from flask import Flask, Response
    import paho.mqtt.client as mqtt
    import time
    from threading import Thread
    import threading

    app = Flask(__name__)
    lock = threading.Lock()
    sessionId=0
    cont=True

    def on_connect(client, userdata, flags, rc): # The callback for when the client connects to the broker
        print("Connected with result code {0}".format(str(rc))) # Print result of connection attempt
        client.subscribe("mytopic")

    def on_message(client, userdata, msg): # The callback for when a PUBLISH message is received from the server.
        global cont
        print(msg.topic)
        cont=False

    client = mqtt.Client(client_id="foo", clean_session=True)
    client.on_connect = on_connect # Define callback function for successful connection
    client.on_message = on_message # Define callback function for receipt of a message
    #client.username_pw_set(mqtt_user, mqtt_password)
    client.connect("localhost", port=1884)
    client.loop_start()

    def test(param1, param2):
        lock.acquire()
        try:
            ret = client.publish("mytopic", "foo")
            while cont:
                time.sleep(5)
                print("loop")
        finally:
            lock.release()
        result = "foo"
        return result

    @app.route('/test/check', methods=['POST'])
    def check():
        global sessionId
        sessionId = sessionId + 1
        t = Thread(target=test, args=(sessionId,None))
        t.start()
        return {'id': sessionId, 'eta': 0}

    if __name__ == '__main__':
        print("started")
        app.run()
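If debug mode is still wanted during development, one possible workaround (not something that was tested in the chat) is to make sure that the processes Flask's reloader starts never share a client id. The sketch below builds an id from the process id plus a random suffix; `unique_client_id` is a hypothetical helper, not part of paho or Flask. Alternatively, `app.run(debug=True, use_reloader=False)` keeps the debugger but should avoid starting the second process.

```python
import os
import uuid


def unique_client_id(prefix="foo"):
    """Build an MQTT client id that differs between processes and restarts."""
    # os.getpid() separates the reloader's parent and child processes;
    # the random suffix covers a restart that happens to reuse a pid.
    return "{0}-{1}-{2}".format(prefix, os.getpid(), uuid.uuid4().hex[:8])


print(unique_client_id())  # output varies per call; no fixed value to expect

# Usage (assumes paho-mqtt 1.x, as in the code above):
#   client = mqtt.Client(client_id=unique_client_id(), clean_session=True)
```

Note that a changing client id only makes sense together with clean_session=True, since a persistent session is tied to the id.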