Reputation: 5239
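Pretty simple multiprocessing example. Goals:
- Create a pool of worker processes with mp.Pool
- Process each line asynchronously, putting the result into an mp.Queue
- Read the mp.Queue in the main program afterwards
So let's do this: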
import multiprocessing as mp
Init async processes with a mp.queue
def process_pool_init_per_process(q):
    global mp_queue
    mp_queue = q
Really init the mp_pool
no_of_processes = 4
q = mp.Queue()
mp_pool = mp.Pool(no_of_processes, process_pool_init_per_process, (q,))
This gets called for every line to be processed async
def process_async_main(line):
    print(line)
    q.put(line + '_asynced')
And now let's start it using apply_async
line = "Hi, this is a test to test mp_queues with mp process pools"
handler = mp_pool.apply_async(process_async_main, (line))
mp_resp = handler.get()
And read from the queue
while not q.empty():
    print(q.get())  # This should be the initial line
Fails with:
python3 mp_process_example.py
Process ForkPoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 337, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'process_async_main' on <module '__main__' from 'mp_process_example.py'>
The question is: Why can't multiprocessing find the process_async_main function in the main module?
Complete code to reproduce:
import multiprocessing as mp
##### Init async processes
def process_pool_init_per_process(q):
    global mp_queue
    mp_queue = q
# Really init the mp_pool
no_of_processes = 4
q = mp.Queue()
mp_pool = mp.Pool(no_of_processes, process_pool_init_per_process, (q,))
# This gets called for every line to be processed async
def process_async_main(line):
    print(line)
    q.put(line + '_asynced')
line = "Hi, this is a test to test mp_queues with mp process pools"
handler = mp_pool.apply_async(process_async_main, (line))
mp_resp = handler.get()
while not q.empty():
    print(q.get())  # This should be the initial line
Upvotes: 0
Views: 1016
Reputation: 5239
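Ok, I've got it. The problem is the order of the definitions rather than multiprocessing itself: the worker processes are forked when mp.Pool(...) is created, and at that point process_async_main has not been defined yet. Functions are pickled by name, so the workers cannot look up process_async_main in their copy of __main__. Moving the functions into a separate module that is imported before the pool is created fixes this. The args also have to be passed as a one-element tuple, (line,), because (line) is just the string itself.
Writing the code like this: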
asynced.py
##### Init async processes
def process_pool_init_per_process(q):
    global mp_queue
    mp_queue = q
##### Function to be asynced
def process_async_main(line):
    print(line)
    mp_queue.put(line + '_asynced')
And then mp_process_example.py:
import multiprocessing as mp
from asynced import process_async_main, process_pool_init_per_process
# Really init the mp_pool
no_of_processes = 4
q = mp.Queue()
mp_pool = mp.Pool(no_of_processes, process_pool_init_per_process, (q,))
line = "Hi, this is a test to test mp_queues with mp process pools"
handler = mp_pool.apply_async(process_async_main, (line,))
mp_resp = handler.get()
while not q.empty():
    print(q.get())  # This should be the initial line + "_asynced"
Works as expected:
$ python3 mp_process_example.py
Hi, this is a test to test mp_queues with mp process pools
Hi, this is a test to test mp_queues with mp process pools_asynced
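As background on the AttributeError from the question: pickle serializes a function as a reference (module name plus function name) rather than as code, so the process that unpickles it must be able to look that name up. The following is only a minimal sketch of that behaviour and starts no worker processes. The demo_main module is a hypothetical stand-in for __main__, and deleting the attribute merely imitates a worker that was forked before the def statement ran.

```python
import pickle
import sys
import types

# Hypothetical stand-in for the script's __main__ module (an assumption
# made for this sketch, not part of the original code).
demo_main = types.ModuleType("demo_main")
sys.modules["demo_main"] = demo_main

# Define the function inside demo_main, as a script would at top level.
exec(
    "def process_async_main(line):\n"
    "    return line + '_asynced'\n",
    demo_main.__dict__,
)

payload = pickle.dumps(demo_main.process_async_main)

# The payload carries only the module and function names, not the code.
reference_only = b"demo_main" in payload and b"process_async_main" in payload

# Where the name exists (the parent process), unpickling returns the function.
restored_in_parent = pickle.loads(payload) is demo_main.process_async_main

# Imitate a worker forked before the def ran: the name is missing there.
del demo_main.process_async_main
try:
    pickle.loads(payload)
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

print(reference_only, restored_in_parent)
print(error_message)
```

The last line printed is the message of the AttributeError raised by the lookup, which names process_async_main just like the traceback in the question. This is why defining (or importing) the function before creating the pool matters when workers are forked.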
Upvotes: 1