Unit testing Python Flask Stream Unit testing Python Flask Stream flask flask

Unit testing Python Flask Stream


You are starting the thread to get_stream() before POSTing any data to REDIS.

This means that it won't find any events and will return without streaming any data.

I believe you don't needs threads at all, you can simply use the POST to setup data for your integration test and then call the stream.

def test_04_receiving_events(self):    headers = [('Content-Type', 'application/json')]    data = json.dumps({"value": 42})    headers.append(('Content-Length', len(data)))    rv_post = self.app.post('/sensor', headers=headers, data=data)    rv_stream = self.app.get('stream/data')    rv_stream_object = json.loads(rv_stream.data)    self.assertEqual(rv_stream.status_code, 200)    self.assertEqual(rv_stream.mimetype, 'text/event-stream')    self.assertEqual(rv_stream_object, "data: {'value': 42}")


If you are interested rather in integration testing than unit testing, my suggestion would be to use the Postman App and its command line integration with newman.

You can have all variables and parameters in an environment and all requests to be tested in a collection. Postman offers docs and tutorials for this, see https://learning.postman.com/. You can do a lot with the free version.

Set Authorization, Params, Headers to match the incoming requests from the service that sends requests to your instance to be tested.When you are done with preparing your requests to successfully run in postman, then you write tests in postman where you might assert specific parts of a requests response body. Postman also offers pre-build snippets to write these.

    pm.test("Body includes 'some string'", function () {        pm.expect(pm.response.text()).to.include("some string");    });

You can also export your collection of requests (includes tests) and environment as json and run it in a shell with newman.

   $ newman run mycollection.json -e dev_environment.json

This is useful if you want to integrate the tests into a CICD pipeline.