donmelchior

Reputation: 1085

How to pass a dict argument to Python Pool.apply_async method?

I need to pass a dict to Python Pool.apply_async. It doesn't seem to work as expected because my script gets stuck. How do I pass a dict argument to the Python Pool.apply_async method?

import multiprocessing as mp

def my_func(session, index):
    result = {"server": session['server'], "exit_code": session['exit_code'], "index": index}
    return result
        
def my_callback(result):
    print(result)

pool = mp.Pool(5)

sessions = []
sessions.append({"server": "foo.tld", "exit_code": 1})
sessions.append({"server": "bar.tld", "exit_code": 0})

for i, session in enumerate(sessions):
    # Below, "session" argument is a dict
    pool.apply_async(my_func, kwds={"session": session, "index": i}, callback=my_callback)
        
pool.close()
pool.join() 

Upvotes: 0

Views: 739

Answers (3)

Booboo

Reputation: 44108

All you said was, "my script gets stuck", which is not very descriptive. You also did not tag your question with the platform on which you are running, such as linux or windows, as you are supposed to do when you post questions tagged with multiprocessing. That leaves me guessing that your problem arises from running on Windows. If so, the issue is that Windows uses a method called spawn to create new processes. This means that to create the processes in the multiprocessing pool and invoke your worker function, my_func, a new empty address space is created and a new Python interpreter is launched, which initializes the process by re-reading your source program and executing every statement at global scope.

So all import statements, function definitions, data declarations, executable statements, etc. at global scope will be executed. The only difference for the newly created process is that, whereas in the main process the internal variable __name__ has the value '__main__', it will not have this value in these subprocesses. This allows you to place any statements that you do not want executed during the initialization of the subprocesses inside blocks that test the value of __name__. Chief among such statements are the executable statements that create the subprocesses themselves. If you do not place these statements within an if __name__ == '__main__': block, you would get into a recursive loop creating new processes ad infinitum (actually, Python recognizes this condition and throws an exception instead).
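
If you want to confirm which start method your platform is using, a minimal check (assuming Python 3.4+, where multiprocessing.get_start_method is available) is:

import multiprocessing as mp

if __name__ == '__main__':
    # Default is 'spawn' on Windows (and on macOS since Python 3.8),
    # 'fork' on Linux
    print(mp.get_start_method())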

So typically you place code that creates new processes within a function such as main (choose any name you want) and make sure that main is only called conditionally based on the value of __name__:

if __name__ == '__main__':
    main()

Or you can keep the process-creation code at global scope but within an if __name__ == '__main__': block:

import multiprocessing as mp

def my_func(session, index):
    result = {"server": session['server'], "exit_code": session['exit_code'], "index": index}
    return result

def my_callback(result):
    print(result)

if __name__ == '__main__':
    pool = mp.Pool(5)

    sessions = []
    sessions.append({"server": "foo.tld", "exit_code": 1})
    sessions.append({"server": "bar.tld", "exit_code": 0})

    for i, session in enumerate(sessions):
        # Below, "session" argument is a dict
        pool.apply_async(my_func, kwds={"session": session, "index": i}, callback=my_callback)

    pool.close()
    pool.join()

Prints:

{'server': 'foo.tld', 'exit_code': 1, 'index': 0}
{'server': 'bar.tld', 'exit_code': 0, 'index': 1}

Note that I also placed all executable statements that are not required to be executed by the subprocesses, such as the creation of the sessions list, within the if __name__ == '__main__': block, for efficiency.

It is just "neater", however, to code as follows:

import multiprocessing as mp

def my_func(session, index):
    result = {"server": session['server'], "exit_code": session['exit_code'], "index": index}
    return result

def my_callback(result):
    print(result)

def main():
    pool = mp.Pool(5)

    sessions = []
    sessions.append({"server": "foo.tld", "exit_code": 1})
    sessions.append({"server": "bar.tld", "exit_code": 0})

    for i, session in enumerate(sessions):
        # Below, "session" argument is a dict
        pool.apply_async(my_func, kwds={"session": session, "index": i}, callback=my_callback)

    pool.close()
    pool.join()
    
if __name__ == '__main__':
    main()

Upvotes: 1

AKX

Reputation: 168903

This simplified version (using imap_unordered) works just fine for me.

Always remember to wrap the main multiprocessing entrypoint in a function.

import multiprocessing as mp


def my_func(arg):
    index, session = arg
    result = {
        "server": session["server"],
        "exit_code": session["exit_code"],
        "index": index,
    }
    return result


def main():
    sessions = [
        {"server": "foo.tld", "exit_code": 1},
        {"server": "bar.tld", "exit_code": 0},
    ]
    with mp.Pool(5) as pool:

        for res in pool.imap_unordered(my_func, enumerate(sessions)):
            print(res)


if __name__ == "__main__":
    main()
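
One thing to be aware of: imap_unordered yields results as workers finish, so the output order is not guaranteed to match the input order. If ordering matters, swapping the loop above for pool.imap (which preserves input order) should work:

        for res in pool.imap(my_func, enumerate(sessions)):
            print(res)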

Upvotes: 0

Adam.Er8

Reputation: 13393

You need to keep the AsyncResult object returned from apply_async, and call its get method, like this:

for i, session in enumerate(sessions):
    res = pool.apply_async(my_func, kwds={"session": session, "index": i}, callback=my_callback)
    print(res.get())

Output should be something like this (each result appears twice: once from the callback and once from the print(res.get())):

{'server': 'foo.tld', 'exit_code': 1, 'index': 0}
{'server': 'foo.tld', 'exit_code': 1, 'index': 0}
{'server': 'bar.tld', 'exit_code': 0, 'index': 1}
{'server': 'bar.tld', 'exit_code': 0, 'index': 1}

Check out the example in the docs.
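
One caveat: calling get() inside the submission loop blocks until that task finishes, which effectively serializes the work. To keep the tasks running in parallel, you can collect the AsyncResult objects first and only call get() on each afterwards (a sketch using the same pool, my_func, and sessions as in the question; the callback is dropped here to avoid duplicate prints):

results = [
    pool.apply_async(my_func, kwds={"session": session, "index": i})
    for i, session in enumerate(sessions)
]
# All tasks are now submitted and running; collect results in order
for res in results:
    print(res.get())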

Upvotes: 1
