Reputation: 21
Abstract
Code
To test the idea, I wrote the following, using Python 2.7.10 on Windows:
from multiprocessing.dummy import Pool as thPool
from time import sleep

def funct(n):
    #print "{"+str(n)+"}",
    #print "|"+str(len(li))+"|",
    if n % 2 == 0:
        return n
    elif n % 3 == 0:
        li.append(12)
    elif n % 5 == 0:
        li.append(14)
    #sleep(.25)

if __name__ == "__main__":
    P = thPool(4)
    li = [1,2,3,4,5,6,7,8,9]
    lf=(i for i in P.imap_unordered(funct,li) if i is not None)
    #print lf
    for m in lf:
        print "("+str(m)+")",
        #sleep(.25)
What I expected was to get, in no particular order, the even numbers (2, 4, 6, 8), two 12s, and one 14.
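To spell out that expectation, here is a minimal single-threaded sketch of the same rules (Python 3 syntax). The function name expected_results and the use of a deque as a work list are illustrative assumptions, not part of the code above. It only shows which values should come out if every appended item does get processed.

```python
from collections import deque

def expected_results(items):
    """Apply the same rules as funct, one item at a time, in a single thread."""
    pending = deque(items)   # work list that may grow while it is processed
    out = []
    while pending:
        n = pending.popleft()
        if n % 2 == 0:
            out.append(n)        # even numbers are results
        elif n % 3 == 0:
            pending.append(12)   # odd multiples of 3 generate a 12
        elif n % 5 == 0:
            pending.append(14)   # remaining multiples of 5 generate a 14
    return out

if __name__ == "__main__":
    print(sorted(expected_results([1, 2, 3, 4, 5, 6, 7, 8, 9])))
```

Sorted, that gives 2, 4, 6, 8, 12, 12, 14, which is the set of values described above.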
Outcome
I ran the above multiple times and got different results each time:
I assumed that the iterator finishes before the new items are appended to the list. I put a debug print statement inside funct to show the size of the global list li, and I got:
I also played around with adding delays, in case of a race condition, but that didn't make the results any more predictable.
Questions
Upvotes: 1
Views: 869
Reputation: 155574
Don't use multiprocessing.Pool.imap_unordered in this case. There are ways to make it work, but they're ugly and fragile. Use a producer-consumer pattern, where the consumers occasionally act as producers.
from multiprocessing.dummy import Process, Queue

def process(inq, outq):
    while True:
        n = inq.get()
        try:
            if n % 2 == 0:
                outq.put(n)  # Queue up for printing
            elif n % 3 == 0:
                inq.put(12)  # Queue up for future processing
            elif n % 5 == 0:
                inq.put(14)  # Queue up for future processing
        finally:
            inq.task_done()

def printer(q):
    while True:
        m = q.get()
        try:
            print "("+str(m)+")",
        finally:
            q.task_done()

def main():
    workqueue = Queue()
    printqueue = Queue()
    printworker = Process(target=printer, args=(printqueue,))
    printworker.daemon = True
    printworker.start()
    for i in range(4):
        processor = Process(target=process, args=(workqueue, printqueue))
        processor.daemon = True
        processor.start()
    li = [1,2,3,4,5,6,7,8,9]
    for x in li:
        workqueue.put(x)  # optionally, put all items before starting processor threads so initial work is processed strictly before generated work
    workqueue.join()  # Wait for all work, including new work, to be processed
    printqueue.join()  # Then wait for all the results to be printed

if __name__ == '__main__':
    main()
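For anyone on Python 3, here is a rough sketch of the same pattern using threading and queue directly. It is an adaptation, not a drop-in copy: the names worker and run are hypothetical, and results are collected into a list under a lock instead of being sent to a printer thread, so the outcome is easy to check.

```python
import queue
import threading

def worker(work_q, results, lock):
    """Consume items forever; odd multiples of 3 or 5 produce new work."""
    while True:
        n = work_q.get()
        try:
            if n % 2 == 0:
                with lock:
                    results.append(n)   # even numbers are results
            elif n % 3 == 0:
                work_q.put(12)          # queue up for future processing
            elif n % 5 == 0:
                work_q.put(14)          # queue up for future processing
        finally:
            # Runs after any put() above, so the queue's unfinished-task
            # count cannot reach zero while generated work is still pending.
            work_q.task_done()

def run(items, num_workers=4):
    work_q = queue.Queue()
    results = []
    lock = threading.Lock()
    for _ in range(num_workers):
        t = threading.Thread(target=worker, args=(work_q, results, lock))
        t.daemon = True                 # do not keep the interpreter alive
        t.start()
    for x in items:
        work_q.put(x)
    work_q.join()                       # wait for initial and generated work
    return sorted(results)              # arrival order is not deterministic

if __name__ == "__main__":
    print(run([1, 2, 3, 4, 5, 6, 7, 8, 9]))
```

The results are sorted before returning only because the threads finish in an arbitrary order; the set of values is the same on every run.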
Upvotes: 2
Reputation: 1929
EDIT: I re-read the problem more carefully and understand that you want to modify the list on the fly. This needs to be done with a proper shared object (such as Array, Queue or even a Manager from the multiprocessing library) that is thread-safe.
Also, I unfortunately don't think you can use imap_unordered() in that case. I think the behavior you are seeing occurs because imap_unordered sometimes reaches the end of the iterable and stops handing out work before the additional items have been placed on the list.
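A small single-threaded sketch of that effect (Python 3 syntax), under the assumption that the pool walks the input list with an ordinary list iterator in a background thread. It relies on the CPython behavior that a list iterator picks up items appended before it runs out, but stays exhausted once it has reached the end. The variable names are illustrative only.

```python
# Case 1: the append happens while the iterator is still in progress.
li = [1, 2, 3]
it = iter(li)
first = next(it)
li.append(12)                     # added before the iterator reaches the end
seen_early = [first] + list(it)   # the new item is picked up

# Case 2: the append happens after the iterator has already finished.
li2 = [1, 2, 3]
it2 = iter(li2)
seen_late = list(it2)             # iterator is now exhausted
li2.append(12)                    # too late: nothing will hand this out
leftover = list(it2)              # an exhausted list iterator stays empty

print(seen_early, seen_late, leftover)
```

With worker threads doing the appends, which of the two cases applies to a given item depends on timing, which would explain results that differ from run to run.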
Upvotes: 1