Are Generators Threadsafe?


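Generators are not thread-safe. If two threads call next() on the same generator at the same time, the calls collide: in CPython the second caller gets "ValueError: generator already executing", and nothing protects any shared state the generator body touches.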
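One way to see the problem in CPython is to have a second thread call next() while the generator is still running in the first. The sketch below is only an illustration: the names slow_gen, started and release are made up for the demonstration, and the two events exist purely to make the collision happen every time instead of by luck.

```python
import threading

started = threading.Event()   # set once the generator body is running
release = threading.Event()   # lets the generator body finish

def slow_gen():
    # Hypothetical generator that stalls while producing its first value.
    started.set()
    release.wait()
    yield 1

g = slow_gen()
results = []

def first_caller():
    # This thread enters the generator and blocks inside it.
    results.append(next(g))

t = threading.Thread(target=first_caller)
t.start()
started.wait()                # the generator is now mid-execution

try:
    next(g)                   # second, simultaneous call from the main thread
    error = None
except ValueError as exc:
    error = str(exc)          # CPython reports "generator already executing"

release.set()                 # let the first caller finish normally
t.join()
print(error, results)
```

The first caller still receives its value once the generator is released; only the overlapping call fails.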

The common approach is the master-worker pattern (also called farmer-worker or producer-consumer). Dedicate one thread to running the generator, and put a Queue between that master and the workers: the master writes to the queue and the workers read from it. The standard Queue module (named queue in Python 3) provides the necessary thread safety, and if you create the queue with a maxsize it blocks the master until the workers are ready to read more data.
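A minimal sketch of that arrangement, written for Python 3 (where the module is named queue). The generator numbers(), the DONE sentinel, the worker count and the maxsize of 8 are all assumptions chosen for illustration, not part of the answer above.

```python
import queue
import threading

NUM_WORKERS = 4
DONE = object()                      # sentinel telling a worker to stop

def numbers():
    # Stand-in generator; only the producer thread ever touches it.
    for x in range(1, 101):
        yield x * 2

def producer(gen, q):
    for item in gen:
        q.put(item)                  # blocks while the queue is full
    for _ in range(NUM_WORKERS):
        q.put(DONE)                  # one sentinel per worker

def worker(q, out, out_lock):
    while True:
        item = q.get()
        if item is DONE:
            break
        with out_lock:
            out.append(item)

q = queue.Queue(maxsize=8)           # bounded, so the producer cannot run far ahead
seen = []
seen_lock = threading.Lock()

threads = [threading.Thread(target=producer, args=(numbers(), q))]
threads += [threading.Thread(target=worker, args=(q, seen, seen_lock))
            for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(seen), sum(seen))
```

Because the generator is advanced by a single thread, it needs no locking of its own; the queue is the only shared object.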


Edited to add benchmark below.

You can wrap a generator with a lock. For example,

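    import threading

    class LockedIterator(object):
        """Wrap an iterator so only one thread at a time can advance it."""

        def __init__(self, it):
            self.lock = threading.Lock()
            self.it = iter(it)

        def __iter__(self):
            return self

        def __next__(self):
            # Hold the lock only while advancing the underlying iterator.
            with self.lock:
                return next(self.it)

        next = __next__  # Python 2 spelling of the same method

    gen = (x*2 for x in [1, 2, 3, 4])  # a generator expression, not a list
    g2 = LockedIterator(gen)
    print(list(g2))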
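To see the wrapper shared between real threads, here is a small usage sketch for Python 3. The class is repeated so the snippet runs on its own; the generator squares() and the choice of four threads are assumptions made for the demonstration.

```python
import threading

class LockedIterator:
    # Python 3 spelling of the wrapper: the method is __next__, not next.
    def __init__(self, it):
        self.lock = threading.Lock()
        self.it = iter(it)

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            return next(self.it)

def squares(n):
    # Stand-in generator for the demonstration.
    for i in range(n):
        yield i * i

shared = LockedIterator(squares(1000))
collected = []
collected_lock = threading.Lock()

def worker():
    for value in shared:              # every thread pulls from the same iterator
        with collected_lock:
            collected.append(value)

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Each value should have been handed to exactly one thread.
print(len(collected), len(set(collected)))
```

The order in which values land in collected depends on thread scheduling; what the lock guarantees is that no value is skipped or delivered twice.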

In the benchmark below, locking takes 50 ms on my system and Queue takes 350 ms. Queue is useful when you really do have a queue; for example, if you have incoming HTTP requests and want to queue them for processing by worker threads. (That doesn't fit the Python iterator model: once an iterator runs out of items, it's done.) If you really do have an iterator, then LockedIterator is a faster and simpler way to make it thread-safe.

    from datetime import datetime
    import threading

    try:
        from queue import Queue   # Python 3
    except ImportError:
        from Queue import Queue   # Python 2

    num_worker_threads = 4

    class LockedIterator(object):
        def __init__(self, it):
            self.lock = threading.Lock()
            self.it = iter(it)

        def __iter__(self):
            return self

        def __next__(self):
            with self.lock:
                return next(self.it)

        next = __next__  # Python 2 spelling of the same method

    def test_locked(it):
        # All workers drain one shared, lock-protected iterator.
        it = LockedIterator(it)

        def worker():
            try:
                for i in it:
                    pass
            except Exception as e:
                print(e)
                raise

        threads = []
        for i in range(num_worker_threads):
            t = threading.Thread(target=worker)
            threads.append(t)
            t.start()
        for t in threads:
            t.join()

    def test_queue(it):
        # The main thread feeds a Queue that daemon workers drain.
        def worker():
            try:
                while True:
                    item = q.get()
                    q.task_done()
            except Exception as e:
                print(e)
                raise

        q = Queue()
        for i in range(num_worker_threads):
            t = threading.Thread(target=worker)
            t.daemon = True
            t.start()
        for item in it:
            q.put(item)
        q.join()

    start_time = datetime.now()
    it = [x*2 for x in range(1, 10000)]
    test_locked(it)
    #test_queue(it)
    end_time = datetime.now()
    took = end_time - start_time
    # Elapsed time in milliseconds.
    print("took %.01f" % ((took.seconds + took.microseconds/1000000.0)*1000))


No, they are not thread-safe. You can find interesting info about generators and multi-threading in:

http://www.dabeaz.com/generators/Generators.pdf