Reputation: 179
I'm having a problem with my multiprocessing, and I'm afraid it's a rather simple fix that I'm just not implementing correctly. I've been researching what can cause the problem, but all I'm really finding is people recommending the use of a queue to prevent it. That doesn't seem to be stopping it (again, I may just be implementing the queue incorrectly). I've been at this for a couple of days now and was hoping I could get some help. Thanks in advance!
import csv
import multiprocessing as mp
import os
import queue
import sys
import time
import connections
import packages
import profiles
def execute_extract(package, profiles, q):
    # This is the package execution for the extract.
    # It fires fine and will print the starting message below.
    started_at = time.monotonic()
    print(f"Starting {package.packageName}")
    try:
        oracle_connection = connections.getOracleConnection(profiles['oracle'], 1)
        engine = connections.getSQLConnection(profiles['system'], 1)
        path = os.path.join(os.getcwd(), 'csv_data', package.packageName + '.csv')
        cursor = oracle_connection.cursor()
        if os.path.exists(path):
            os.remove(path)
        f = open(path, 'w')
        chunksize = 100000
        offset = 0
        row_total = 0
        csv_writer = csv.writer(f, delimiter='^', lineterminator='\n')
        # I am having to do some data cleansing. I know this is not the most
        # efficient way to do this, but currently it is what I am limited to.
        while True:
            cursor.execute(package.query + f'\r\n OFFSET {offset} ROWS\r\n FETCH NEXT {chunksize} ROWS ONLY')
            test = cursor.fetchone()
            if test is None:
                break
            else:
                while True:
                    row = cursor.fetchone()
                    if row is None:
                        break
                    else:
                        new_row = list(row)
                        new_row.append(package.sourceId[0])
                        new_row.append('')
                        i = 0
                        for item in new_row:
                            if type(item) == float:
                                new_row[i] = int(item)
                            elif type(item) == str:
                                new_row[i] = item.encode('ascii', 'replace')
                            i += 1
                        row = tuple(new_row)
                        csv_writer.writerow(row)
                        row_total += 1
            offset += chunksize
        f.close()
        # I know that execution is at least reaching this point. I can watch the
        # CSV files grow as more and more rows are added to them for all the
        # packages. What I never get is either the success message or the error
        # message below, and there are never any entries placed in the tables.
        query = f"BULK INSERT {profiles['system'].database.split('_')[0]}_{profiles['system'].database.split('_')[1]}_test_{profiles['system'].database.split('_')[2]}.{package.destTable} FROM \"{path}\" WITH (FIELDTERMINATOR='^', ROWTERMINATOR='\\n');"
        engine.cursor().execute(query)
        engine.commit()
        end_time = time.monotonic() - started_at
        print(
            f"{package.packageName} has completed. Total rows inserted: {row_total}. Total execution time: {end_time} seconds\n")
        os.remove(path)
    except Exception as e:
        print(f'An error has occurred for package {package.packageName}.\r\n {repr(e)}')
    finally:
        # Here is where I am trying to add an item to the queue so the get
        # method in the main def will pick it up and remove it from the queue.
        q.put(f'{package.packageName} has completed')
        if oracle_connection:
            oracle_connection.close()
        if engine:
            engine.cursor().close()
            engine.close()
if __name__ == '__main__':
    # Setting mp creation type
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    # For the ETL I generate a list of class objects that hold relevant
    # information. profs contains a list of connection objects (credentials,
    # connection strings, etc.); packages contains the information to run the
    # extract (destination tables, query string, package name for logging, etc.)
    profs = profiles.get_conn_vars(sys.argv[1])
    packages = packages.get_etl_packages(profs)
    processes = []
    # I'm trying to track both individual package execution time and overall
    # time so I can get an estimate on rows per second
    start_time = time.monotonic()
    sqlConn = connections.getSQLConnection(profs['system'])
    # Here I'm executing a SQL command to truncate all my staging tables to
    # ensure they are empty and will not generate any key violations
    sqlConn.execute(
        f"USE[{profs['system'].database.split('_')[0]}_{profs['system'].database.split('_')[1]}_test_{profs['system'].database.split('_')[2]}]\r\nExec Sp_msforeachtable @command1='Truncate Table ?',@whereand='and Schema_Id=Schema_id(''my_schema'')'")
    # Here is where I start generating a process per package to try and get
    # all packages to run simultaneously
    for package in packages:
        p = ctx.Process(target=execute_extract, args=(package, profs, q,))
        processes.append(p)
        p.start()
    # Here is my attempt at managing the queue. This is a monstrosity of
    # fixes I've tried to get this to work
    results = []
    while True:
        try:
            result = q.get(False, 0.01)
            results.append(result)
        except queue.Empty:
            pass
        allExited = True
        for t in processes:
            if t.exitcode is None:
                allExited = False
                break
        if allExited & q.empty():
            break
    for p in processes:
        p.join()
    # Closing out the end time and writing the overall execution time in minutes.
    end_time = time.monotonic() - start_time
    print(f'Total execution time of {end_time / 60} minutes.')
Upvotes: 1
Views: 437
Reputation: 44213
I can't be sure why you are experiencing a deadlock (I am not at all convinced it is related to your queue management), but I can say for sure that you can simplify your queue management logic if you do one of two things:
Method 1
Ensure that your worker function, execute_extract, will put something on the results queue even in the case of an exception (I would recommend placing the Exception object itself). Then the entire loop in your main process that begins with while True: and attempts to get the results can be replaced with:
results = [q.get() for _ in range(len(processes))]
You are guaranteed that there will be a fixed number of messages on the queue equal to the number of processes created.
Method 2 (even simpler)
Simply reverse the order in which you wait for the subprocesses to complete and you process the results queue. You don't know how many messages will be on the queue but you aren't processing the queue until all the processes have returned. So however many messages are on the queue is all you will ever get. Just retrieve them until the queue is empty:
for p in processes:
    p.join()
results = []
while not q.empty():
    results.append(q.get())
At this point I would normally suggest that you use a multiprocessing pool class such as multiprocessing.Pool
which does not require an explicit queue to retrieve results. But make either of these changes (I suggest Method 2, as I cannot see how it can cause a deadlock since only the main process is running at this point) and see if your problem goes away. I am not, however, guaranteeing that your issue is not somewhere else in your code. While your code is overly complicated and inefficient, it is not obviously "wrong." At least you will know whether your problem is elsewhere.
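To illustrate the Pool suggestion: if the worker returns its result (or raises) instead of writing to an explicit queue, the pool handles all of the process and result management. A minimal sketch, with a hypothetical simplified execute_extract standing in for the real one:

```python
import multiprocessing as mp

def execute_extract(package):
    # Hypothetical simplified worker: just return the completion message.
    # The real version would run the extract and could raise on error.
    return f"{package} has completed"

if __name__ == "__main__":
    packages = ["pkg_a", "pkg_b", "pkg_c"]
    # One worker per package; map blocks until every result is back,
    # so no explicit queue, polling loop, or manual join is needed.
    with mp.Pool(processes=len(packages)) as pool:
        results = pool.map(execute_extract, packages)
    print(results)
```

With pool.map an exception raised in any worker is re-raised in the main process, which also removes the need for the sentinel messages of Method 1.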
And my question for you: What does it buy you to do everything using a context acquired with ctx = mp.get_context('spawn')
instead of just calling the methods on the multiprocessing
module itself? If your platform had support for a fork
call, which would be the default context, would you not want to use that?
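For reference, the platform default can be inspected at runtime; this small sketch just prints whatever that default is:

```python
import multiprocessing as mp

if __name__ == "__main__":
    # The module-level multiprocessing API uses the platform default start
    # method: typically "fork" on Linux, "spawn" on Windows (and on macOS
    # since Python 3.8).
    print(mp.get_start_method())
    # mp.get_context('spawn') pins the method explicitly, which is mainly
    # useful when fork-inherited state (open sockets, locks) causes
    # problems or the code must behave identically across platforms.
```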
Upvotes: 1