mic

Reputation: 953

python multiprocessing stuck (maybe reading csv)

I am trying to learn how to use multiprocessing and I am having a problem.

I am trying to run this code:

import multiprocessing as mp
import random
import string

random.seed(123)

# Define an output queue
output = mp.Queue()

# define an example function
def rand_string(length, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(string.ascii_lowercase
                                     + string.ascii_uppercase
                                     + string.digits)
                       for i in range(length))
    output.put(rand_str)

# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, output)) for x in range(4)]

# Run processes
for p in processes:
    p.start()

# Exit the completed processes
for p in processes:
    p.join()

# Get process results from the output queue
results = [output.get() for p in processes]

print(results)

From here

The code by itself runs properly, but when I replace rand_string with my function (which reads a bunch of CSV files into Pandas DataFrames), the code never ends.

The function is this:

import pandas as pd

def readMyCSV(clFile):

    aClTable = pd.read_csv(clFile)

    # I do some processing here, but at the end the
    # function returns a Pandas DataFrame

    return aClTable

Then I wrap the function so that it allows for a Queue in the arguments:

def readMyCSVParWrap(clFile, outputq):
    outputq.put(readMyCSV(clFile))

and I build the processes with:

processes = [mp.Process(target=readMyCSVParWrap, args=(singleFile, output)) for singleFile in allFiles[:5]]

If I do so, the code never stops running, and results are never printed.

If I put only the clFile string in the output queue, e.g.:

outputq.put(clFile)

the results are printed properly (just a list of clFiles).

When I look at htop, I see 5 processes being spawned, but they do not use any CPU.

Lastly, the readMyCSV function works properly if I run it by itself (it returns a Pandas DataFrame).

Is there anything I am doing wrong? I am running this in a Jupyter notebook, maybe that is an issue?

Upvotes: 0

Views: 503

Answers (1)

Darkonaut

Reputation: 21684

It seems your join-statements on the processes are causing a deadlock. The processes can't terminate because they wait until the items on the queue are consumed, but in your code this happens only after joining.

Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically. docs

The docs further suggest swapping the lines so that queue.get comes before join, or just removing join altogether.
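
Applied to the code in the question, a minimal sketch of that reordering might look like this (the paths in allFiles are placeholders standing in for the question's file list, and readMyCSV stands in for the full function):

import multiprocessing as mp
import pandas as pd

def readMyCSV(clFile):
    aClTable = pd.read_csv(clFile)
    # ... processing as in the question ...
    return aClTable

def readMyCSVParWrap(clFile, outputq):
    outputq.put(readMyCSV(clFile))

if __name__ == '__main__':  # protect the entry point (see below)
    # placeholder paths standing in for the question's allFiles list
    allFiles = ['a.csv', 'b.csv', 'c.csv']
    output = mp.Queue()

    processes = [mp.Process(target=readMyCSVParWrap, args=(singleFile, output))
                 for singleFile in allFiles[:5]]

    for p in processes:
        p.start()

    # Drain the queue BEFORE joining: each child blocks on exit until its
    # queued items have been flushed to the pipe by the feeder thread.
    # Note: results arrive in completion order, not in file order.
    results = [output.get() for _ in processes]

    for p in processes:
        p.join()

    print(results)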

Also important:

Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process)...protect the “entry point” of the program by using if __name__ == '__main__':. ibid
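
The sketch above already guards the entry point this way.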

Upvotes: 1
