SparkStreaming, RabbitMQ and MQTT in python using pika
It looks like you are using the wrong port number. Assuming that:

- you have a local instance of RabbitMQ running with default settings, you've enabled the MQTT plugin (`rabbitmq-plugins enable rabbitmq_mqtt`) and restarted the RabbitMQ server
- you included `spark-streaming-mqtt` when executing `spark-submit` / `pyspark` (either with `--packages` or `--jars` / `--driver-class-path`)

you can connect using TCP with `tcp://localhost:1883`. You also have to remember that MQTT messages are routed through the `amq.topic` exchange.
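Under the hood, RabbitMQ's MQTT plugin publishes to `amq.topic` and translates MQTT topic names into AMQP routing keys: `/` becomes `.`, the `+` wildcard becomes `*`, and `#` stays `#`. The helper below is only an illustration of that documented mapping, not part of any plugin API:

```python
def mqtt_topic_to_routing_key(topic):
    """Translate an MQTT topic filter into its AMQP routing-key form.

    Illustrative sketch of the mapping used by RabbitMQ's MQTT plugin:
    '/' -> '.', '+' -> '*', '#' stays '#'.
    """
    return topic.replace('/', '.').replace('+', '*')

print(mqtt_topic_to_routing_key('sensors/+/temperature'))  # sensors.*.temperature
print(mqtt_topic_to_routing_key('hello'))                  # hello
```

This is why a plain topic like `hello` works unchanged as a routing key on `amq.topic`.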
Quick start:
create `Dockerfile` with the following content:

```dockerfile
FROM rabbitmq:3-management
RUN rabbitmq-plugins enable rabbitmq_mqtt
```
build the Docker image:

```shell
docker build -t rabbit_mqtt .
```
start the image and wait until the server is ready:

```shell
docker run -p 15672:15672 -p 5672:5672 -p 1883:1883 rabbit_mqtt
```
create `producer.py` with the following content:

```python
import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# amq.topic is a built-in exchange; names starting with "amq." are
# reserved, so use passive=True to only check that it exists
channel.exchange_declare(exchange='amq.topic', passive=True)

for i in range(1000):
    channel.basic_publish(
        exchange='amq.topic',             # amq.topic as exchange
        routing_key='hello',              # routing key used by producer
        body='Hello World {0}'.format(i)
    )
    time.sleep(3)

connection.close()
```
start the producer:

```shell
python producer.py
```

and visit the management console at http://127.0.0.1:15672/#/exchanges/%2F/amq.topic to see that the messages are received.
create `consumer.py` with the following content:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils

sc = SparkContext()
ssc = StreamingContext(sc, 10)

mqttStream = MQTTUtils.createStream(
    ssc,
    "tcp://localhost:1883",  # note both port number and protocol
    "hello"                  # the same routing key as used by the producer
)
mqttStream.count().pprint()

ssc.start()
ssc.awaitTermination()
```
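As a sanity check on the numbers you should expect: the consumer counts messages in 10-second batches (the second argument to `StreamingContext`), while `producer.py` sleeps 3 seconds between publishes, so each printed count should hover around 3 or 4:

```python
batch_interval = 10   # seconds; second argument to StreamingContext in consumer.py
publish_period = 3    # seconds; time.sleep(3) between publishes in producer.py

# Expected messages per batch is roughly batch_interval / publish_period
expected = batch_interval / float(publish_period)
print(expected)  # ~3.33, so per-batch counts of 3 or 4
```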
download the dependencies (adjust the Scala version to the one used to build Spark, and the Spark version):

```shell
mvn dependency:get -Dartifact=org.apache.spark:spark-streaming-mqtt_2.11:1.6.1
```
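The same Maven coordinate appears again in the `spark-submit` command below, and both copies must agree on the Scala and Spark versions. A small, purely hypothetical helper makes the structure of the coordinate explicit:

```python
def mqtt_artifact(scala_version='2.11', spark_version='1.6.1'):
    """Build the spark-streaming-mqtt Maven coordinate.

    Hypothetical helper for illustration: the suffix after the underscore
    is the Scala version, the last component is the Spark version.
    """
    return 'org.apache.spark:spark-streaming-mqtt_{0}:{1}'.format(
        scala_version, spark_version)

print(mqtt_artifact())  # org.apache.spark:spark-streaming-mqtt_2.11:1.6.1
```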
make sure `SPARK_HOME` and `PYTHONPATH` point to the correct directories.

submit `consumer.py` with (adjust versions as before):

```shell
spark-submit --packages org.apache.spark:spark-streaming-mqtt_2.11:1.6.1 consumer.py
```
If you followed all the steps, you should see the Hello World messages in the Spark log.
From the `MqttAsyncClient` Javadoc, the server URI must have one of the following schemes: `tcp://`, `ssl://`, or `local://`. You need to change your `brokerUrl` above to use one of these schemes.

For more information, here's a link to the source for `MqttAsyncClient`: