Multiple threads writing to the same CSV in Python
I am not sure whether `csv.writer` is thread-safe. The documentation doesn't specify, so to be safe, if multiple threads use the same writer object, you should protect every use of it with a `threading.Lock`:
# create the lockimport threadingcsv_writer_lock = threading.Lock()def downloadThread(arguments......): # pass csv_writer_lock somehow # Note: use csv_writer_lock on *any* access # Some code..... with csv_writer_lock: writer.writerow(re.split(',', line.decode()))
That being said, it may indeed be more elegant for `downloadThread` to submit write tasks to an executor instead of explicitly using locks like this.
Way-late-to-the-party note: you can handle this without any explicit locking by having a single writer thread consume rows from a shared `queue.Queue`, while the threads doing the processing push their rows onto that queue.
from threading import Thread
from queue import Queue
from concurrent.futures import ThreadPoolExecutor

# CSV writer setup goes here

queue = Queue()

# Unique end-of-stream marker. Producers run concurrently, so rows can reach
# the queue in any order; a data value such as the last row number is
# therefore not a safe signal that every row has arrived.
DONE = object()


def consume():
    """Write every row pulled from ``queue`` until ``DONE`` is received.

    This is the only thread that touches the CSV writer, so the writer itself
    needs no lock.
    """
    while True:
        # get() blocks until an item is available, so this thread sleeps
        # instead of spinning on queue.empty().
        i = queue.get()
        if i is DONE:
            return
        # Row comes out of queue; CSV writing goes here
        print(i)


def produce(i):
    """Process one item and hand the resulting row to the consumer."""
    # Data processing goes here; row goes into queue
    queue.put(i)


# daemon=True replaces the deprecated Thread.setDaemon(True).
consumer = Thread(target=consume, daemon=True)
consumer.start()

with ThreadPoolExecutor(max_workers=10) as executor:
    for i in range(5000):
        executor.submit(produce, i)

# Leaving the ``with`` block waits for every produce() call to finish, so DONE
# is queued strictly after the last real row.
queue.put(DONE)
consumer.join()
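Here is some code; it also handles the headache-causing Unicode issue (Python 2's `csv` module cannot write non-ASCII `unicode` values directly, so they are encoded to UTF-8 first):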
import csv
import sys
import threading

try:
    _text_type = unicode  # Python 2
except NameError:
    _text_type = str  # Python 3

# Python 2's csv module writes byte strings; Python 3's writes text natively.
_NEEDS_BYTES = sys.version_info[0] == 2


def ensure_bytes(s):
    """Return *s* encoded as UTF-8 if it is a text string, else unchanged.

    Text means ``unicode`` on Python 2 and ``str`` on Python 3; byte strings
    and non-string values are returned as-is.
    """
    return s.encode('utf-8') if isinstance(s, _text_type) else s


class ThreadSafeWriter(object):
    '''A ``csv.writer`` whose write calls are serialized by a lock.

    Constructor arguments are passed straight through to ``csv.writer``.
    On Python 2, text cells are UTF-8 encoded before writing, because that
    version of the ``csv`` module cannot write non-ASCII ``unicode`` values.

    >>> try:
    ...     from StringIO import StringIO  # Python 2
    ... except ImportError:
    ...     from io import StringIO  # Python 3
    >>> f = StringIO()
    >>> wtr = ThreadSafeWriter(f)
    >>> _ = wtr.writerow(['a', 'b'])
    >>> f.getvalue() == "a,b\\r\\n"
    True
    '''

    def __init__(self, *args, **kwargs):
        self._writer = csv.writer(*args, **kwargs)
        self._lock = threading.Lock()

    def _encode(self, row):
        """Return *row* as a list of cells ready for the underlying writer."""
        if _NEEDS_BYTES:
            return [ensure_bytes(cell) for cell in row]
        # Python 3's csv handles str natively; encoding here would make it
        # write "b'...'" reprs instead of the values themselves.
        return list(row)

    def writerow(self, row):
        """Write one row while holding the lock; return the writer's result."""
        row = self._encode(row)
        with self._lock:
            return self._writer.writerow(row)

    def writerows(self, rows):
        """Write several rows as one batch that other threads cannot split.

        Rows are prepared before the lock is taken, so the critical section
        covers only the actual write.
        """
        rows = [self._encode(row) for row in rows]
        with self._lock:
            return self._writer.writerows(rows)


if __name__ == '__main__':
    # example:
    if _NEEDS_BYTES:
        # Python 2: the csv module needs a binary-mode file.
        out = open('some.csv', 'wb')
    else:
        # Python 3: newline='' lets csv control line endings; store UTF-8.
        out = open('some.csv', 'w', newline='', encoding='utf-8')
    with out as f:
        writer = ThreadSafeWriter(f)
        writer.writerow([u'中文', 'bar'])
A more detailed solution is here.