Reputation: 10203
I'm running a script on multiple CSV files using multiprocessing.
If a line matches the regex, the line is written to one or more new files (the new file's name equals the match).
I've noticed a problem writing to the same file(s) from different processes (file locking). How can I fix this?
My code:
import re
import glob
import os
import multiprocessing

pattern = 'abc|def|ghi|jkl|mno'
regex = re.compile(pattern, re.IGNORECASE)

def process_files(file):
    res_path = r'd:\results'
    with open(file, 'r+', buffering=1) as ifile:
        for line in ifile:
            matches = set(regex.findall(line))
            for match in matches:
                res_file = os.path.join(res_path, match + '.csv')
                with open(res_file, 'a') as rf:
                    rf.write(line)

def main():
    p = multiprocessing.Pool()
    for file in glob.iglob(r'D:\csv_files\**\*.csv', recursive=True):
        p.apply_async(process_files, [file])
    p.close()
    p.join()

if __name__ == '__main__':
    main()
Thanks in advance!
Upvotes: 1
Views: 667
Reputation: 77912
Make the output filename unique for each task (here, one id per input file), so no two processes ever write to the same file:
def process_files(file, id):
    res_path = r'd:\results'
    with open(file, 'r') as ifile:
        for line in ifile:
            matches = set(regex.findall(line))
            for match in matches:
                filename = "{}_{}.csv".format(match, id)
                res_file = os.path.join(res_path, filename)
                with open(res_file, 'a') as rf:
                    rf.write(line)

def main():
    p = multiprocessing.Pool()
    for id, file in enumerate(glob.iglob(r'D:\csv_files\**\*.csv', recursive=True)):
        p.apply_async(process_files, [file, id])
    p.close()
    p.join()
then you will have to add some code to consolidate the different "{match}_{id}.csv" files into single "{match}.csv" files.
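A minimal consolidation sketch, assuming the d:\results layout and the "{match}_{id}.csv" naming scheme above (the consolidate function name is just illustrative):

import glob
import os
from collections import defaultdict

def consolidate(res_path=r'd:\results'):
    # Group the per-task part files by their match prefix.
    groups = defaultdict(list)
    for part in glob.glob(os.path.join(res_path, '*_*.csv')):
        match = os.path.basename(part).rsplit('_', 1)[0]
        groups[match].append(part)
    # Append every part into the final "{match}.csv", then drop the part file.
    for match, parts in groups.items():
        with open(os.path.join(res_path, match + '.csv'), 'a') as out:
            for part in parts:
                with open(part) as pf:
                    out.write(pf.read())
                os.remove(part)

Run it once in the parent process after p.join() has returned, so no worker is still writing.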
Concurrent writes to the same file are something you want to avoid: either you don't have file locking and you end up with corrupted data, or you do have file locking and it slows the whole thing down, which defeats the point of parallelizing it.
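If you want to avoid the consolidation step altogether, another way to respect that rule is to have the workers only return the matched lines and let the parent process do all the writing, so no two processes ever open the same output file. A rough sketch under that assumption (scan_file is an illustrative name, not from the original code):

import glob
import os
import re
import multiprocessing

pattern = 'abc|def|ghi|jkl|mno'
regex = re.compile(pattern, re.IGNORECASE)

def scan_file(file):
    # Workers only read; they return (match, line) pairs instead of writing.
    hits = []
    with open(file, 'r') as ifile:
        for line in ifile:
            for match in set(regex.findall(line)):
                hits.append((match, line))
    return hits

def main():
    res_path = r'd:\results'
    files = glob.iglob(r'D:\csv_files\**\*.csv', recursive=True)
    with multiprocessing.Pool() as p:
        for hits in p.imap_unordered(scan_file, files):
            # Only the parent writes, so there is never a concurrent write.
            for match, line in hits:
                with open(os.path.join(res_path, match + '.csv'), 'a') as rf:
                    rf.write(line)

if __name__ == '__main__':
    main()

The trade-off is that every matched line travels back through the pool's result queue, which is usually fine but worth keeping in mind for very large result sets.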
Upvotes: 4