I am trying to learn how to use multiprocessing and I am having a problem. I am trying to run this code:
import multiprocessing as mp
import random
import string

random.seed(123)

# Define an output queue
output = mp.Queue()

# Define an example function
def rand_string(length, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                           string.ascii_lowercase
                           + string.ascii_uppercase
                           + string.digits)
                       for i in range(length))
    output.put(rand_str)

# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, output)) for x in range(4)]

# Run processes
for p in processes:
    p.start()

# Exit the completed processes
for p in processes:
    p.join()

# Get process results from the output queue
results = [output.get() for p in processes]

print(results)
From here
The code itself runs properly, but when I replace rand_string with my own function (which reads a bunch of CSV files into Pandas DataFrames), the code never ends.
The function is this:
import pandas as pd

def readMyCSV(clFile):
    aClTable = pd.read_csv(clFile)
    # I do some processing here, but at the end the
    # function returns a Pandas DataFrame
    return aClTable
Then I wrap the function so that it accepts a Queue in its arguments:
def readMyCSVParWrap(clFile, outputq):
    outputq.put(readMyCSV(clFile))
and I build the processes with:
processes = [mp.Process(target=readMyCSVParWrap, args=(singleFile,output)) for singleFile in allFiles[:5]]
If I do so, the code never stops running, and results are never printed.
If I put only the clFile string in the output queue, e.g.:
outputq.put((clFile))
the results are printed properly (just a list of clFiles).
When I look at htop, I see 5 processes being spawned, but they do not use any CPU.
Lastly, the readMyCSV function works properly if I run it by itself (it returns a Pandas DataFrame).
Is there anything I am doing wrong? I am running this in a Jupyter notebook, maybe that is an issue?
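It seems your join statements on the processes are causing a deadlock. The processes can't terminate because they wait until the items they put on the queue have been consumed, but in your code that happens only after the joining. This is also why the version that only puts clFile works: a short string fits into the underlying pipe's buffer, so the child can flush it and exit. A whole DataFrame is far larger than that buffer, so each child sits idle at exit (as you see in htop) until the parent reads from the queue.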
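A minimal sketch that makes the hang visible without freezing your session, assuming a 10 MB string as a stand-in for a DataFrame (put_big_item and demo_blocked_child are hypothetical names). A join with a timeout shows that the child is still alive. Only after the parent calls get() can the child exit.

```python
import multiprocessing as mp


def put_big_item(q):
    # ~10 MB string: much larger than a typical OS pipe buffer,
    # standing in for a DataFrame
    q.put("x" * 10_000_000)


def demo_blocked_child():
    q = mp.Queue()
    p = mp.Process(target=put_big_item, args=(q,))
    p.start()

    # Nobody has read from the queue yet, so the child's feeder thread cannot
    # push the whole item into the pipe, and the child cannot finish exiting.
    p.join(timeout=2)
    stuck = p.is_alive()

    # Draining the queue lets the feeder thread finish; now join() returns.
    item = q.get()
    p.join()
    return stuck, len(item), p.exitcode


if __name__ == "__main__":
    print(demo_blocked_child())
```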
Joining processes that use queues
Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)
This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically. docs
The docs further suggest swapping the lines with queue.get and join, or simply removing the join calls.
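Applied to the question's setup, the reordering might look roughly like this. It is a sketch under stated assumptions. read_my_csv and read_my_csv_par_wrap are hypothetical stand-ins for readMyCSV and readMyCSVParWrap. The stdlib csv module replaces pd.read_csv only so the example runs without pandas. The queue handling would be the same for DataFrames.

```python
import csv
import multiprocessing as mp
import os
import tempfile


def read_my_csv(path):
    # Hypothetical stand-in for readMyCSV: stdlib csv instead of pd.read_csv.
    with open(path, newline="") as f:
        return list(csv.reader(f))


def read_my_csv_par_wrap(path, outputq):
    # Same idea as readMyCSVParWrap: put the (possibly large) result on the queue.
    outputq.put(read_my_csv(path))


def read_all(paths):
    output = mp.Queue()
    processes = [mp.Process(target=read_my_csv_par_wrap, args=(path, output))
                 for path in paths]
    for proc in processes:
        proc.start()
    # Drain the queue BEFORE joining: a child can only exit once its item
    # has been pulled out of the pipe.
    results = [output.get() for _ in processes]
    for proc in processes:
        proc.join()
    return results


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        paths = []
        for i in range(5):
            path = os.path.join(tmp, f"file{i}.csv")
            with open(path, "w", newline="") as f:
                csv.writer(f).writerows([[i, j] for j in range(50_000)])
            paths.append(path)
        results = read_all(paths)
    print(len(results), [len(r) for r in results])
```

Results arrive in completion order, not in the order of paths. If the order matters, putting (path, result) tuples on the queue is one simple option.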
Also important:
Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process)...protect the “entry point” of the program by using if __name__ == '__main__':. ibid
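If the queue itself isn't essential, a multiprocessing.Pool collects the return values for you. Note that Pool is a different technique from the Queue approach discussed above. The sketch below pairs it with the entry-point guard. rand_string is adapted from the question's example to return its string rather than put it on a queue, and ALPHABET is a name introduced here.

```python
import multiprocessing as mp
import random
import string

ALPHABET = string.ascii_lowercase + string.ascii_uppercase + string.digits


def rand_string(length):
    # Worker functions must be defined at module top level so that worker
    # processes can find them by name.
    return "".join(random.choice(ALPHABET) for _ in range(length))


def main():
    # Pool.map returns the workers' results as a list in input order, so there
    # is no Queue to drain and no join() ordering to get wrong.
    with mp.Pool(processes=4) as pool:
        return pool.map(rand_string, [5] * 4)


# Under the "spawn" start method each worker re-imports this module; without
# the guard it would call main() again while bootstrapping, which
# multiprocessing refuses with a RuntimeError.
if __name__ == "__main__":
    print(main())
```

For the CSV case, the same shape would presumably be pool.map(readMyCSV, allFiles[:5]), with readMyCSV defined at module top level.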