Mediterráneo

Reputation: 115

How to use multiprocessing in chronological order?

I have a Python script which consists of 2 processes:

  1. process 1: Loading and unzipping the files
  2. process 2: Processing the files and doing some stuff with them

Before I implemented multiprocessing, the software did its work in chronological order: loading all the zipped files, unzipping them, and then opening them to do some stuff with them.

So I brought multiprocessing into the game, and right now it seems that while the files are being loaded and unzipped, the process of opening and doing stuff with them has already started, so there are multiple processes doing stuff at the same time. The problem is that when I run this code on big data (more than 100 files) I get problems with concurrent file access, which results in PermissionError: [WinError 32] The process cannot access the file because it is being used by another process. When I run the snippet on a small data set (approx. 30 files) it seems to go okay, because the files are unzipped really fast, just in time for process 2 to start.

What I want: I want to keep the multiprocessing because it speeds things up, but I want process 2 to start only after all the files have been unzipped (i.e. process 1 is done).
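
In other words, the flow I am after looks roughly like this (just a sketch of the desired order; process_unzipped_files is a made-up name standing in for process 2):

from multiprocessing import Pool

if __name__ == '__main__':
    # pool.map blocks until every worker task has finished,
    # so nothing after it runs while files are still being unzipped
    with Pool(13) as pool:
        pool.map(unzip, generate_file_lists())
    process_unzipped_files()  # hypothetical: all of process 2 would go here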

This is my snippet:

import os
import csv
import collections
import datetime 
import zipfile
import re
import shutil
import fnmatch
from pathlib import Path
import ntpath
import configparser
from multiprocessing import Pool

def generate_file_lists():
    # Change the following line to a real path
    data_files = r'c:\desktop\DataEnergy'  # raw string so the backslashes are not treated as escapes
    pattern = '*.zip'
    last_root = None
    args = []
    for root, dirs, files in os.walk(data_files):
        for filename in fnmatch.filter(files, pattern):
            if root != last_root:
                last_root = root
                if args:
                    yield args
                    args = []
            args.append((root, filename))
    if args:
        yield args

def unzip(file_list):
    """
    file_list is a list of (root, filename) tuples where
    root is the same for all tuples.
    """
    # Change the following line to a real path:
    counter_part = r'c:\desktop\CounterPart'
    for root, filename in file_list:
        path = os.path.join(root, filename)
        date_zipped_file_s = re.search(r'-(.\d+)-', filename).group(1)
        date_zipped_file = datetime.datetime.strptime(date_zipped_file_s, '%Y%m%d').date()

        #Create the new directory location
        new_dir = os.path.normpath(os.path.join(os.path.relpath(path, start=r'c:\desktop\DataEnergy'), ".."))

        #Join the directory names counter_part and create their paths.
        new = os.path.join(counter_part, new_dir)

        #Create the directories
        if not os.path.exists(new):
            os.makedirs(new)
        zipfile.ZipFile(path).extractall(new)

        #Get all the unzipped files
        files = os.listdir(new)

        #Rename all the files in the created directories
        for file in files:
            filesplit = os.path.splitext(os.path.basename(file))
            if not re.search(r'_\d{8}\.', file):
                os.rename(os.path.join(new, file), os.path.join(new, filesplit[0]+'_'+date_zipped_file_s+filesplit[1]))

# Required for Windows:
if __name__ == '__main__':
    pool = Pool(13)
    pool.map(unzip, generate_file_lists())
    print('the files have been unzipped!')


#Start process 2
all_missing_dates = ['20210701', '20210702']
missing_dates = [datetime.datetime.strptime(i, "%Y%m%d").date() for i in all_missing_dates]

dates_to_process = []
root = Path(r'.\middle_stage').resolve()


at_set = {'Audi', 'Mercedes', 'Volkswagen'}

#Only read the rows if they fulfill the following conditions.
def filter_row(r, missing_date): 
    if set(r).intersection(at_set):
        if len(r) > 24 and r[24].isdigit():
            aantal_pplgs = int(r[24])  
            date_time = datetime.datetime.fromisoformat(r[0]) if len(r) > 3 else True
            condition_3 = date_time.date() == missing_date  if len(r) > 3 else True  
            
            return condition_3
    return False

#Open the files and read the rows
print("Start reading data")
data_per_date = dict()
for missing_date in missing_dates:
    print("\tReading missing date: ", missing_date)
    files = [e for e in root.glob(f"**/*_{missing_date:%Y%m%d}.txt") if e.is_file()]
    if len(files) != 13:
        continue
    dates_to_process.append(missing_date)  

    vehicle_loc_dict = collections.defaultdict(list)
    for file in files:      
        with open(file, 'r') as log_file:
            reader = csv.reader(log_file, delimiter = ',')
            next(reader) # skip header
            for row in reader:
                if filter_row(row, missing_date): 
                    print('filter_row has been executed!')

    data_per_date[missing_date] = vehicle_loc_dict 

Upvotes: 3

Views: 316

Answers (2)

Booboo

Reputation: 44283

The problem just seems to be the following:

If you are running under Windows (and based on the names of your directories, that seems to be the case), then whenever you create a new process (here you are creating 13 new processes by creating a multiprocessing pool), the spawn method of creating processes is used. This means that a new, empty address space is created, the Python interpreter is re-launched in it, and your source program is re-executed from the top to initialize the address space, executing every statement at global scope except those inside a block that begins with if __name__ == '__main__':, because in these new processes that condition is False. This is also why you must place the code that creates new processes in such a block, i.e. so that you do not get into a recursive loop creating new processes ad infinitum.
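
You can see this re-execution with a minimal, self-contained script (a toy example, not part of your code):

from multiprocessing import Pool
import os

# Under the spawn start method this line runs once in the parent
# and once more in every worker process the pool creates:
print(f'top-level code executed in pid {os.getpid()}')

def square(x):
    return x * x

if __name__ == '__main__':
    # Runs only in the parent; in the workers __name__ is not '__main__'
    with Pool(2) as pool:
        print(pool.map(square, [1, 2, 3]))

On Windows the top-level line is printed three times (once for the parent, once per worker), which is exactly why unguarded global statements get repeated.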

That said, your so-called process 2 statements are at global scope, not within an if __name__ == '__main__': block, and consequently they are executed 13 times in parallel while the multiprocessing pool is being initialized. I can imagine a scenario where process 1 of the pool executes that code to no effect, since nothing has been unzipped yet, and then, now that it has been initialized, starts to unzip a file; meanwhile the other processes in the pool start running their initialization code, and there is your conflict.

The solution is to move the process 2 code as follows:

import os
import csv
import collections
import datetime 
import zipfile
import re
import shutil
import fnmatch
from pathlib import Path
import ntpath
import configparser
from multiprocessing import Pool

def generate_file_lists():
    # Change the following line to a real path
    data_files = r'c:\desktop\DataEnergy'  # raw string so the backslashes are not treated as escapes
    pattern = '*.zip'
    last_root = None
    args = []
    for root, dirs, files in os.walk(data_files):
        for filename in fnmatch.filter(files, pattern):
            if root != last_root:
                last_root = root
                if args:
                    yield args
                    args = []
            args.append((root, filename))
    if args:
        yield args

def unzip(file_list):
    """
    file_list is a list of (root, filename) tuples where
    root is the same for all tuples.
    """
    # Change the following line to a real path:
    counter_part = r'c:\desktop\CounterPart'
    for root, filename in file_list:
        path = os.path.join(root, filename)
        date_zipped_file_s = re.search(r'-(.\d+)-', filename).group(1)
        date_zipped_file = datetime.datetime.strptime(date_zipped_file_s, '%Y%m%d').date()

        #Create the new directory location
        new_dir = os.path.normpath(os.path.join(os.path.relpath(path, start=r'c:\desktop\DataEnergy'), ".."))

        #Join the directory names counter_part and create their paths.
        new = os.path.join(counter_part, new_dir)

        #Create the directories
        if not os.path.exists(new):
            os.makedirs(new)
        zipfile.ZipFile(path).extractall(new)

        #Get all the unzipped files
        files = os.listdir(new)

        #Rename all the files in the created directories
        for file in files:
            filesplit = os.path.splitext(os.path.basename(file))
            if not re.search(r'_\d{8}\.', file):
                os.rename(os.path.join(new, file), os.path.join(new, filesplit[0]+'_'+date_zipped_file_s+filesplit[1]))


def process1():
    pool = Pool(13)
    pool.map(unzip, generate_file_lists())
    print('the files have been unzipped!')

#Only read the rows if they fulfill the following conditions.
def filter_row(r, missing_date, at_set):  # at_set is passed in because it is defined inside process2()
    if set(r).intersection(at_set):
        if len(r) > 24 and r[24].isdigit():
            aantal_pplgs = int(r[24])
            date_time = datetime.datetime.fromisoformat(r[0]) if len(r) > 3 else True
            condition_3 = date_time.date() == missing_date if len(r) > 3 else True

            return condition_3
    return False

def process2():   
    #Start proces 2 
    all_missing_dates = ['20210701', '20210702']
    missing_dates = [datetime.datetime.strptime(i, "%Y%m%d").date() for i in all_missing_dates]
    
    dates_to_process = []
    root = Path(r'.\middle_stage').resolve()
    
    at_set = {'Audi', 'Mercedes', 'Volkswagen'}
    
    #Open the files and read the rows
    print("Start reading data")
    data_per_date = dict()
    for missing_date in missing_dates:
        print("\tReading missing date: ", missing_date)
        files = [e for e in root.glob(f"**/*_{missing_date:%Y%m%d}.txt") if e.is_file()]
        if len(files) != 13:
            continue
        dates_to_process.append(missing_date)  
    
        vehicle_loc_dict = collections.defaultdict(list)
        for file in files:      
            with open(file, 'r') as log_file:
                reader = csv.reader(log_file, delimiter = ',')
                next(reader) # skip header
                for row in reader:
                    if filter_row(row, missing_date, at_set):
                        print('filter_row has been executed!')
    
        data_per_date[missing_date] = vehicle_loc_dict
        
def main():
    process1()
    process2()

if __name__ == '__main__':
    main() 
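
Note that pool.map() does not return until every submitted task has finished, so the call inside process1() is itself the synchronization point: process2() cannot start before all files are unzipped. As a minor optional variant (same behavior for this purpose), the pool can be used as a context manager so its worker processes are cleaned up deterministically:

def process1():
    # the with-block terminates the pool's workers when it exits
    with Pool(13) as pool:
        pool.map(unzip, generate_file_lists())
    print('the files have been unzipped!')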

Upvotes: 1

Sean Powell

Reputation: 573

Main Thread

In the main thread we need to set up the queues and add the zipped files to the zipped-file queue:

import threading
import queue 

zippedQueue = queue.Queue()
unzippedQueue = queue.Queue()
zippedLock = threading.Lock()

# 'files' is assumed to be your list of zipped file paths
for file in files:
    zippedQueue.put(file)

Worker Thread

class ChiSquaredThread(threading.Thread):
    def __init__(self):
         threading.Thread.__init__(self)

    def run(self):
        unzipFile()

# take a zipped file off the queue and unzip it
def unzipFile():
    zippedLock.acquire()
    if not zippedQueue.empty():
        zipped = zippedQueue.get()
        zippedLock.release()
        # unzip the zipped file here; unzippedFile stands for the result
        unzippedQueue.put(unzippedFile)
    else:
        zippedLock.release()  # release the lock even when the queue is empty

Then create a similar worker that takes files from unzippedQueue and does the same steps to process each file; the worker block above should guide you on the structure, and a sketch follows below.
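
A sketch of that second worker might look like this (ProcessFileThread, unzippedLock and processFile are made-up names mirroring the pattern above; note that queue.Queue is itself thread-safe, so the explicit lock only protects the empty()/get() pair from racing):

unzippedLock = threading.Lock()

class ProcessFileThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        processFile()

# take an unzipped file off the queue and process it
def processFile():
    unzippedLock.acquire()
    if not unzippedQueue.empty():
        unzipped = unzippedQueue.get()
        unzippedLock.release()
        # do your stuff with the unzipped file here
    else:
        unzippedLock.release()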

Upvotes: 1
