Reputation: 43642
I have parameter sets in numpy arrays that I feed into a multiprocessing queue, but they are garbled when received in the worker. Here is my code to illustrate my issue and question.
import numpy as np
from multiprocessing import Process, Queue

NUMBER_OF_PROCESSES = 2

def worker(input, output):
    for args in iter(input.get, 'STOP'):
        print('Worker receives: ' + repr(args))
        id, par = args
        # simulate a complex task, and return result
        result = par['A'] * par['B']
        output.put((id, result))

# Define parameters to process
parameters = np.array([
    (1.0, 2.0),
    (3.0, 3.0)], dtype=[('A', 'd'), ('B', 'd')])

# Create queues
task_queue = Queue()
done_queue = Queue()

# Submit tasks
for id, par in enumerate(parameters):
    obj = ('id_' + str(id), par)
    print('Submitting task: ' + repr(obj))
    task_queue.put(obj)

# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
    Process(target=worker, args=(task_queue, done_queue)).start()

# Get unordered results
results = {}
for i in range(len(parameters)):
    id, result = done_queue.get()
    results[id] = result

# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
    task_queue.put('STOP')

print('results: ' + str(results))
With numpy 1.4.1 and Python 2.6.6 on a 64-bit CentOS computer, my output is:
Submitting task: ('id_0', (1.0, 2.0))
Submitting task: ('id_1', (3.0, 3.0))
Worker receives: ('id_0', (2.07827093387802e-316, 6.9204740511333381e-310))
Worker receives: ('id_1', (0.0, 1.8834810076011668e-316))
results: {'id_0': 0.0, 'id_1': 0.0}
As shown, the tuples containing the numpy record-array rows are intact when the tasks are submitted, but garbled when the worker receives them, and the results are incorrect. I read in the multiprocessing documentation that "arguments to the methods of proxies are picklable". From what I can tell, the numpy arrays are perfectly picklable:
>>> import pickle
>>> for par in parameters:
... print(pickle.loads(pickle.dumps(par)))
...
(1.0, 2.0)
(3.0, 3.0)
My question is: why are the parameters not received correctly in the worker? And how else can I pass a row of a numpy record array to a worker?
Upvotes: 3
Views: 2230
Reputation: 47
I ran into the same problem, though my situation is a little different.
Originally, I emitted one number per loop iteration in my subprocess and combined them into a numpy.ndarray. Finally, I passed the whole array to the Queue, but my main process would not continue after I called p.join().
My old code looked like this:
# subprocess
for i in range(n):
    array[i] = data[i]
queue.put(array)

# main process
queue.get()
However, I switched to another way of handling the problem, like this:
# subprocess
for i in range(n):
    queue.put((i, data[i]))

# main process
for _ in range(n):
    while queue.empty():
        pass  # wait until an item is available
    i, data = queue.get()
    array[i] = data
Briefly, I just split my data into smaller parts (position, value) and pass them to the queue one at a time, and the main process receives the data and reassembles it. I hope this helps.
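A minimal runnable sketch of this pattern (the array size n, the example data, and the doubling "work" inside the worker are illustrative assumptions, not my real code):

import numpy as np
from multiprocessing import Process, Queue

def worker(data, queue):
    # send small (position, value) pairs instead of one big array
    for i in range(len(data)):
        queue.put((i, data[i] * 2.0))  # placeholder for the real per-item work

if __name__ == '__main__':
    n = 4
    data = np.arange(n, dtype='d')
    result = np.empty(n, dtype='d')

    queue = Queue()
    p = Process(target=worker, args=(data, queue))
    p.start()

    # reassemble in the main process; get() blocks until an item arrives
    for _ in range(n):
        i, value = queue.get()
        result[i] = value

    p.join()
    print(result)  # roughly: [ 0.  2.  4.  6.]

Note that the main process drains the queue before calling p.join(); joining a child that still has unconsumed items buffered for the queue can otherwise block.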
Upvotes: 0
Reputation: 25823
numpy arrays should be pickle-able (I think), but here you're actually dealing with numpy.void instances, which, for reasons I'm not sure of, don't seem to pickle correctly.
If you do:
for par in parameters:
    print(type(par))
    print(pickle.loads(pickle.dumps(par)))
You get:
<type 'numpy.void'>
(-1.3918046672290164e-41, -1.3918046679677054e-41)
<type 'numpy.void'>
(-1.3918046672290164e-41, -1.3918046679677054e-41)
One way to work around this is to apply

parameters = parameters.reshape([-1, 1])

to make your (N,) array into an (N, 1) array. That way, when you loop over parameters, you'll get arrays of size 1, which will hopefully pickle just fine. Hope that helps.
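A quick check of that workaround against the parameters array from the question (a sketch; the exact printed type and float formatting depend on your Python and numpy versions):

import numpy as np
import pickle

parameters = np.array([
    (1.0, 2.0),
    (3.0, 3.0)], dtype=[('A', 'd'), ('B', 'd')])

# (N,) -> (N, 1): iterating now yields length-1 ndarrays, not numpy.void scalars
parameters = parameters.reshape([-1, 1])

for par in parameters:
    print(type(par))                      # numpy.ndarray
    restored = pickle.loads(pickle.dumps(par))
    print(restored['A'] * restored['B'])  # [ 2.]  then  [ 9.]

One consequence in the worker: par['A'] * par['B'] now returns a one-element array rather than a scalar, so you may want to index or call .item() on the result.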
Upvotes: 1