gies0r

python multiprocessing process pool fails to find asynced function

Pretty simple multiprocessing example. Goals:

  1. Create a pool of process workers using mp.Pool
  2. Do some sort of transformation (here, a simple string operation on each line)
  3. Push the transformed line to mp.Queue
  4. Further process data from that mp.Queue in the main program afterwards

So let's do this:

import multiprocessing as mp

Init every worker process with an mp.Queue (stored in the worker-global mp_queue):

def process_pool_init_per_process(q):
    global mp_queue
    mp_queue = q

Then actually create the mp_pool:

no_of_processes = 4
q = mp.Queue()
mp_pool = mp.Pool(no_of_processes, process_pool_init_per_process, (q,))

This gets called for every line to be processed asynchronously:

def process_async_main(line):
    print(line)
    mp_queue.put(line + '_asynced')

And now let's start it using apply_async:

line = "Hi, this is a test to test mp_queues with mp process pools"
handler = mp_pool.apply_async(process_async_main, (line,))
mp_resp = handler.get()
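A side note on the args argument: apply_async runs the function as func(*args) in the worker, so args must be a tuple. Parentheses alone do not make a tuple; the trailing comma does. The sketch below (show_args is a hypothetical helper, not part of multiprocessing) shows what (line) and (line,) would each unpack to:

```python
line = "Hi, this is a test"

not_a_tuple = (line)   # just a parenthesized str
one_tuple = (line,)    # the trailing comma makes a 1-tuple

print(type(not_a_tuple).__name__)  # str
print(type(one_tuple).__name__)    # tuple


def show_args(*args):
    # Hypothetical helper: reports how many positional arguments it received
    return len(args)


# apply_async(func, args) ends up calling func(*args) in the worker:
print(show_args(*not_a_tuple))  # one argument per character, i.e. len(line)
print(show_args(*one_tuple))    # 1
```

With (line), a one-parameter function like process_async_main would receive one argument per character and fail with a TypeError.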

And read from the queue

while not q.empty():
    print(q.get()) # This should be the initial line + '_asynced'
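Reading with while not q.empty() is fragile: the multiprocessing documentation states that Queue.empty() is not reliable, and a worker's put() hands the item to a background feeder thread, so it can still be in flight when the parent checks. A minimal sketch of a sturdier read, assuming the caller knows how many items to expect (drain is a hypothetical helper name), uses get() with a timeout and stops on queue.Empty:

```python
import multiprocessing as mp
import queue  # multiprocessing.Queue.get(timeout=...) raises queue.Empty


def drain(q, expected, timeout=1.0):
    """Hypothetical helper: collect up to `expected` items from q,
    waiting at most `timeout` seconds for each one."""
    items = []
    for _ in range(expected):
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            break  # nothing arrived in time; stop instead of blocking forever
    return items


if __name__ == '__main__':
    q = mp.Queue()
    q.put("Hi_asynced")  # stands in for what a pool worker would put
    print(drain(q, expected=1))  # ['Hi_asynced']
```

The timeout keeps the main program from hanging if a worker died before putting its result.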

It fails with:

python3 mp_process_example.py
Process ForkPoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 337, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'process_async_main' on <module '__main__' from 'mp_process_example.py'>

The question is: why can't the pool workers find process_async_main in the __main__ module?

Complete code to reproduce:

import multiprocessing as mp

##### Init async processes
def process_pool_init_per_process(q):
    global mp_queue
    mp_queue = q

# Really init the mp_pool
no_of_processes = 4
q = mp.Queue()
mp_pool = mp.Pool(no_of_processes, process_pool_init_per_process, (q,))

# This gets called for every line to be processed asynchronously
def process_async_main(line):
    print(line)
    mp_queue.put(line + '_asynced')

line = "Hi, this is a test to test mp_queues with mp process pools"
handler = mp_pool.apply_async(process_async_main, (line,))
mp_resp = handler.get()

while not q.empty():
    print(q.get()) # This should be the initial line + '_asynced'

Answers (1)

gies0r

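Ok... I've got it. The problem is not that the function to be asynced lives in the same file as the synchronous code; it is the order of definition. With the fork start method (used here, as the ForkPoolWorker name in the traceback shows), the worker processes are forked as soon as mp.Pool(...) is called, so their copy of __main__ only contains what had been defined up to that point. process_async_main is defined after the pool is created, so the workers cannot find it when they unpickle the task. Moving the function into its own module and importing it at the top of the script guarantees it exists before the pool is created.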

Writing the code like this:

asynced.py

##### Init async processes
def process_pool_init_per_process(q):
    global mp_queue
    mp_queue = q

##### Function to be asynced
def process_async_main(line):
    print(line)
    mp_queue.put(line + '_asynced')

And then mp_process_example.py:

import multiprocessing as mp
from asynced import process_async_main, process_pool_init_per_process


if __name__ == '__main__':
    # Really init the mp_pool (process_async_main is already imported here)
    no_of_processes = 4
    q = mp.Queue()
    mp_pool = mp.Pool(no_of_processes, process_pool_init_per_process, (q,))

    line = "Hi, this is a test to test mp_queues with mp process pools"
    handler = mp_pool.apply_async(process_async_main, (line,))
    mp_resp = handler.get()  # returns None, but re-raises any worker exception

    while not q.empty():
        print(q.get()) # This should be the initial line + "_asynced"

    mp_pool.close()
    mp_pool.join()

Works as expected:

$ python3 mp_process_example.py
Hi, this is a test to test mp_queues with mp process pools
Hi, this is a test to test mp_queues with mp process pools_asynced
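The separate module is one way to guarantee the ordering, but with the fork start method what matters is that the function already exists in __main__ when the pool's workers are started. A minimal single-file sketch, assuming the functions are defined at module level above the code that creates the pool; run is a hypothetical helper name, and the with block and q.get(timeout=5) are choices made for this sketch rather than part of the answer above:

```python
import multiprocessing as mp


def process_pool_init_per_process(q):
    # Runs once in every worker: store the shared queue in a worker global
    global mp_queue
    mp_queue = q


def process_async_main(line):
    # Defined at module level, before any pool exists, so every worker
    # can look it up by name when it unpickles the task
    print(line)
    mp_queue.put(line + '_asynced')


def run(line, no_of_processes=4):
    q = mp.Queue()
    with mp.Pool(no_of_processes, process_pool_init_per_process, (q,)) as mp_pool:
        handler = mp_pool.apply_async(process_async_main, (line,))
        handler.get()              # re-raises any exception from the worker
        result = q.get(timeout=5)  # read before the with block terminates the pool
    return result


if __name__ == '__main__':
    print(run("Hi, this is a test to test mp_queues with mp process pools"))
```

The __main__ guard is not needed for fork, but it keeps the script working under the spawn start method, where each worker re-imports the main module.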
