John Doe

Reputation: 10203

Writing to same file(s) with multiprocessing (avoid lock)

I'm running a script on multiple csv files using multiprocessing.
If a line matches the regex, it writes the line to (a) new file(s) (new file name equals match).
I've noticed a problem writing to the same file(s) from different processes (file lock). How can I fix this?

My code:

import re
import glob
import os
import multiprocessing

pattern ='abc|def|ghi|jkl|mno'
regex = re.compile(pattern, re.IGNORECASE)

def process_files(file):
    res_path = r'd:\results'
    with open(file, 'r+', buffering=1) as ifile:
        for line in ifile:
            matches = set(regex.findall(line))
            for match in matches:
                res_file = os.path.join(res_path, match + '.csv') 
                with open(res_file, 'a') as rf:
                    rf.write(line)

def main():

    p = multiprocessing.Pool()
    for file in glob.iglob(r'D:\csv_files\**\*.csv', recursive=True):
        p.apply_async(process_files, [file])

    p.close()
    p.join()

if __name__ == '__main__':
    main()

Thanks in advance!

Upvotes: 1

Views: 667

Answers (1)

bruno desthuilliers

Reputation: 77912

Make the filename unique for each subprocess:

def process_files(file, id):
    res_path = r'd:\results'
    with open(file, 'r') as ifile:  # open the file -- iterating the path string would yield characters
        for line in ifile:
            matches = set(regex.findall(line))
            for match in matches:
                filename = "{}_{}.csv".format(match, id)
                res_file = os.path.join(res_path, filename)
                with open(res_file, 'a') as rf:
                    rf.write(line)

def main():
    p = multiprocessing.Pool()
    for id, file in enumerate(glob.iglob(r'D:\csv_files\**\*.csv', recursive=True)):
        p.apply_async(process_files, [file, id])
    p.close()
    p.join()

then you will have to add some code to consolidate the different "match_id.csv" files into single "match.csv" files.
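The answer only says this consolidation step is needed; one possible sketch of it, assuming the "match_id.csv" naming scheme from the code above (the `consolidate` helper name is hypothetical):

```python
import glob
import os
import shutil

def consolidate(res_path):
    # Merge every per-process 'match_id.csv' part file into a single
    # 'match.csv'. The '_id' suffix follows the naming scheme above.
    for part in sorted(glob.glob(os.path.join(res_path, '*_*.csv'))):
        match = os.path.basename(part).rsplit('_', 1)[0]  # strip '_id'
        target = os.path.join(res_path, match + '.csv')
        with open(part, 'r') as src, open(target, 'a') as out:
            shutil.copyfileobj(src, out)  # append the part to the merged file
        os.remove(part)  # drop the part file once merged
```

Run it once in the parent process after `p.join()`, when all workers are done.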

Concurrent writes to the same file are something you want to avoid: either you don't have file locks and you end up with corrupted data, or you have file locks and they slow down the process, which defeats the whole point of parallelizing it.
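Another common way to avoid concurrent writes, not part of this answer, is to funnel every write through a single dedicated writer, so only one process ever touches the result files. A minimal sketch (the `writer` function and `(match, line)` message format are assumptions):

```python
import os

def writer(queue, res_path):
    # Single writer: it is the only code that opens result files,
    # so there is no concurrent-write conflict by construction.
    while True:
        item = queue.get()
        if item is None:  # sentinel tells the writer to stop
            break
        match, line = item
        with open(os.path.join(res_path, match + '.csv'), 'a') as rf:
            rf.write(line)
```

Workers would call `queue.put((match, line))` instead of opening files themselves; with a `Pool`, pass a `multiprocessing.Manager().Queue()`, since a plain `multiprocessing.Queue` cannot be handed to pool workers through `apply_async`.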

Upvotes: 4
