How do I stream data through a flask application?
In case anyone else wants to do the same, this is the simplest example I could boil things down to based on the example by reptilicus...
Instructions
- Set the code below laid out in the structure mentioned below.
- Install the Python modules listed below
- Run the server
- Run the data source
- Open a web browser and navigate to
http://localhost:25000/
If everything worked you should see a very basic page along these lines:
Python / Modules
These were the versions my test implementation used. Others may work too.
- Python v3.5.2
- Pyzmq v15.2.0
- gevent v1.2.0
- karellen-geventws v1.0.1 (required over gevent-websocket for Python 3 support)
- Flask v0.10.1
- Flask-Sockets v0.2.1
You also need a copy of Reconnecting Websocket
, available here.
Code layout
\ZmqFlaskForwarder \static \js application.js reconnecting-websocket.min.js \templates index.html data_source.py server.py
Server App (server.py)
import zmq.green as zmqimport jsonimport geventfrom flask_sockets import Socketsfrom flask import Flask, render_templateimport loggingfrom gevent import monkeymonkey.patch_all()app = Flask(__name__)logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)sockets = Sockets(app)context = zmq.Context()ZMQ_LISTENING_PORT = 12000@app.route('/')def index(): logger.info('Rendering index page') return render_template('index.html')@sockets.route('/zeromq')def send_data(ws): logger.info('Got a websocket connection, sending up data from zmq') socket = context.socket(zmq.SUB) socket.connect('tcp://localhost:{PORT}'.format(PORT=ZMQ_LISTENING_PORT)) socket.setsockopt_string(zmq.SUBSCRIBE, "") poller = zmq.Poller() poller.register(socket, zmq.POLLIN) gevent.sleep() received = 0 while True: received += 1 # socks = dict(poller.poll()) # if socket in socks and socks[socket] == zmq.POLLIN: data = socket.recv_json() logger.info(str(received)+str(data)) ws.send(json.dumps(data)) gevent.sleep()if __name__ == '__main__': logger.info('Launching web server') from gevent import pywsgi from geventwebsocket.handler import WebSocketHandler server = pywsgi.WSGIServer(('', 25000), app, handler_class=WebSocketHandler) logger.info('Starting serving') server.serve_forever()
Data Source (data_source.py)
import zmqimport randomimport sysimport timeimport jsonport = "12000"context = zmq.Context()socket = context.socket(zmq.PUB)socket.bind("tcp://*:%s" % port)while True: first_data_element = random.randrange(2,20) second_data_element = random.randrange(0,360) message = json.dumps({'First Data':first_data_element, 'Second Data':second_data_element}) print(message) socket.send_string(message) time.sleep(0.5)
Client JavaScript (application.js)
ws = new ReconnectingWebSocket("ws://" + location.host + '/zeromq')ws.onmessage = function(message) { payload = JSON.parse(message.data); $('#latest_data').html('<h2> Data: ' + message.data + '</h2>');};
Template (index.html)
<!DOCTYPE html><html> <head> <title>Python Websockets ZeroMQ demo</title> </head> <body> <div class="container"> <h2> Simple ZeroMQ data streaming via web sockets! </h2> <div id="latest_data"></div> </div> <script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/jquery/2.0.3/jquery.min.js"></script> <script type="text/javascript" src="static/js/reconnecting-websocket.min.js"></script> <script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/d3/4.2.6/d3.min.js"></script> <script type="text/javascript" src="static/js/application.js"></script> </body></html>