Reputation: 1085
I need to pass a dict to Python's Pool.apply_async. It doesn't seem to work as expected because my script gets stuck. How do I pass a dict argument to Python's Pool.apply_async method?
import multiprocessing as mp

def my_func(session, index):
    result = {"server": session['server'], "exit_code": session['exit_code'], "index": index}
    return result

def my_callback(result):
    print(result)

pool = mp.Pool(5)
sessions = []
sessions.append({"server": "foo.tld", "exit_code": 1})
sessions.append({"server": "bar.tld", "exit_code": 0})
for i, session in enumerate(sessions):
    # Below, "session" argument is a dict
    pool.apply_async(my_func, kwds={"session": session, "index": i}, callback=my_callback)
pool.close()
pool.join()
Upvotes: 0
Views: 739
Reputation: 44108
All you said was "my script gets stuck", which is not very descriptive. You also did not tag your question with the platform you are running on, such as linux or windows, as you are supposed to do when you post questions tagged with multiprocessing. That leaves me guessing that your problem arises from running on Windows. If so, the issue is that Windows uses a method called spawn to create new processes. This means that to create the processes in the multiprocessing pool and invoke your worker function, my_func, a new empty address space is created and a new Python interpreter is launched that initializes the process by re-reading your source program and executing every statement at global scope.
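If you are not sure which start method your platform defaults to, you can check at runtime; a minimal sketch using the standard multiprocessing API (get_start_method and set_start_method have been available since Python 3.4):

import multiprocessing as mp

if __name__ == '__main__':
    # Typically 'spawn' on Windows (and macOS since Python 3.8), 'fork' on most Linux systems
    print(mp.get_start_method())
    # To reproduce the Windows behavior on Linux, you could force it:
    # mp.set_start_method('spawn')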
So all import statements, function definitions, data declarations, executable statements, etc. at global scope will be executed. The only difference for the newly created process is that whereas in the main process the internal variable __name__ has the value '__main__', it does not have this value in the subprocesses. This allows you to place any statements that you do not want executed during subprocess initialization within blocks that test the value of __name__. Such statements are precisely the executable statements that create the subprocesses. If you do not place these statements within an if __name__ == '__main__': block, then you would get into a recursive loop creating new processes ad infinitum (actually, Python recognizes this condition and raises an exception instead).
So typically you place the code that creates new processes within a function such as main (choose any name you want) and make sure that main is only called conditionally, based on the value of __name__:
if __name__ == '__main__':
    main()
Or you can keep the process-creation code at global scope but within an if __name__ == '__main__': block:
import multiprocessing as mp

def my_func(session, index):
    result = {"server": session['server'], "exit_code": session['exit_code'], "index": index}
    return result

def my_callback(result):
    print(result)

if __name__ == '__main__':
    pool = mp.Pool(5)
    sessions = []
    sessions.append({"server": "foo.tld", "exit_code": 1})
    sessions.append({"server": "bar.tld", "exit_code": 0})
    for i, session in enumerate(sessions):
        # Below, "session" argument is a dict
        pool.apply_async(my_func, kwds={"session": session, "index": i}, callback=my_callback)
    pool.close()
    pool.join()
Prints:
{'server': 'foo.tld', 'exit_code': 1, 'index': 0}
{'server': 'bar.tld', 'exit_code': 0, 'index': 1}
Note that, for efficiency, I also placed all executable statements that do not need to be executed by the subprocesses, such as the creation of the sessions list, within the if __name__ == '__main__': block.
It is just "neater", however, to code as follows:
import multiprocessing as mp

def my_func(session, index):
    result = {"server": session['server'], "exit_code": session['exit_code'], "index": index}
    return result

def my_callback(result):
    print(result)

def main():
    pool = mp.Pool(5)
    sessions = []
    sessions.append({"server": "foo.tld", "exit_code": 1})
    sessions.append({"server": "bar.tld", "exit_code": 0})
    for i, session in enumerate(sessions):
        # Below, "session" argument is a dict
        pool.apply_async(my_func, kwds={"session": session, "index": i}, callback=my_callback)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
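A side note: if my_func raises an exception, apply_async swallows it silently unless you either call get() on the returned AsyncResult or supply an error_callback (a standard apply_async parameter in Python 3). A minimal sketch of the latter, meant to drop into the loop inside main() above; the my_errback name is just for illustration:

def my_errback(exc):
    # Runs in the main process whenever a worker task raised an exception
    print(f"worker failed: {exc!r}")

pool.apply_async(my_func,
                 kwds={"session": session, "index": i},
                 callback=my_callback,
                 error_callback=my_errback)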
Upvotes: 1
Reputation: 168903
This simplified version (using the pool as a context manager and imap_unordered) works just fine for me.
Always remember to wrap the main multiprocessing entry point in a function.
import multiprocessing as mp

def my_func(arg):
    index, session = arg
    result = {
        "server": session["server"],
        "exit_code": session["exit_code"],
        "index": index,
    }
    return result

def main():
    sessions = [
        {"server": "foo.tld", "exit_code": 1},
        {"server": "bar.tld", "exit_code": 0},
    ]
    with mp.Pool(5) as pool:
        for res in pool.imap_unordered(my_func, enumerate(sessions)):
            print(res)

if __name__ == "__main__":
    main()
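Keep in mind that imap_unordered yields results as each worker finishes, so the order may not match the input order; use pool.imap if ordering matters. If you would rather keep the two-parameter my_func(session, index) signature from the question instead of unpacking a tuple inside the worker, starmap is one option; a minimal sketch under that assumption:

import multiprocessing as mp

def my_func(session, index):
    return {"server": session["server"],
            "exit_code": session["exit_code"],
            "index": index}

def main():
    sessions = [
        {"server": "foo.tld", "exit_code": 1},
        {"server": "bar.tld", "exit_code": 0},
    ]
    with mp.Pool(5) as pool:
        # starmap unpacks each (session, index) tuple into positional arguments
        for res in pool.starmap(my_func, [(s, i) for i, s in enumerate(sessions)]):
            print(res)

if __name__ == "__main__":
    main()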
Upvotes: 0
Reputation: 13393
You need to keep the AsyncResult object returned from apply_async and call its get method, like this:
for i, session in enumerate(sessions):
    res = pool.apply_async(my_func, kwds={"session": session, "index": i}, callback=my_callback)
    print(res.get())
Output should be something like:
{'server': 'foo.tld', 'exit_code': 1, 'index': 0}
{'server': 'foo.tld', 'exit_code': 1, 'index': 0}
{'server': 'bar.tld', 'exit_code': 0, 'index': 1}
{'server': 'bar.tld', 'exit_code': 0, 'index': 1}
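(Each result appears twice because both my_callback and the explicit print(res.get()) print it.) Be aware that calling get() inside the submission loop waits for each task to finish before the next one is submitted, which serializes the work. To keep the tasks running in parallel, one approach is to collect the AsyncResult objects first and call get() afterwards; a rough sketch reusing pool and sessions from above:

results = [
    pool.apply_async(my_func, kwds={"session": session, "index": i})
    for i, session in enumerate(sessions)
]
pool.close()
pool.join()
for res in results:
    # get() returns the worker's result, re-raising any exception it raised
    print(res.get())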
Check out the example in the docs.
Upvotes: 1