yoonghm

Reputation: 4625

How do you make a worker process block while waiting for Value or Array from multiprocessing?

The Python documentation shows an example of sharing state between processes using Value and Array from the multiprocessing library:

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

It will print

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

My questions are

  1. How do you continue to pass information to the worker process after it has been created, rather than only at its creation?

  2. How could you make the worker process block (or suspend) while waiting for an event from the parent process via this mechanism?

My platform is Windows 10. Shared memory can be shared among processes, but processes created via fork() or spawn() cannot inherit semaphores, locks, queues, etc.

Thanks.

[Update 1]

The demo given by @Manu-Valdés works, but I wrote an example that does not work; perhaps you could help to spot the problem.

%%file ./examples/multiprocessing_pool5.py
# This code definitely will not work on Windows, as the Queue object is not inherited by the spawned worker processes.
import multiprocessing
import os

def f1(q):
  x = q.get(True) # Block until something is in the queue
  if x == 55:
    raise Exception('I do not like 55!')
  elif x == 100:
    return
  else:
    print(f'f1({x}) -> {x*x}')


def f2(q):
  x = q.get(True) # Block until something is in the queue
  if x == 55:
    raise Exception('I do not like 55!')
  elif x == 100:
    return
  else:
    print(f'f2({x}) -> {x*x}')


def wp_init(q):
  #global queue
  #queue = q  # Point to the global queue in each process
  print(f'I am initialized')


def success_cb(result):
  print(f'Success returns = {result}')


def failure_cb(result):
  print(f'Failure returns = {result}')


if __name__ == '__main__':
  np = os.cpu_count()  # Number of logical CPUs
  queue = multiprocessing.Queue()
  pool = multiprocessing.Pool(np, initializer=wp_init, initargs=(queue,))

  for x in range(100):
    if x % 2 == 0:
      f = f1
    else:
      f = f2
    pool.apply_async(f, args=(queue,), callback=success_cb, error_callback=failure_cb)

  for x in range(100):
    queue.put(x)

  # Terminate them but I do not know how to loop through the processes
  for _ in range(100):
    queue.put(100)  # Terminate it

  pool.close()
  pool.join()

The error is

I am initialized
I am initialized
I am initialized
I am initialized
Failure returns = Queue objects should only be shared between processes through inheritance

Upvotes: 0

Views: 682

Answers (2)

yoonghm

Reputation: 4625

Let me answer my own question. Below is my understanding:

a) apply_async() returns immediately. I use multiprocessing.Manager() when creating the Queue, Value and Array to avoid the errors Synchronized objects should only be shared between processes through inheritance and xxx objects should only be shared between processes through inheritance.

b) Use a multiprocessing.Queue to signal, stop, or terminate worker processes from the parent process.

c) It is not possible to send different messages to different worker processes waiting on the same queue; use separate queues instead.

d) Pool.apply_async() only allows the entry function of the worker process to accept one argument. In that case, pack the arguments into a single list ([]) or tuple.

e) We can use multiprocessing.sharedctypes.RawValue(), multiprocessing.sharedctypes.RawArray(), multiprocessing.sharedctypes.Value() and multiprocessing.sharedctypes.Array() to create a ctypes value, a ctypes array, a ctypes value with an optional lock, and a ctypes array with an optional lock in shared memory. These shareable objects can be passed to the worker processes via the initializer and initargs keyword arguments when creating the Pool object with multiprocessing.Pool(); they cannot be passed through the Pool.apply_async() or Pool.map() methods (see the sketch after this list).

f) The standard Python documentation on multiprocessing needs to be updated. For example,

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]) should be written as class multiprocessing.pool.Pool([processes[, initializer=None[, initargs=None[, maxtasksperchild=None[, context=None]]]]])
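
Here is a minimal sketch of the pattern in e), passing shared ctypes objects through initializer/initargs; the helper names init_worker and task are illustrative, not from the library:

import multiprocessing as mp
from multiprocessing.sharedctypes import Value, Array

def init_worker(v, a):
  # Stash the shared ctypes objects in globals; they are inherited
  # at pool-creation time, not passed along with each task
  global shared_v, shared_a
  shared_v = v
  shared_a = a

def task(i):
  # Read the shared value and one array slot
  return shared_v.value + shared_a[i]

if __name__ == '__main__':
  v = Value('i', 100, lock=True)        # ctypes int with a lock
  a = Array('i', range(10), lock=True)  # ctypes int array with a lock
  with mp.Pool(2, initializer=init_worker, initargs=(v, a)) as pool:
    print(pool.map(task, range(10)))    # [100, 101, ..., 109]

Below is my complete working example: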

import multiprocessing as mp
import time

# Worker process 1
def f1(q):
  while True:
    x = queue.get(True) # Block until there is a message
    if x >= 20:
      raise Exception(f'f1: I do not like {x}!')
    elif x == -1:
      print(f'f1: Quit')
      return "f1"
    else:
      time.sleep(0.5)
      v = q[0]
      a = q[1]
      print(f'f1({x}, {v}, {a})')


# Worker process 2
def f2(q):
  while True:
    x = queue.get(True) # Block until there is a message
    if x >= 20:
      raise Exception(f'f2: I do not like {x}!')
    elif x == -1:
      print(f'f2: Quit')
      return "f2"
    else:
      time.sleep(0.5)
      v = q[0]
      a = q[1]
      print(f'f2({x}, {v}, {a})')


def pInit(q, poolstr):
  '''
  Initialize global shared variables among processes.
  Could possibly share queue and lock here
  '''
  global queue
  queue = q  # Point to the global queue in each process
  print(f'{poolstr} is initialized')


def succCB(result):
  print(f'Success returns = {result}')


def failCB(result):
  print(f'Failure returns = {result}')


if __name__ == '__main__':

  # Create shared memory to pass data to worker processes
  # lock=True: multiple worker processes share these objects
  v1 = mp.Manager().Value('i', 0, lock=True)
  a1 = mp.Manager().Array('i', range(20), lock=True)
  # lock=False: only one worker process uses these objects
  v2 = mp.Manager().Value('i', 0, lock=False)
  a2 = mp.Manager().Array('i', range(20), lock=False)

  # Create queues for signaling worker processes
  queue1 = mp.Manager().Queue()
  queue2 = mp.Manager().Queue()

  # Creating pool of processes now - fork here
  pool1 = mp.Pool(2, initializer=pInit, initargs=(queue1, "pool1"))
  pool2 = mp.Pool(1, initializer=pInit, initargs=(queue2, "pool2"))

  # Assign entry function for each pool
  pool1.apply_async(f1, args=[(v1, a1)], callback=succCB, error_callback=failCB)
  pool1.apply_async(f1, args=[(v1, a1)], callback=succCB, error_callback=failCB)
  pool2.apply_async(f2, args=[(v2, a2)], callback=succCB, error_callback=failCB)

  # Only the parent process executes the code below;
  # it notifies the worker processes
  for x in range(20):
    a1[x] = x
    a2[x] = x+10
  v1.value = 2
  v2.value = 18
  queue1.put(1) 
  queue1.put(1) 
  queue2.put(18)

  # Parent process terminates or quits the worker processes
  queue1.put(-1) # Quit properly
  queue1.put(20) # Raise exception
  queue2.put(-1) # Quit properly

  pool1.close()
  pool2.close()
  pool1.join()
  pool2.join()

The output is

pool1 is initialized
f1(1, Value('i', 2), array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]))
pool2 is initialized
f2(18, Value('i', 18), array('i', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]))
f2: Quit
pool1 is initialized
f1(1, Value('i', 2), array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]))
f1: Quit
Success returns = f1
Success returns = f2
Failure returns = f1: I do not like 20!

Upvotes: 0

Manu Valdés

Reputation: 2372

To communicate in a thread-safe manner you can use a Queue. Its get() method blocks while the queue is empty and waits until a new element is put():

from multiprocessing import Process, Queue

def f(q):
    while True:
        element = q.get()
        print(element)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    q.put([42, None, 'hello'])
    p.join()

Upvotes: 1
