Reputation: 115
I have a Python script which consists of 2 processes:
Before implementing multiprocessing the software seemed to do its work in chronological order. Loading all the zipped files, unzipping them, then opening them to do some stuff with it.
So I have brought multiprocessing into the game, and right now it seems that while the files are still being loaded and unzipped, the process of opening them and doing stuff with them has already started. So there are multiple processes doing stuff at the same time. The problem is that when I run this code on big data (more than 100 files) I get problems with concurrent file access. This results in PermissionError: [WinError 32] The process cannot access the file because it is being used by another process:
When I run the snippet on a small data set (approx. 30 files) it seems to go okay, because the files are unzipped so fast that they are ready just in time before process 2 starts.
What I want: I want to keep the multiprocessing because it is speeding things up, but I want process 2 to start only once all the files have been unzipped (i.e. process 1 is done).
This is my snippet:
import os
import csv
import collections
import datetime
import zipfile
import re
import shutil
import fnmatch
from pathlib import Path
import ntpath
import configparser
from multiprocessing import Pool
def generate_file_lists(data_files=r'c:\desktop\DataEnergy', pattern='*.zip'):
    """
    Yield the archives found below data_files, one list per directory.

    Every yielded item is a non-empty list of (root, filename) tuples in
    which root is the same for all tuples, so each unzip() call receives the
    archives of exactly one directory.

    data_files -- top of the directory tree to search. Change the default to
                  a real path. It is a raw string so the backslashes are not
                  read as escape sequences.
    pattern    -- fnmatch-style pattern selecting the archives.
    """
    # os.walk reports each directory exactly once, so the matches of one
    # iteration are precisely the batch for that directory.
    for root, _dirs, files in os.walk(data_files):
        batch = [(root, filename) for filename in fnmatch.filter(files, pattern)]
        # Directories without a matching archive produce no batch at all.
        if batch:
            yield batch
def unzip(file_list, data_files=r'c:\desktop\DataEnergy',
          counter_part=r'c:\desktop\CounterPart'):
    """
    Extract every archive in file_list into a mirror tree under counter_part.

    file_list is a list of (root, filename) tuples where
    root is the same for all tuples.

    data_files   -- top of the source tree. The position of root relative to
                    it is reproduced below counter_part. Change the default
                    to a real path.
    counter_part -- top of the destination tree. Change the default to a
                    real path.

    Each filename must carry its date as -YYYYMMDD-. That date is appended to
    the name of every extracted entry that does not already end in an
    underscore followed by eight digits.

    Raises ValueError when a filename has no date part or the date is invalid.
    """
    for root, filename in file_list:
        path = os.path.join(root, filename)
        date_match = re.search(r'-(.\d+)-', filename)
        if date_match is None:
            raise ValueError('no -YYYYMMDD- date found in file name: ' + filename)
        date_zipped_file_s = date_match.group(1)
        # Parsed only for validation: strptime raises ValueError on a bad date.
        datetime.datetime.strptime(date_zipped_file_s, '%Y%m%d')
        # Mirror the archive's directory (relative to data_files) under counter_part.
        new = os.path.join(counter_part, os.path.relpath(root, start=data_files))
        # exist_ok avoids the check-then-create race between pool workers.
        os.makedirs(new, exist_ok=True)
        # The with-block closes the archive handle as soon as extraction is
        # done. An archive left open stays locked on Windows.
        with zipfile.ZipFile(path) as archive:
            archive.extractall(new)
        # Rename everything in the target directory that is not date-stamped yet.
        for file in os.listdir(new):
            if re.search(r'_\d{8}.', file):
                continue
            stem, extension = os.path.splitext(file)
            os.rename(os.path.join(new, file),
                      os.path.join(new, stem + '_' + date_zipped_file_s + extension))
# Required for Windows:
if __name__ == '__main__':
    pool = Pool(13)
    pool.map(unzip, generate_file_lists())
    print('the files have been unzipped!')
# Start process 2
# NOTE: everything from here down is at module scope, i.e. NOT inside the
# __main__ guard above.
all_missing_dates = ['20210701', '20210702']
missing_dates = [datetime.datetime.strptime(i, "%Y%m%d").date() for i in all_missing_dates]
dates_to_process = []
root = Path('.\middle_stage').resolve()
at_set = {'Audi', 'Mercedes', 'Volkswagen'}
#Only read the rows if they fulfill the following conditions.
def filter_row(r, missing_date):
    """
    Return True when the row passes all three checks, otherwise False:
    it contains a value from at_set, column 24 consists of digits, and the
    date of the ISO timestamp in column 0 equals missing_date.
    """
    # NOTE: this tests the module-level name `row` (the loop variable of the
    # reading loop below), not the parameter `r`.
    if set(row).intersection(at_set):
        if len(r) > 24 and r[24].isdigit():
            aantal_pplgs = int(r[24])
            date_time = datetime.datetime.fromisoformat(r[0]) if len(r) > 3 else True
            condition_3 = date_time.date() == missing_date if len(r) > 3 else True
            return condition_3
    return False
#Open the files and read the rows
print("Start reading data")
# Maps each processed date to its defaultdict(list). Nothing in this snippet
# adds entries to that defaultdict.
data_per_date = dict()
for missing_date in missing_dates:
    print("\tReading missing date: ", missing_date)
    # All regular files below root whose name ends in _<YYYYMMDD>.txt.
    files=[fn for fn in (e for e in root.glob(f"**/*_{missing_date:%Y%m%d}.txt") if e.is_file())]
    # A date is only processed when exactly 13 files were found for it.
    if len(files) != 13:
        continue
    dates_to_process.append(missing_date)
    vehicle_loc_dict = collections.defaultdict(list)
    for file in files:
        with open(file, 'r') as log_file:
            reader = csv.reader(log_file, delimiter = ',')
            next(reader) # skip header
            for row in reader:
                if filter_row(row, missing_date):
                    print('filter_row has been executed!')
    data_per_date[missing_date] = vehicle_loc_dict
Upvotes: 3
Views: 316
Reputation: 44283
The problem just seems to be the following:
If you are running under Windows (and based on the names of your directories, that seems to be the case), whenever you create a new process (here you are creating 13 new processes by creating a multiprocessing pool), the spawn method of creating processes is used. This means that a new, empty address space is created into which the Python interpreter is re-launched and your source program is re-executed from the top to initialize the address space by executing all statements at global scope except any statements that are within blocks that begin with if __name__ == '__main__': because in these new processes this condition will be False. This is also why you place code that creates new processes in such a block, i.e. so that you do not get into a recursive loop creating new processes ad infinitum.
That said, your so-called process 2 statements are at global scope, not within an if __name__ == '__main__': block, and consequently they are being executed 13 times in parallel while the multiprocessing pool is being initialized. But I can imagine a scenario where process 1 of the pool executes the code to no effect since nothing has been unzipped yet and then, now that it has been initialized, starts to unzip a file. Meanwhile other processes in the pool start running their initialization code, and now there is a conflict.
The solution is to move the process 2 code as follows:
import os
import csv
import collections
import datetime
import zipfile
import re
import shutil
import fnmatch
from pathlib import Path
import ntpath
import configparser
from multiprocessing import Pool
def generate_file_lists(data_files=r'c:\desktop\DataEnergy', pattern='*.zip'):
    """
    Yield the archives found below data_files, one list per directory.

    Every yielded item is a non-empty list of (root, filename) tuples in
    which root is the same for all tuples, so each unzip() call receives the
    archives of exactly one directory.

    data_files -- top of the directory tree to search. Change the default to
                  a real path. It is a raw string so the backslashes are not
                  read as escape sequences.
    pattern    -- fnmatch-style pattern selecting the archives.
    """
    # os.walk reports each directory exactly once, so the matches of one
    # iteration are precisely the batch for that directory.
    for root, _dirs, files in os.walk(data_files):
        batch = [(root, filename) for filename in fnmatch.filter(files, pattern)]
        # Directories without a matching archive produce no batch at all.
        if batch:
            yield batch
def unzip(file_list, data_files=r'c:\desktop\DataEnergy',
          counter_part=r'c:\desktop\CounterPart'):
    """
    Extract every archive in file_list into a mirror tree under counter_part.

    file_list is a list of (root, filename) tuples where
    root is the same for all tuples.

    data_files   -- top of the source tree. The position of root relative to
                    it is reproduced below counter_part. Change the default
                    to a real path.
    counter_part -- top of the destination tree. Change the default to a
                    real path.

    Each filename must carry its date as -YYYYMMDD-. That date is appended to
    the name of every extracted entry that does not already end in an
    underscore followed by eight digits.

    Always returns False.
    Raises ValueError when a filename has no date part or the date is invalid.
    """
    for root, filename in file_list:
        path = os.path.join(root, filename)
        date_match = re.search(r'-(.\d+)-', filename)
        if date_match is None:
            raise ValueError('no -YYYYMMDD- date found in file name: ' + filename)
        date_zipped_file_s = date_match.group(1)
        # Parsed only for validation: strptime raises ValueError on a bad date.
        datetime.datetime.strptime(date_zipped_file_s, '%Y%m%d')
        # Mirror the archive's directory (relative to data_files) under counter_part.
        new = os.path.join(counter_part, os.path.relpath(root, start=data_files))
        # exist_ok avoids the check-then-create race between pool workers.
        os.makedirs(new, exist_ok=True)
        # The with-block closes the archive handle as soon as extraction is
        # done. An archive left open stays locked on Windows.
        with zipfile.ZipFile(path) as archive:
            archive.extractall(new)
        # Rename everything in the target directory that is not date-stamped yet.
        for file in os.listdir(new):
            if re.search(r'_\d{8}.', file):
                continue
            stem, extension = os.path.splitext(file)
            os.rename(os.path.join(new, file),
                      os.path.join(new, stem + '_' + date_zipped_file_s + extension))
    # NOTE(review): constant return value kept from the original; it carries
    # no information about success or failure.
    return False
def process1():
    """
    Stage 1: unzip all archives with a pool of 13 worker processes.

    map() blocks until every batch produced by generate_file_lists() has been
    handled by unzip(), so when this function returns no worker is still
    extracting or renaming files.
    """
    # The with-block shuts the workers down once map() has returned, instead
    # of leaving the pool open for the rest of the program.
    with Pool(13) as pool:
        pool.map(unzip, generate_file_lists())
    print('the files have been unzipped!')
#Only read the rows if they fulfill the following conditions.
def filter_row(r, missing_date, at_set=frozenset({'Audi', 'Mercedes', 'Volkswagen'})):
    """
    Decide whether the csv row r should be read for missing_date.

    r            -- one row as a list of strings.
    missing_date -- the datetime.date being processed.
    at_set       -- values of which at least one must occur in the row.

    Returns True only when all three conditions hold: the row contains a
    value from at_set, column 24 exists and consists of digits, and the date
    of the ISO timestamp in column 0 equals missing_date. Returns False
    otherwise.

    Raises ValueError when column 0 of an otherwise matching row is not an
    ISO timestamp.
    """
    # Test the parameter r itself. Relying on a global `row` / `at_set` fails
    # with NameError once the reading loop lives inside a function.
    if not set(r).intersection(at_set):
        return False
    if len(r) <= 24 or not r[24].isdigit():
        return False
    return datetime.datetime.fromisoformat(r[0]).date() == missing_date
def process2():
    """
    Stage 2: read the extracted log files for each missing date.

    For every date in the hard-coded list, all regular files below
    ./middle_stage whose name ends in _<YYYYMMDD>.txt are collected. A date
    is only processed when exactly 13 such files exist. Each file is then
    read as comma-separated data, the header line is skipped, and every row
    is passed through filter_row().

    Returns a dict mapping each processed date to a defaultdict(list).
    That defaultdict is not filled by this snippet.
    """
    #Start process 2
    all_missing_dates = ['20210701', '20210702']
    missing_dates = [datetime.datetime.strptime(i, "%Y%m%d").date() for i in all_missing_dates]
    # A plain relative name, not '.\middle_stage': it resolves to the same
    # directory on Windows without an invalid '\m' escape, and it also works
    # where the path separator is '/'.
    root = Path('middle_stage').resolve()
    #Open the files and read the rows
    print("Start reading data")
    data_per_date = dict()
    for missing_date in missing_dates:
        print("\tReading missing date: ", missing_date)
        files = [e for e in root.glob(f"**/*_{missing_date:%Y%m%d}.txt") if e.is_file()]
        if len(files) != 13:
            continue
        vehicle_loc_dict = collections.defaultdict(list)
        for file in files:
            # newline='' is what the csv module expects for files handed to it.
            with open(file, 'r', newline='') as log_file:
                reader = csv.reader(log_file, delimiter=',')
                # Skip the header. The default keeps an empty file from
                # raising StopIteration.
                next(reader, None)
                for row in reader:
                    if filter_row(row, missing_date):
                        print('filter_row has been executed!')
        data_per_date[missing_date] = vehicle_loc_dict
    return data_per_date
def main():
    """Run the two stages strictly one after the other: process1, then process2."""
    process1()
    process2()
# Only the process that was started as the script runs main(). In a process
# that merely imports this file the condition is False and nothing is run.
if __name__ == '__main__':
    main()
Upvotes: 1
Reputation: 573
In the main thread we need to set up the queues and add the zipped files to the queue of zipped files:
import threading
import queue
# Queue of zipped files that still have to be unzipped.
zippedQueue = queue.Queue()
# Queue for the results of unzipping; presumably read by a second kind of
# worker that processes the files. TODO confirm against the consumer code.
unzippedQueue = queue.Queue()
# Lock taken around the empty-check and get on zippedQueue in unzipFile().
zippedLock = threading.Lock()
# NOTE(review): `files` is not defined in this snippet. Presumably it is the
# list of zipped file paths; define it before this loop.
for file in files:
    zippedQueue.put(file)
class ChiSquaredThread(threading.Thread):
    """Worker thread whose run() consists of a single call to unzipFile()."""

    def __init__(self):
        super().__init__()

    def run(self):
        """Body of the thread: unzip one file."""
        unzipFile()
# add all your zipped files to the zippedQueue
def unzipFile():
    """
    Take one zipped file off zippedQueue and hand the result to unzippedQueue.

    Does nothing when zippedQueue is empty.
    """
    # The with-block releases the lock even if an exception is raised. The
    # empty-check and the get happen under the same lock so that another
    # thread using this lock cannot take the last item in between.
    with zippedLock:
        if zippedQueue.empty():
            return
        zipped = zippedQueue.get()
    # unzip the zipped file here
    # Placeholder so the sketch runs: replace with the real result of unzipping.
    unzippedFile = zipped
    # add unzipped file to queue
    unzippedQueue.put(unzippedFile)
Then create a similar block to the worker thread block above that does the same steps for the unzipped queue and processes each file. The sample worker thread block should guide you on that.
Upvotes: 1