Reputation: 437
I come here because I have an issue with my Jupyter Python 3 notebook. I need to create a function that uses the multiprocessing library. Before implementing it, I ran some tests. I found a lot of different examples, but the issue is always the same: my code is executed, but nothing appears in the notebook's interface.
The code I try to run in Jupyter is this one:
import os
from multiprocessing import Process, current_process

def doubler(number):
    """
    A doubling function that can be used by a process
    """
    result = number * 2
    proc_name = current_process().name
    print('{0} doubled to {1} by: {2}'.format(
        number, result, proc_name))
    return result

if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []
    proc = Process(target=doubler, args=(5,))

    for index, number in enumerate(numbers):
        proc = Process(target=doubler, args=(number,))
        proc2 = Process(target=doubler, args=(number,))
        procs.append(proc)
        procs.append(proc2)
        proc.start()
        proc2.start()

    proc = Process(target=doubler, name='Test', args=(2,))
    proc.start()
    procs.append(proc)

    for proc in procs:
        proc.join()
It's OK when I run my code outside Jupyter, with the command "python my_program.py", and I can see the logs:
Is there, for my example, and in Jupyter, a way to catch the results of my two tasks (proc and proc2, which both call the function "doubler") in a variable/object that I could use afterwards? If yes, how can I do it?
Upvotes: 34
Views: 85574
Reputation: 301
It would be good to clarify some things before giving the answer:

- multiprocessing.Pool does not work in interactive interpreters (such as Jupyter notebooks). See also this answer.
- Unlike multiprocessing.Pool, multiprocessing.pool.ThreadPool does work in Jupyter notebooks.

To make a generic Pool class that works on both classic and interactive Python interpreters, I have made this:
def is_notebook() -> bool:
    try:
        if "get_ipython" in globals().keys():
            get_ipython = globals()["get_ipython"]
            shell = get_ipython().__class__.__name__
            if shell == "ZMQInteractiveShell":
                return True  # Jupyter notebook or qtconsole
            # elif shell == "TerminalInteractiveShell":
            #     return False  # Terminal running IPython
            # else:
            return False  # Other type (?)
        return False  # get_ipython not defined: standard interpreter
    except NameError:
        return False  # Probably standard Python interpreter

if is_notebook():
    from multiprocessing.pool import ThreadPool as Pool
    from threading import Lock
else:
    from multiprocessing.pool import Pool
    from multiprocessing import Lock
The following example works in both standard .py and Jupyter .ipynb files.
#########################################
# Diversified import based on execution environment (notebook/standard interpreter)
#########################################
def is_notebook() -> bool:
    try:
        if "get_ipython" in globals().keys():
            get_ipython = globals()["get_ipython"]
            shell = get_ipython().__class__.__name__
            if shell == "ZMQInteractiveShell":
                return True  # Jupyter notebook or qtconsole
            # elif shell == "TerminalInteractiveShell":
            #     return False  # Terminal running IPython
            # else:
            return False  # Other type (?)
        return False  # get_ipython not defined: standard interpreter
    except NameError:
        return False  # Probably standard Python interpreter

if is_notebook():
    from multiprocessing.pool import ThreadPool as Pool
    from threading import Lock
else:
    from multiprocessing.pool import Pool
    from multiprocessing import Lock

#########################################
# Minimal program example
#########################################
import os
import random
from typing import Any, Iterator

def generate_values_for_parallel(max: int) -> Iterator[float]:
    for _ in range(0, max):
        yield random.random()

def parallel_unit(arg: Any) -> str:
    return "Received --> " + str(arg)

if __name__ == '__main__':
    result = []
    pool = Pool(processes=4)
    for loop_result in pool.imap_unordered(parallel_unit, generate_values_for_parallel(10), 2 * os.cpu_count()):
        result.append(loop_result)
    pool.close()
    pool.join()
    print("\n".join(result))
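A related workaround, not part of the answer above: the usual reason multiprocessing.Pool fails in notebooks is that worker processes cannot import functions defined in the notebook's __main__. Moving the worker into a real module sidesteps this. A minimal sketch, where worker_module.py is a hypothetical file created next to the notebook (written programmatically here; in a notebook cell you could use the %%writefile magic instead):

```python
import pathlib
import sys
from multiprocessing import Pool

# Write the worker into a real module so child processes can import it by
# name (functions defined directly in a notebook cell live in __main__ and
# cannot be located by workers started with the 'spawn' start method).
pathlib.Path("worker_module.py").write_text(
    "def double(x):\n"
    "    return x * 2\n"
)
sys.path.insert(0, ".")  # make sure the current directory is importable

from worker_module import double  # now picklable by reference

if __name__ == "__main__":
    with Pool(2) as pool:
        print(pool.map(double, [5, 10, 15, 20, 25]))  # [10, 20, 30, 40, 50]
```
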
Upvotes: 6
Reputation: 579
@Konate's answer really helped me. Here is a simplified version using multiprocessing.pool:
import multiprocessing

def double(a):
    return a * 2

def driver_func():
    PROCESSES = 4
    with multiprocessing.Pool(PROCESSES) as pool:
        params = [(1, ), (2, ), (3, ), (4, )]
        results = [pool.apply_async(double, p) for p in params]
        for r in results:
            print('\t', r.get())

driver_func()
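As a small variant (my addition, not part of the answer above): when you only need the results back in input order, pool.map does the collection for you without explicit AsyncResult handling:

```python
import multiprocessing

def double(a):
    return a * 2

def driver_func():
    with multiprocessing.Pool(4) as pool:
        # map blocks until every task is done and returns results in input order
        return pool.map(double, [1, 2, 3, 4])

print(driver_func())  # [2, 4, 6, 8]
```
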
Upvotes: 24
Reputation: 578
This works for me on macOS (I cannot make it work on Windows):
import multiprocessing as mp

mp_start_count = 0

if __name__ == '__main__':
    if mp_start_count == 0:
        mp.set_start_method('fork')
        mp_start_count += 1
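A note on that counter: it resets whenever the cell is re-run, and calling set_start_method a second time raises RuntimeError. An alternative (my suggestion, not the answerer's code) is the force=True flag; also note that the 'fork' start method is only available on Unix, which matches the "cannot make it work on Windows" observation:

```python
import multiprocessing as mp

def square(x):
    return x * x

if __name__ == '__main__':
    # force=True allows re-running this cell without
    # "RuntimeError: context has already been set"
    mp.set_start_method('fork', force=True)
    with mp.Pool(2) as pool:
        print(pool.map(square, [1, 2, 3]))  # [1, 4, 9]
```
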
Upvotes: 0
Reputation: 437
I succeeded by using multiprocessing.Pool. I was inspired by this approach:
def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
...etc. For the full example, see: https://docs.python.org/3.4/library/multiprocessing.html
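The helpers referenced above (mul, plus, calculate, calculatestar) are not shown in the snippet; they come from the linked documentation example and look roughly like this (a sketch, with the sleeps from the original omitted):

```python
from multiprocessing import current_process

def mul(a, b):
    return a * b

def plus(a, b):
    return a + b

def calculate(func, args):
    # report which worker ran the task and what it computed
    return '%s says that %s%s = %s' % (
        current_process().name, func.__name__, args, func(*args))

def calculatestar(args):
    # imap passes a single (func, args) tuple; unpack it for calculate
    return calculate(*args)
```
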
Upvotes: 8
Reputation: 5940
Another way of running multiprocessing jobs in a Jupyter notebook is to use one of the approaches supported by the nbmultitask package.
Upvotes: 9