displayNada

Reputation: 21

python: multiprocessing.dummy using imap_unordered to iterate through a growing/modifying list

Abstract

Code
To test the idea I wrote the following, using Python 2.7.10 on Windows:

from multiprocessing.dummy import Pool as thPool
from time import sleep

def funct(n):
    #print "{"+str(n)+"}",
    #print "|"+str(len(li))+"|",
    if n % 2 == 0:
        return n
    elif n % 3 == 0:
        li.append(12)
    elif n % 5 == 0:
        li.append(14)
    #sleep(.25)

if __name__ == "__main__":
    P = thPool(4)
    li = [1,2,3,4,5,6,7,8,9]

    lf=(i for i in P.imap_unordered(funct,li) if i is not None)
    #print lf
    for m in lf:
        print "("+str(m)+")",
        #sleep(.25)

What I expected was to get, in no particular order, the even numbers (2, 4, 6, 8), two 12s, and one 14.

Outcome
I ran the above multiple times and got different results each time:

I assumed that the iterator finishes before the list is appended. I put a debug print statement inside funct to show the size of the global list li, and I got:

I also experimented with adding delays, in case there was a race condition, but that didn't make the results any more predictable.
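(For context, a plain list iterator shows both behaviors deterministically, which is presumably what the pool's internal task-feeder thread is racing against: appends made before the iterator is exhausted are picked up, but once it has raised StopIteration it stays exhausted even if the list keeps growing. Python 3 syntax below; the names are illustrative.)

```python
# A plain list iterator illustrates the timing sensitivity deterministically.
li = [1, 2, 3]
it = iter(li)

print(next(it))   # 1 -- iterator not exhausted yet
li.append(4)      # appended before exhaustion: will be seen
print(list(it))   # [2, 3, 4]

li.append(5)      # appended after exhaustion: never seen
print(list(it))   # []
```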

Questions

  1. Why is the outcome unpredictable? How come the list additions are sometimes not processed, even though the size of the list inside the function is >9, meaning the list has been appended?
  2. Why is the output always ordered?
  3. Is there a proper way to do this using the multiprocessing.dummy pool functionality?
  4. Is there a suggested alternative way of doing this?

Upvotes: 1

Views: 869

Answers (2)

ShadowRanger

Reputation: 155574

Don't use multiprocessing.Pool.imap_unordered in this case. There are ways to make it work, but they're ugly and fragile. Use a producer-consumer pattern, where the consumers occasionally act as producers.

from multiprocessing.dummy import Process, Queue


def process(inq, outq):
    while True:
        n = inq.get()
        try:
            if n % 2 == 0:
                outq.put(n)  # Queue up for printing
            elif n % 3 == 0:
                inq.put(12)  # Queue up for future processing
            elif n % 5 == 0:
                inq.put(14)  # Queue up for future processing
        finally:
            inq.task_done()

def printer(q):
    while True:
        m = q.get()
        try:
            print "("+str(m)+")",
        finally:
            q.task_done()

def main():
    workqueue = Queue()
    printqueue = Queue()
    printworker = Process(target=printer, args=(printqueue,))
    printworker.daemon = True
    printworker.start()

    for i in range(4):
        processor = Process(target=process, args=(workqueue, printqueue))
        processor.daemon = True
        processor.start()

    li = [1,2,3,4,5,6,7,8,9]

    for x in li:
        workqueue.put(x)  # optionally, put all items before starting processor threads so initial work is processed strictly before generated work
    workqueue.join()  # Wait for all work, including new work, to be processed
    printqueue.join()  # Then wait for all the results to be printed

if __name__ == '__main__':
    main()

Upvotes: 2

RobertB

Reputation: 1929

EDIT: I re-read the problem and see that you want to modify the list on the fly. This needs to be done with a proper shared object that is thread-safe, such as an Array, a Queue, or even a Manager from the multiprocessing library.

Also, I unfortunately don't think you can use imap_unordered() in that case. I think the behavior you are seeing is due to the fact that imap_unordered sometimes reaches the end of the iterable and stops handing out work before the additional items have been placed on the list.
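The Queue suggestion can be sketched with the standard library's thread-safe queue.Queue, essentially the same shape as the other answer (Python 3 syntax here; the run helper and worker count are illustrative, not part of the original code):

```python
from queue import Queue
from threading import Thread

def funct(n, inq, outq):
    # Same rules as the original funct, but new work goes
    # onto a thread-safe Queue instead of a plain list.
    if n % 2 == 0:
        outq.put(n)
    elif n % 3 == 0:
        inq.put(12)
    elif n % 5 == 0:
        inq.put(14)

def run(items, nworkers=4):
    inq, outq = Queue(), Queue()

    def worker():
        while True:
            n = inq.get()
            try:
                funct(n, inq, outq)
            finally:
                inq.task_done()  # balance every get(), even on error

    for _ in range(nworkers):
        Thread(target=worker, daemon=True).start()

    for x in items:
        inq.put(x)
    inq.join()  # blocks until initial AND generated work is done

    results = []
    while not outq.empty():  # safe: no producers remain after join()
        results.append(outq.get())
    return results

print(sorted(run([1, 2, 3, 4, 5, 6, 7, 8, 9])))
# -> [2, 4, 6, 8, 12, 12, 14]
```

Because the unfinished-task count on inq includes items the workers themselves put() back, inq.join() naturally waits for the work generated on the fly, which is exactly what the growing list was trying to express.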

Upvotes: 1
