
Reputation: 127

Parallel processing of lists

I'm trying to get my head around multiprocessing. I have a list, I divide it into two equally long parts, and I sort them in two separate processes. I know that this part works because printing saveto gives me the two lists. But I can't access them: in the end I get two empty lists. Why can't I access what I want to be written to l1 and l2, and how do I do that?

import multiprocessing
import random


def sort(l, saveto):
    saveto = sorted(l)
    print saveto

if __name__ == '__main__':

    l = [int(100000*random.random()) for i in xrange(10000)]

    listlen = len(l)
    halflist = listlen/2
    l1 = []
    l2 = []

    p1 = multiprocessing.Process(target=sort, args=(l[0:halflist], l1))
    p2 = multiprocessing.Process(target=sort, args=(l[halflist:listlen], l2))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print l1
    print l2

Upvotes: 1

Views: 359

Answers (1)

Sergey Gornostaev

Reputation: 7777

Use multiprocessing.Queue to share data between processes. Each child process gets its own copy of the arguments, so whatever sort assigns to saveto (which only rebinds the local name anyway) never reaches l1 and l2 in the parent; a queue lets the children send their results back.

import multiprocessing
import random

def sort(l, queue):
    queue.put(sorted(l))


if __name__ == '__main__':
    l = [int(100000*random.random()) for i in xrange(10000)]

    listlen = len(l)
    halflist = listlen/2
    queue = multiprocessing.Queue()

    p1 = multiprocessing.Process(target=sort, args=(l[0:halflist], queue))
    p2 = multiprocessing.Process(target=sort, args=(l[halflist:listlen], queue))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print queue.get()
    print queue.get()
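
Not part of the original answer, but a small follow-up sketch: the two queue.get() calls return the sorted halves (in whichever order the children happened to put them), and they can be merged back into one fully sorted list with heapq.merge. The half1/half2 names below are just stand-ins for the two retrieved halves:

import heapq

# Stand-ins for the two sorted halves retrieved via queue.get() above.
half1 = [1, 4, 9]
half2 = [2, 3, 10]

# heapq.merge lazily merges already-sorted inputs into one sorted sequence.
merged = list(heapq.merge(half1, half2))
print merged  # [1, 2, 3, 4, 9, 10]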

UPDATE:

As it turns out, putting large amounts of data into a Queue can cause a deadlock. This is mentioned in the docs:

Warning

As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue.

Fixed version:

import multiprocessing
import random

def sort(l, queue):
    queue.put(sorted(l))


if __name__ == '__main__':
    l = [int(100000*random.random()) for i in range(10000)]

    listlen = len(l)
    halflist = listlen/2

    manager = multiprocessing.Manager()
    queue = manager.Queue()

    p1 = multiprocessing.Process(target=sort, args=(l[0:halflist], queue))
    p2 = multiprocessing.Process(target=sort, args=(l[halflist:listlen], queue))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print queue.get()
    print queue.get()
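
Not part of the original answer, but for comparison, a minimal sketch of the same split-and-sort done with multiprocessing.Pool (assuming the same Python 2 setup as above). Pool.map sends each chunk to a worker and returns the sorted results to the parent in chunk order, so no queue has to be managed by hand:

import multiprocessing
import random

if __name__ == '__main__':
    l = [int(100000 * random.random()) for i in range(10000)]
    halflist = len(l) / 2

    # Two worker processes; each chunk is pickled, sorted in a worker,
    # and the sorted result is returned to the parent.
    pool = multiprocessing.Pool(processes=2)
    sorted_halves = pool.map(sorted, [l[:halflist], l[halflist:]])
    pool.close()
    pool.join()

    print sorted_halves[0]
    print sorted_halves[1]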

Upvotes: 1
