Josiah Hulsey

Reputation: 499

Multiprocessing spawns idle processes and doesn't compute anything

There seems to be a litany of questions and answers on Stack Overflow about the multiprocessing library. I have looked through all the relevant ones I can find and have not found one that directly speaks to my problem.

I am trying to apply the same function to multiple files in parallel. Whenever I start the processing, though, the computer just spins up several instances of Python and then does nothing. No computations happen at all, and the processes just sit idle.

I have looked at all of the similar questions on Stack Overflow, and none seem to have my problem of idle processes.

What am I doing wrong?

Define the function (abbreviated for this example; checked to make sure it works):

import pandas as pd
import numpy as np
import glob
import os
#from timeit import default_timer as timer
import talib
from multiprocessing import Process


def example_function(file):

    df=pd.read_csv(file, header = 1)
    stock_name = os.path.basename(file)[:-4]
    macd, macdsignal, macdhist = talib.MACD(df.Close, fastperiod=12, slowperiod=26, signalperiod=9)

    df['macd'] = macdhist*1000
    print(f'stock{stock_name} processed')
    final_macd_report.append(df)

Getting a list of all the files in the directory I want to run the function on:

import glob

path = r'C:\Users\josiahh\Desktop\big_test3/*'

files = [f for f in glob.glob(path, recursive=True)]

Attempting multiprocessing:

import multiprocessing as mp
if __name__ == '__main__':

    p = mp.Pool(processes = 5)
    async_result = p.map_async(example_function, files)
    p.close()
    p.join()
    print("Complete")

Any help would be greatly appreciated.

Upvotes: 0

Views: 695

Answers (2)

Roland Smith

Reputation: 43533

While I don't see anything wrong per se, I would like to suggest some changes.

In general, worker functions in a Pool are expected to return something. This return value is transferred back to the parent process. I like to use that as a status report. It is also a good idea to catch exceptions in the worker process, just in case. For example:

def example_function(file):
    status = 'OK'
    try:
        df=pd.read_csv(file, header = 1)
        stock_name = os.path.basename(file)[:-4]
        macd, macdsignal, macdhist = talib.MACD(df.Close, fastperiod=12, slowperiod=26, signalperiod=9)
        df['macd'] = macdhist*1000
        final_macd_report.append(df)
    except:
        status = 'exception caught!'
    return {'filename': file, 'result': status}

(This is just a quick example. You might want to e.g. report the full exception traceback to help with debugging.)
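
For instance, a rough sketch using the standard traceback module (process_file here is just a hypothetical stand-in for the read_csv/MACD work shown above):

import traceback

def example_function(file):
    try:
        process_file(file)  # stand-in for the real per-file work
        status = 'OK'
    except Exception:
        # format_exc() captures the full traceback as a string, so the
        # details survive the trip back to the parent process.
        status = traceback.format_exc()
    return {'filename': file, 'result': status}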

If workers run for a long time, I like to get feedback ASAP. So I prefer to use imap_unordered, especially if some tasks can take much longer than others. This returns an iterator that yields results in the order that jobs finish.

if __name__ == '__main__':

    with mp.Pool() as p:
        for res in p.imap_unordered(example_function, files):
            print(res)

This way you get unambiguous proof that a worker finished, what the result was, and whether any problems occurred.

This is preferable over just calling print from the workers. With stdout buffering and multiple workers inheriting the same output stream, there is no telling when you will actually see something.

Edit: As you can see here, multiprocessing.Pool does not work well with interactive interpreters, especially on ms-windows. Basically, ms-windows lacks the fork system call that lets UNIX-like systems duplicate a process. So on ms-windows, multiprocessing has to try to mimic fork, which means importing the original program file in the child processes. That doesn't work well with interactive interpreters like IPython. One would probably have to dig deep into the internals of Jupyter and multiprocessing to find out the exact cause of the problem.

It seems that a workaround for this problem is to define the worker function in a separate module and import that module in your IPython code.
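
A minimal sketch of that workaround, assuming a hypothetical module name worker_module.py:

# worker_module.py (hypothetical file name; keep the real worker logic here)
import os

def example_function(file):
    # The heavy lifting lives in an importable module, so the spawned
    # child processes on ms-windows can import it by name.
    return {'filename': os.path.basename(file), 'result': 'OK'}

and then, in the IPython/Jupyter session:

import glob
import multiprocessing as mp

from worker_module import example_function

files = glob.glob(r'C:\Users\josiahh\Desktop\big_test3/*')

with mp.Pool() as p:
    for res in p.imap_unordered(example_function, files):
        print(res)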

It is actually mentioned in the documentation that multiprocessing.Pool doesn't work well with interactive interpreters. See the note at the end of this section.

Upvotes: 1

Tim Peters

Reputation: 70735

There's nothing wrong with the structure of the code, so something is going wrong that can't be guessed from what you posted. Start with something very much simpler, then move it in stages to what you're actually trying to do. You're importing mountains of extension (3rd party) code, and the problem could be anywhere. Here's a start:

def example_function(arg):
    from time import sleep
    msg = "crunching " + str(arg)
    print(msg)
    sleep(arg)
    print("done " + msg)

if __name__ == '__main__':
    import multiprocessing as mp
    p = mp.Pool(processes = 5)
    async_result = p.map_async(example_function, reversed(range(15)))
    print("result", async_result.get())
    p.close()
    p.join()
    print("Complete")

That works fine on Win10 under 64-bit Python 3.7.4 for me. Does it for you?

Note especially the async_result.get() call. That displays a list of 15 None values. You never do anything with your async_result; because of that, if any exception was raised in a worker process, it would most likely silently vanish. In such cases, .get()'ing the result will (re)raise the exception in your main program.
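
To make that concrete, here's a small, self-contained sketch (names are illustrative) where one task raises and the error only becomes visible when .get() is called:

import multiprocessing as mp

def worker(x):
    if x == 3:
        raise ValueError("boom")  # deliberate failure in one task
    return x * x

if __name__ == '__main__':
    with mp.Pool(processes=2) as p:
        async_result = p.map_async(worker, range(5))
        # Without this .get(), the ValueError above would vanish silently;
        # with it, the exception is re-raised here in the parent process.
        print(async_result.get())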

Also please verify that your files list isn't in fact empty. We can't guess at that from here either ;-)
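
A quick sanity check along those lines could be as simple as:

import glob

path = r'C:\Users\josiahh\Desktop\big_test3/*'
files = glob.glob(path, recursive=True)
print(len(files), "files matched")  # if this prints 0, the pool has nothing to do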

EDIT

I moved the async_result.get() into its own line, right after the map_async(), to maximize the chance of revealing otherwise silent exceptions in the worker processes. At least add that much to your code too.

Upvotes: 2
