GreenGodot

Reputation: 6753

Multiple threads writing to the same CSV in Python

I'm new to multithreading in Python and am currently writing a script that appends to a CSV file. Suppose I submit multiple threads to a concurrent.futures.ThreadPoolExecutor, each of which appends lines to the same CSV file. What could I do to guarantee thread safety if appending is the only file-related operation these threads perform?

Simplified version of my code:

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for count, ad_id in enumerate(advertisers):
        downloadFutures.append(executor.submit(downloadThread, arguments.....))
        time.sleep(random.randint(1, 3))

And my thread class being:

def downloadThread(arguments......):
    # Some code.....
    writer.writerow(re.split(',', line.decode()))

Should I set up a separate single-threaded executor to handle writing, or is it worth worrying about if I am just appending?

EDIT: I should elaborate that the timing of the write operations can vary greatly, with minutes passing before the file is next appended to. I am just concerned that this scenario never occurred while testing my script, and I would prefer to be covered for it.

Upvotes: 17

Views: 22289

Answers (3)

kungphu

Reputation: 4849

Way-late-to-the-party note: You could handle this a different way with no locking by having a single writer consuming from a shared Queue, with rows being pushed to the Queue by the threads doing the processing.

from threading import Thread
from queue import Queue
from concurrent.futures import ThreadPoolExecutor


# CSV writer setup goes here

queue = Queue()


def consume():
    while True:
        i = queue.get()  # blocks until a row is available; no busy-waiting
        if i is None:    # sentinel: all producers are finished
            return

        # Row comes out of queue; CSV writing goes here

        print(i)


consumer = Thread(target=consume)
consumer.daemon = True  # Thread.setDaemon is deprecated; set the attribute
consumer.start()


def produce(i):
    # Data processing goes here; row goes into queue
    queue.put(i)


with ThreadPoolExecutor(max_workers=10) as executor:
    for i in range(5000):
        executor.submit(produce, i)

queue.put(None)  # the with-block waits for all tasks, so signal the consumer
consumer.join()
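Using a blocking queue.get() with a None sentinel (rather than polling queue.empty() and returning on a magic row value) means the consumer neither spins on an empty queue nor exits before every queued row has been written, regardless of the order in which the producer tasks finish.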

Upvotes: 20

ospider

Reputation: 10401

Here is some code (Python 2) that also handles the headache-causing Unicode issue:

import csv
import threading


def ensure_bytes(s):
    return s.encode('utf-8') if isinstance(s, unicode) else s


class ThreadSafeWriter(object):
    '''
    >>> from StringIO import StringIO
    >>> f = StringIO()
    >>> wtr = ThreadSafeWriter(f)
    >>> wtr.writerow(['a', 'b'])
    >>> f.getvalue() == "a,b\\r\\n"
    True
    '''

    def __init__(self, *args, **kwargs):
        self._writer = csv.writer(*args, **kwargs)
        self._lock = threading.Lock()

    def _encode(self, row):
        return [ensure_bytes(cell) for cell in row]

    def writerow(self, row):
        row = self._encode(row)
        with self._lock:
            return self._writer.writerow(row)

    def writerows(self, rows):
        rows = (self._encode(row) for row in rows)
        with self._lock:
            return self._writer.writerows(rows)


# example (binary mode, as the Python 2 csv module expects):
with open('some.csv', 'wb') as f:
    writer = ThreadSafeWriter(f)
    writer.writerow([u'中文', 'bar'])

A more detailed solution is here.
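On Python 3, csv.writer accepts text directly, so the encoding shim is unnecessary. A minimal sketch of the same lock-around-the-writer idea (the file name and the newline='' handling are illustrative, not from the original answer):

import csv
import threading


class ThreadSafeWriter:
    '''Serializes access to a shared csv.writer with a lock.'''

    def __init__(self, *args, **kwargs):
        self._writer = csv.writer(*args, **kwargs)
        self._lock = threading.Lock()

    def writerow(self, row):
        with self._lock:
            return self._writer.writerow(row)

    def writerows(self, rows):
        with self._lock:
            return self._writer.writerows(rows)


# example: newline='' stops the csv module from doubling line endings
with open('some.csv', 'w', newline='', encoding='utf-8') as f:
    writer = ThreadSafeWriter(f)
    writer.writerow(['中文', 'bar'])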

Upvotes: 7

Claudiu

Reputation: 229301

I am not sure whether csv.writer is thread-safe. The documentation doesn't specify, so to be safe, if multiple threads use the same object, you should protect the usage with a threading.Lock:

# create the lock
import threading
csv_writer_lock = threading.Lock()

def downloadThread(arguments......):
    # pass csv_writer_lock somehow
    # Note: use csv_writer_lock on *any* access
    # Some code.....
    with csv_writer_lock:
        writer.writerow(re.split(',', line.decode()))

That being said, it may indeed be more elegant for the downloadThread to submit write tasks to an executor, instead of explicitly using locks like this.
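A minimal sketch of that idea, assuming the worker threads hand each row to a dedicated one-worker executor that owns the writer (writer_pool and write_row are illustrative names, not from the question):

import concurrent.futures
import csv

# A single worker means every writerow call runs sequentially on one
# thread, so no explicit lock is needed.
writer_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)

with open('some.csv', 'w', newline='') as f:
    writer = csv.writer(f)

    def write_row(row):
        writer.writerow(row)

    # worker threads submit rows here instead of touching the writer directly
    for row in (['a', 1], ['b', 2]):
        writer_pool.submit(write_row, row)

    # wait for all pending writes to finish before the file closes
    writer_pool.shutdown(wait=True)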

Upvotes: 21
