SparkStreaming, RabbitMQ and MQTT in python using pika


It looks like you are using the wrong port number. Assuming that:

  • you have a local instance of RabbitMQ running with default settings, you've enabled the MQTT plugin (rabbitmq-plugins enable rabbitmq_mqtt), and you've restarted the RabbitMQ server
  • you included spark-streaming-mqtt when executing spark-submit / pyspark (either with --packages or --jars / --driver-class-path)

you can connect using TCP with tcp://localhost:1883. Also remember that MQTT uses the amq.topic exchange.
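For reference, RabbitMQ's MQTT plugin translates MQTT topic names into AMQP routing keys on amq.topic: '/' becomes '.' and the single-level wildcard '+' becomes '*' ('#' means the same in both). A plain routing key like 'hello' therefore matches the MQTT topic hello directly. A minimal sketch of the rule (the helper name is mine, not part of any library):

```python
def mqtt_topic_to_amqp_key(topic):
    # '/' separates MQTT topic levels; AMQP topic exchanges use '.'
    # '+' (single-level wildcard) maps to '*'; '#' is unchanged
    return topic.replace('/', '.').replace('+', '*')

print(mqtt_topic_to_amqp_key('sensors/+/temp'))  # sensors.*.temp
print(mqtt_topic_to_amqp_key('hello'))           # hello
```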

Quick start:

  • create a Dockerfile with the following content:

    FROM rabbitmq:3-management
    RUN rabbitmq-plugins enable rabbitmq_mqtt
  • build Docker image:

    docker build -t rabbit_mqtt .
  • start the image and wait until the server is ready:

    docker run -p 15672:15672 -p 5672:5672 -p 1883:1883 rabbit_mqtt 
  • create producer.py with the following content:

    import pika
    import time

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # amq.topic is the exchange used by the MQTT plugin
    channel.exchange_declare(exchange='amq.topic',
                             exchange_type='topic',  # pika 1.x; older pika used type=
                             durable=True)

    for i in range(1000):
        channel.basic_publish(
            exchange='amq.topic',  # amq.topic as exchange
            routing_key='hello',   # routing key used by producer
            body='Hello World {0}'.format(i)
        )
        time.sleep(3)

    connection.close()
  • start producer

    python producer.py

    and visit the management console at http://127.0.0.1:15672/#/exchanges/%2F/amq.topic to see that the messages are being received.

  • create consumer.py with the following content:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.mqtt import MQTTUtils

    sc = SparkContext()
    ssc = StreamingContext(sc, 10)

    mqttStream = MQTTUtils.createStream(
        ssc,
        "tcp://localhost:1883",  # note both port number and protocol
        "hello"                  # the same routing key as used by producer
    )
    mqttStream.count().pprint()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  • download the dependencies (adjust the Scala version to the one used to build Spark, and the Spark version):

    mvn dependency:get -Dartifact=org.apache.spark:spark-streaming-mqtt_2.11:1.6.1
  • make sure SPARK_HOME and PYTHONPATH point to the correct directories.

  • submit consumer.py with (adjust versions as before):

    spark-submit --packages org.apache.spark:spark-streaming-mqtt_2.11:1.6.1 consumer.py

If you followed all the steps, you should see the Hello World messages in the Spark log.
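If you want to confirm that the producer side works independently of Spark, a plain pika consumer can bind a temporary queue to amq.topic with the same routing key. This is only a sketch under the same local-broker assumptions as above; the describe helper is mine:

```python
def describe(routing_key, body):
    # Pure formatting helper, kept separate so it can be tested offline
    return '[{0}] {1}'.format(routing_key, body.decode('utf-8'))

def consume():
    import pika  # imported here so describe() works without pika installed

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Exclusive, server-named queue bound with the producer's routing key
    result = channel.queue_declare(queue='', exclusive=True)
    channel.queue_bind(exchange='amq.topic',
                       queue=result.method.queue,
                       routing_key='hello')

    def on_message(ch, method, properties, body):
        print(describe(method.routing_key, body))

    channel.basic_consume(queue=result.method.queue,
                          on_message_callback=on_message,
                          auto_ack=True)
    channel.start_consuming()  # Ctrl+C to stop

print(describe('hello', b'Hello World 0'))  # [hello] Hello World 0
# With the Docker broker from the steps above running, call consume()
```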


From the MqttAsyncClient Javadoc, the server URI must have one of the following schemes: tcp://, ssl://, or local://. You need to change your brokerUrl above to have one of these schemes.

For more information, here's a link to the source for MqttAsyncClient:

https://github.com/eclipse/paho.mqtt.java/blob/master/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java#L272
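A cheap way to catch this before constructing the client is to check the scheme up front; the helper below is illustrative, not part of Paho:

```python
def is_valid_paho_uri(broker_url):
    # MqttAsyncClient only accepts these schemes (see the Javadoc above)
    return broker_url.startswith(('tcp://', 'ssl://', 'local://'))

print(is_valid_paho_uri('tcp://localhost:1883'))   # True
print(is_valid_paho_uri('amqp://localhost:5672'))  # False
```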