Reputation: 4625
This document shows an example of sharing state between processes using Value and Array from the multiprocessing library:
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])
It will print
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
My questions are:
How do you continue to pass information to the other process after the worker process has been created, rather than only at creation time?
How can you make the worker process block (or suspend) while it waits for an event from the parent process via this mechanism?
My platform is Windows 10. Shared memory can be shared among processes, but fork()ed or spawn()ed processes cannot inherit semaphores, locks, queues, etc.
Thanks.
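To make the second question concrete: what I would like is blocking behaviour of the kind sketched below, but driven through the shared Value/Array mechanism. The sketch simply uses a multiprocessing.Event that is passed at creation time and signalled later; the names are only illustrative.

from multiprocessing import Process, Event, Value
import time

def worker(wake_up, result):
    wake_up.wait()            # Block until the parent signals the event
    result.value = 42         # Do some work only after being woken up

if __name__ == '__main__':
    wake_up = Event()
    result = Value('i', 0)
    p = Process(target=worker, args=(wake_up, result))
    p.start()
    time.sleep(1)             # Parent does other work here
    wake_up.set()             # Signal the worker later, not at creation time
    p.join()
    print(result.value)       # 42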
[Update 1]
The demo given by @Manu-Valdés works, but the example I wrote below does not; perhaps you can help me spot the problem.
%%file ./examples/multiprocessing_pool5.py
# This code definitely will not work on Windows, as the queue object is not
# inherited across fork()/spawn() when passed this way.
import multiprocessing
import os

def f1(q):
    x = q.get(True)  # Block until something is in the queue
    if x == 55:
        raise Exception('I do not like 55!')
    elif x == 100:
        return
    else:
        print(f'f1({x}) -> {x*x}')

def f2(q):
    x = q.get(True)  # Block until something is in the queue
    if x == 55:
        raise Exception('I do not like 55!')
    elif x == 100:
        return
    else:
        print(f'f2({x}) -> {x*x}')

def wp_init(q):
    #global queue
    #queue = q  # Point to the global queue in each process
    print(f'I am initialized')

def success_cb(result):
    print(f'Success returns = {result}')

def failure_cb(result):
    print(f'Failure returns = {result}')

if __name__ == '__main__':
    np = os.cpu_count()  # Number of cores per CPU
    queue = multiprocessing.Queue()
    pool = multiprocessing.Pool(np, initializer=wp_init, initargs=(queue,))
    for x in range(100):
        if x % 2 == 0:
            f = f1
        else:
            f = f2
        pool.apply_async(f, args=(queue,), callback=success_cb, error_callback=failure_cb)
    for x in range(100):
        queue.put(x)
    # Terminate them but I do not know how to loop through the processes
    for _ in range(100):
        queue.put(100)  # Terminate it
    pool.close()
    pool.join()
The error is
I am initialized
I am initialized
I am initialized
I am initialized
Failure returns = Queue objects should only be shared between processes through inheritance
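For reference, the direction hinted at by the commented-out lines in wp_init would look roughly like the sketch below (my own naming): the queue is passed only through initializer/initargs, stored in a module-level global, and never sent through apply_async. As far as I understand, this avoids the inheritance error.

import multiprocessing
import os

def wp_init(q):
    global queue
    queue = q                          # Each worker keeps its own reference

def f1(x):
    y = queue.get(True)                # Use the global queue, not an apply_async argument
    print(f'f1({x}) got {y}')
    return y

if __name__ == '__main__':
    q = multiprocessing.Queue()
    pool = multiprocessing.Pool(os.cpu_count(), initializer=wp_init, initargs=(q,))
    r = pool.apply_async(f1, args=(0,))    # Only a plain int is passed here
    q.put(7)
    print(r.get())                         # 7
    pool.close()
    pool.join()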
Upvotes: 0
Views: 682
Reputation: 4625
Let me answer my own question. Below is my current understanding:
a) apply_async() returns immediately. I use multiprocessing.Manager() when creating the Queue, Value and Array to avoid the errors "Synchronized objects should only be shared between processes through inheritance" and "xxx objects should only be shared between processes through inheritance".
b) Use a multiprocessing.Queue to signal, stop, or terminate worker processes from the parent process.
c) It is not possible to pass different messages to different worker processes waiting on the same queue. Use different queues instead.
d) Pool.apply_async() only allows the main entry function of the worker process to accept one argument. In that case, put the arguments in a list ([]).
e) We can use multiprocessing.sharedctypes.RawValue(), multiprocessing.sharedctypes.RawArray(), multiprocessing.sharedctypes.Value() and multiprocessing.sharedctypes.Array() to create a ctypes value, a ctypes array, a ctypes value with an optional lock, and a ctypes array with an optional lock in shared memory. These shareable objects can be passed to worker processes via the initializer and initargs keyword arguments when creating the Pool object with multiprocessing.Pool(). They cannot be passed via the Pool.apply_async() or Pool.map() methods (see the short sketch after this list).
f) The standard Python documentation on multiprocessing needs to be updated. For example, class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]) should be written as class multiprocessing.pool.Pool([processes[, initializer=None[, initargs=None[, maxtasksperchild=None[, context=None]]]]]).
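As a small illustration of point e), here is a minimal sketch (my own naming) that passes a RawValue and a RawArray to the pool through initializer and initargs and then only sends plain integers through Pool.map. The complete example using Manager-based objects and queues follows after it.

import multiprocessing as mp
from multiprocessing.sharedctypes import RawValue, RawArray

def pool_init(v, a):
    # Keep references to the shared ctypes objects in each worker process
    global shared_v, shared_a
    shared_v, shared_a = v, a

def work(i):
    # Only the index travels through the pool; the data is read from shared memory
    return shared_v.value + shared_a[i]

if __name__ == '__main__':
    v = RawValue('i', 100)
    a = RawArray('i', range(10))
    with mp.Pool(2, initializer=pool_init, initargs=(v, a)) as pool:
        print(pool.map(work, range(10)))   # [100, 101, ..., 109]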
import multiprocessing as mp
import time

# Worker process 1
def f1(q):
    while True:
        x = queue.get(True)  # Block until there is a message
        if x >= 20:
            raise Exception(f'f1: I do not like {x}!')
        elif x == -1:
            print(f'f1: Quit')
            return "f1"
        else:
            time.sleep(0.5)
            v = q[0]
            a = q[1]
            print(f'f1({x}, {v}, {a})')

# Worker process 2
def f2(q):
    while True:
        x = queue.get(True)  # Block until there is a message
        if x >= 20:
            raise Exception(f'f2: I do not like {x}!')
        elif x == -1:
            print(f'f2: Quit')
            return "f2"
        else:
            time.sleep(0.5)
            v = q[0]
            a = q[1]
            print(f'f1({x}, {v}, {a})')

def pInit(q, poolstr):
    '''
    Initialize global shared variables among processes.
    Could possibly share queue and lock here.
    '''
    global queue
    queue = q  # Point to the global queue in each process
    print(f'{poolstr} is initialized')

def succCB(result):
    print(f'Success returns = {result}')

def failCB(result):
    print(f'Failure returns = {result}')

if __name__ == '__main__':
    # Create shared memory to pass data to worker processes
    # lock=True for multiple worker processes on the same queue
    v1 = mp.Manager().Value('i', 0, lock=True)
    a1 = mp.Manager().Array('i', range(20), lock=True)
    # lock=False for 1 worker process on the queue
    v2 = mp.Manager().Value('i', 0, lock=False)
    a2 = mp.Manager().Array('i', range(20), lock=False)

    # Create queues for signaling worker processes
    queue1 = mp.Manager().Queue()
    queue2 = mp.Manager().Queue()

    # Creating pool of processes now - fork/spawn happens here
    pool1 = mp.Pool(2, initializer=pInit, initargs=(queue1, "pool1"))
    pool2 = mp.Pool(1, initializer=pInit, initargs=(queue2, "pool2"))

    # Assign entry function for each pool
    pool1.apply_async(f1, args=[(v1, a1)], callback=succCB, error_callback=failCB)
    pool1.apply_async(f1, args=[(v1, a1)], callback=succCB, error_callback=failCB)
    pool2.apply_async(f2, args=[(v2, a2)], callback=succCB, error_callback=failCB)

    # Parent process only from here; worker processes do not see this anymore
    # Parent process notifies the worker processes
    for x in range(20):
        a1[x] = x
        a2[x] = x + 10
    v1.value = 2
    v2.value = 18
    queue1.put(1)
    queue1.put(1)
    queue2.put(18)

    # Parent process terminates or quits the worker processes
    queue1.put(-1)  # Quit properly
    queue1.put(20)  # Raise exception
    queue2.put(-1)  # Quit properly
    pool1.close()
    pool2.close()
    pool1.join()
    pool2.join()
The output is
pool1 is initialized
f1(1, Value('i', 2), array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]))
pool2 is initialized
f1(18, Value('i', 18), array('i', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]))
f2: Quit
pool1 is initialized
f1(1, Value('i', 2), array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]))
f1: Quit
Success returns = f1
Success returns = f2
Failure returns = f1: I do not like 20!
Upvotes: 0
Reputation: 2372
To communicate in a thread-safe manner you can use a Queue. The get() method blocks if the queue is empty, and waits until a new element is put():
from multiprocessing import Process, Queue

def f(q):
    while True:
        element = q.get()
        print(element)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    q.put([42, None, 'hello'])
    p.join()
Upvotes: 1