Python multi-threaded unittesting
If your design assumes everything must go through the queue, then don't fight it - make everything go through it!
Add an on_call
event to your queued event handler, and register to it the following function:
def on_call(self, callback): callback()
then modify your test to:
def test_MyClass(self): def threaded_test(): self.assertEqual(mc.get(),6) mc = MyClass() self.queue.put(1) self.queue.put(2) self.queue.put(3) self.queue.put({'event_name':'call','val':threaded_test})
You can have a look at test_threading.py in the stdlib tests which does something similar to what you are trying to do. The basic idea is to protect a thread execution with mutex and semaphore so that the execution is complete before the test condition is asserted.