JasonWH

Reputation: 179

Problem with Multiprocessing and Deadlocking in Python3

I'm having a problem with my multiprocessing, and I'm afraid it's a rather simple fix that I'm just not implementing correctly. I've been researching the things that can cause this, but all I'm really finding is people recommending the use of a queue to prevent deadlocks. That doesn't seem to be stopping it (again, I may just be implementing the queue incorrectly). I've been at this a couple of days now and I was hoping I could get some help. Thanks in advance!

import csv
import multiprocessing as mp
import os
import queue
import sys
import time

import connections
import packages
import profiles


def execute_extract(package, profiles, q):
    # This is the package execution for the extract
    # It fires fine and will print the starting message below
    started_at = time.monotonic()
    print(f"Starting {package.packageName}")
    try:
        oracle_connection = connections.getOracleConnection(profiles['oracle'], 1)
        engine = connections.getSQLConnection(profiles['system'], 1)
        path = os.path.join(os.getcwd(), 'csv_data', package.packageName + '.csv')
        cursor = oracle_connection.cursor()

        if os.path.exists(path):
            os.remove(path)

        f = open(path, 'w')
        chunksize = 100000
        offset = 0
        row_total = 0

        csv_writer = csv.writer(f, delimiter='^', lineterminator='\n')
        # I am having to do some data cleansing.  I know this is not the most efficient way to do this,
        # but currently it is what I am limited to.
        while True:
            cursor.execute(package.query + f'\r\n OFFSET {offset} ROWS\r\n FETCH NEXT {chunksize} ROWS ONLY')
            test = cursor.fetchone()
            if test is None:
                break
            else:
                while True:
                    row = cursor.fetchone()
                    if row is None:
                        break
                    else:
                        new_row = list(row)
                        new_row.append(package.sourceId[0])
                        new_row.append('')
                        i = 0
                        for item in new_row:
                            if type(item) == float:
                                new_row[i] = int(item)
                            elif type(item) == str:
                                new_row[i] = item.encode('ascii', 'replace')
                            i += 1
                        row = tuple(new_row)
                        csv_writer.writerow(row)
                        row_total += 1

            offset += chunksize

        f.close()
        # I know that execution is at least reaching this point.  I can watch the CSV files grow as more and
        # more rows are added to them for all the packages.  What I never get is either the success message or
        # the error message below, and there are never any entries placed in the tables.
        query = f"BULK INSERT {profiles['system'].database.split('_')[0]}_{profiles['system'].database.split('_')[1]}_test_{profiles['system'].database.split('_')[2]}.{package.destTable} FROM \"{path}\" WITH (FIELDTERMINATOR='^', ROWTERMINATOR='\\n');"
        engine.cursor().execute(query)
        engine.commit()

        end_time = time.monotonic() - started_at
        print(
            f"{package.packageName} has completed.  Total rows inserted: {row_total}.  Total execution time: {end_time} seconds\n")
        os.remove(path)
    except Exception as e:

        print(f'An error has occurred for package {package.packageName}.\r\n {repr(e)}')

    finally:
        # Here is where I am trying to add an item to the queue so the get method in the main def will pick it up and
        # remove it from the queue 
        q.put(f'{package.packageName} has completed')
        if oracle_connection:
            oracle_connection.close()
        if engine:
            engine.cursor().close()
            engine.close()


if __name__ == '__main__':
    # Setting mp creation type
    ctx = mp.get_context('spawn')
    q = ctx.Queue()

    # For the ETL I generate a list of class objects that hold relevant information.  profs contains a list of
    # connection objects (credentials, connection strings, etc.); packages contains the information to run the
    # extract (destination tables, query string, package name for logging, etc.)
    profs = profiles.get_conn_vars(sys.argv[1])
    packages = packages.get_etl_packages(profs)

    processes = []
    # I'm trying to track both individual package execution time and overall time so I can get an estimate on rows 
    # per second 
    start_time = time.monotonic()

    sqlConn = connections.getSQLConnection(profs['system'])
    # Here I'm executing a SQL command to truncate all my staging tables to ensure they are empty and will not 
    # generate any key violations 
    sqlConn.execute(
        f"USE[{profs['system'].database.split('_')[0]}_{profs['system'].database.split('_')[1]}_test_{profs['system'].database.split('_')[2]}]\r\nExec Sp_msforeachtable @command1='Truncate Table ?',@whereand='and Schema_Id=Schema_id(''my_schema'')'")

    # Here is where I start generating a process per package to try and get all packages to run simultaneously
    for package in packages:
        p = ctx.Process(target=execute_extract, args=(package, profs, q,))
        processes.append(p)
        p.start()

    # Here is my attempt at managing the queue.  This is a monstrosity of fixes I've tried to get this to work
    results = []
    while True:
        try:
            result = q.get(False, 0.01)
            results.append(result)
        except queue.Empty:
            pass
        allExited = True
        for t in processes:
            if t.exitcode is None:
                allExited = False
                break
        if allExited & q.empty():
            break

    for p in processes:
        p.join()

    # Closing out the end time and writing the overall execution time in minutes.
    end_time = time.monotonic() - start_time
    print(f'Total execution time of {end_time / 60} minutes.')

Upvotes: 1

Views: 437

Answers (1)

Booboo

Reputation: 44213

I can't be sure why you are experiencing a deadlock (I am not at all convinced it is related to your queue management), but I can say for sure that you can simplify your queue management logic if you do either of two things:

Method 1

Ensure that your worker function, execute_extract, puts something on the results queue even in the case of an exception (I would recommend putting the Exception object itself). Then your entire main-process loop beginning with while True: that attempts to get the results can be replaced with:

results = [q.get() for _ in range(len(processes))]

You are guaranteed that there will be a fixed number of messages on the queue equal to the number of processes created.
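For illustration, the worker side of Method 1 might look roughly like the sketch below; do_extract is a hypothetical stand-in for the body of your existing execute_extract:

def execute_extract(package, profiles, q):
    # Always put exactly one item on the queue per worker, even on failure,
    # so the parent can issue a fixed number of q.get() calls.
    try:
        row_total = do_extract(package, profiles)  # hypothetical: your existing extract and bulk-insert logic
        q.put(f'{package.packageName} has completed.  Total rows inserted: {row_total}')
    except Exception as e:
        q.put(e)  # put the Exception object itself so the parent can see what went wrong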

Method 2 (even simpler)

Simply reverse the order in which you wait for the subprocesses to complete and process the results queue. You don't know how many messages will be on the queue, but you aren't processing the queue until all the processes have returned, so however many messages are on the queue is all you will ever get. Just retrieve them until the queue is empty:

for p in processes:
    p.join()
results = []
while not q.empty():
    results.append(q.get())

At this point I would normally suggest that you use a multiprocessing pool class such as multiprocessing.Pool, which does not require an explicit queue to retrieve results (see the sketch below). But make either of these changes (I suggest Method 2, as I cannot see how it could cause a deadlock since only the main process is running at that point) and see if your problem goes away. I am not, however, guaranteeing that your issue is not somewhere else in your code. While your code is overly complicated and inefficient, it is not obviously "wrong." At least you will know whether your problem is elsewhere.
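For reference, a Pool-based version might look roughly like this; it assumes execute_extract is refactored to take only (package, profs) and to return its result (or raise) instead of writing to a queue:

if __name__ == '__main__':
    profs = profiles.get_conn_vars(sys.argv[1])
    pkgs = packages.get_etl_packages(profs)
    with mp.Pool(processes=len(pkgs)) as pool:
        # starmap collects each worker's return value in order; no explicit queue needed
        results = pool.starmap(execute_extract, [(pkg, profs) for pkg in pkgs])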

And my question for you: What does it buy you to do everything using a context acquired with ctx = mp.get_context('spawn') instead of just calling the methods on the multiprocessing module itself? If your platform had support for a fork call, which would be the default context, would you not want to use that?
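For comparison, the same startup code using the module-level API and the platform's default start method would be roughly:

# Using the module-level API lets multiprocessing pick the default start
# method for the platform ('fork' on Linux, 'spawn' on Windows and macOS).
q = mp.Queue()
processes = []
for package in packages:
    p = mp.Process(target=execute_extract, args=(package, profs, q))
    processes.append(p)
    p.start()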

Upvotes: 1
