Multiple threads writing to the same CSV in Python


I am not sure whether csv.writer objects are thread-safe. The documentation doesn't say, so to be safe, if multiple threads share the same writer object, you should protect every use of it with a threading.Lock:

    import re
    import threading

    # create the lock once; every thread that touches the writer must share it
    csv_writer_lock = threading.Lock()

    def downloadThread(arguments......):
        # pass csv_writer_lock somehow
        # Note: use csv_writer_lock on *any* access
        # Some code.....
        with csv_writer_lock:
            writer.writerow(re.split(',', line.decode()))

That being said, it may indeed be more elegant for the downloadThread to submit write tasks to an executor, instead of explicitly using locks like this.
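One way to read the executor idea is a second pool with exactly one worker that does nothing but write, so calls to the writer are serialized without an explicit lock. The sketch below is only an illustration under that assumption: `write_rows_via_executor` and `download` are hypothetical names, the upper-casing stands in for the real download step, and an in-memory buffer replaces the real file.

```python
import csv
import io
from concurrent.futures import ThreadPoolExecutor


def write_rows_via_executor(rows, out_file, n_workers=4):
    """Process rows on several threads, but funnel every write through one thread."""
    writer = csv.writer(out_file)
    # A pool with a single worker runs its tasks one at a time,
    # so writer.writerow is never called concurrently.
    with ThreadPoolExecutor(max_workers=1) as write_pool:

        def download(row):
            # Hypothetical stand-in for the real download/processing step.
            processed = [str(cell).upper() for cell in row]
            write_pool.submit(writer.writerow, processed)

        with ThreadPoolExecutor(max_workers=n_workers) as work_pool:
            # list() drains the iterator so errors raised in download() surface here.
            list(work_pool.map(download, rows))
    # Leaving the outer with-block waits until every queued write has finished.


out = io.StringIO()
write_rows_via_executor([["a", 1], ["b", 2], ["c", 3]], out)
print(sorted(out.getvalue().splitlines()))  # row order in the file is not guaranteed
```

Rows land in the file in whatever order the workers finish, which is the same trade-off as the lock-based version.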


Way-late-to-the-party note: You could handle this a different way with no locking by having a single writer consuming from a shared Queue, with rows being pushed to the Queue by the threads doing the processing.

    from threading import Thread
    from queue import Queue
    from concurrent.futures import ThreadPoolExecutor

    # CSV writer setup goes here

    queue = Queue()
    DONE = object()  # sentinel telling the consumer that no more rows are coming

    def consume():
        while True:
            i = queue.get()  # blocks until a row is available (no busy-waiting)
            if i is DONE:
                return
            # Row comes out of queue; CSV writing goes here
            print(i)

    consumer = Thread(target=consume, daemon=True)
    consumer.start()

    def produce(i):
        # Data processing goes here; row goes into queue
        queue.put(i)

    with ThreadPoolExecutor(max_workers=10) as executor:
        for i in range(5000):
            executor.submit(produce, i)

    # All producers have finished once the with-block exits; now stop the consumer.
    # (Stopping on the value 4999 is unsafe: rows can arrive out of order.)
    queue.put(DONE)
    consumer.join()


Here is some code (Python 2); it also handles the headache-causing unicode issue:

    # -*- coding: utf-8 -*-
    import csv
    import threading


    def ensure_bytes(s):
        # Python 2's csv module works on byte strings, so encode unicode cells
        return s.encode('utf-8') if isinstance(s, unicode) else s


    class ThreadSafeWriter(object):
        '''
        >>> from StringIO import StringIO
        >>> f = StringIO()
        >>> wtr = ThreadSafeWriter(f)
        >>> wtr.writerow(['a', 'b'])
        >>> f.getvalue() == "a,b\\r\\n"
        True
        '''

        def __init__(self, *args, **kwargs):
            self._writer = csv.writer(*args, **kwargs)
            self._lock = threading.Lock()

        def _encode(self, row):
            return [ensure_bytes(cell) for cell in row]

        def writerow(self, row):
            row = self._encode(row)
            with self._lock:
                return self._writer.writerow(row)

        def writerows(self, rows):
            rows = (self._encode(row) for row in rows)
            with self._lock:
                return self._writer.writerows(rows)


    # example (Python 2 wants CSV files opened in binary mode):
    with open('some.csv', 'wb') as f:
        writer = ThreadSafeWriter(f)
        writer.writerow([u'中文', 'bar'])
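The class above relies on `unicode` and the `StringIO` module, so it only runs on Python 2. On Python 3 the csv module works with `str` directly and the encoding step can go. A rough equivalent might look like the sketch below; `LockedCsvWriter` is a made-up name for illustration, and the in-memory buffer stands in for a real file.

```python
import csv
import io
import threading
from concurrent.futures import ThreadPoolExecutor


class LockedCsvWriter:
    """Wrap csv.writer so that concurrent writerow/writerows calls are serialized."""

    def __init__(self, *args, **kwargs):
        self._writer = csv.writer(*args, **kwargs)
        self._lock = threading.Lock()

    def writerow(self, row):
        with self._lock:
            return self._writer.writerow(row)

    def writerows(self, rows):
        rows = list(rows)  # consume the iterable before taking the lock
        with self._lock:
            return self._writer.writerows(rows)


# Example: eight threads share one writer; the buffer stands in for a file.
buf = io.StringIO(newline="")
writer = LockedCsvWriter(buf)
with ThreadPoolExecutor(max_workers=8) as pool:
    for i in range(100):
        pool.submit(writer.writerow, [i, "中文"])

lines = buf.getvalue().splitlines()
print(len(lines))
```

With a real file, opening it as `open('some.csv', 'w', newline='', encoding='utf-8')` takes care of both the line endings and the non-ASCII text.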

a more detailed solution is here