Python multi-threaded unittesting Python multi-threaded unittesting multithreading multithreading

Python multi-threaded unittesting


If your design assumes everything must go through the queue, then don't fight it - make everything go through it!

Add an on_call event to your queued event handler, and register to it the following function:

def on_call(self, callback):    callback()

then modify your test to:

def test_MyClass(self):    def threaded_test():        self.assertEqual(mc.get(),6)    mc = MyClass()    self.queue.put(1)    self.queue.put(2)    self.queue.put(3)    self.queue.put({'event_name':'call','val':threaded_test})


You can have a look at test_threading.py in the stdlib tests which does something similar to what you are trying to do. The basic idea is to protect a thread execution with mutex and semaphore so that the execution is complete before the test condition is asserted.