Marcin Piotr SuRy

Reputation: 173

Python Multiprocessing Queue Slow

I have a problem with Python multiprocessing Queues. I'm doing some heavy computation on some data. I have created a few processes to lower the calculation time, and the data is split evenly before being sent to the processes. This decreases the calculation time nicely, but when I want to return data from a process via multiprocessing.Queue it takes ages, and the whole thing is slower than calculating in the main thread.

    processes = []
    proc = 8  # number of worker processes
    for i in range(proc):
        processes.append(multiprocessing.Process(target=self.calculateTriangles, args=(inData[i], outData, timer)))
    for p in processes:
        p.start()
    results = []
    for i in range(proc):
        results.append(outData.get())  # this is the slow part
    print("killing threads")
    print(datetime.datetime.now() - timer)
    for p in processes:
        p.join()
    print("Finish Threads")
    print(datetime.datetime.now() - timer)

All of the processes print their finish time when they are done. Here is example output of this code:

0:00:00.017873 CalcDone    
0:00:01.692940 CalcDone
0:00:01.777674 CalcDone
0:00:01.780019 CalcDone
0:00:01.796739 CalcDone
0:00:01.831723 CalcDone
0:00:01.842356 CalcDone
0:00:01.868633 CalcDone
0:00:05.497160 killing threads
60968 calculated triangles 

As you can see, everything is quite simple until this code:

    for i in range(proc):
        results.append(outData.get())
    print("killing threads")
    print(datetime.datetime.now() - timer)

Here are some observations I have made on my computer and on a slower one: https://docs.google.com/spreadsheets/d/1_8LovX0eSgvNW63-xh8L9-uylAVlzY4VSPUQ1yP2F9A/edit?usp=sharing . On the slower one there isn't any improvement, as you can see.

Why does it take so much time to get items from the queue when the processes are finished? Is there a way to speed this up?

Upvotes: 3

Views: 4928

Answers (2)

Marcin Piotr SuRy

Reputation: 173

So I have solved it myself. The calculations are fast, but copying objects from one process to another takes ages. I just made a method that cleared all the unnecessary fields in the objects; also, using pipes is faster than multiprocessing queues. It took the time on my slower computer down from 29 seconds to 15 seconds.
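
For illustration, here is a minimal sketch of that approach, with a hypothetical Triangle class and a heavy_cache field standing in for the data that doesn't need to be copied (the names are illustrative, not my original code); each worker clears the unnecessary fields and sends its whole result list through one end of a multiprocessing.Pipe:

    import multiprocessing

    class Triangle:
        def __init__(self, vertices):
            self.vertices = vertices
            self.heavy_cache = [0.0] * 10000  # expensive to pickle, not needed later

    def calculate_triangles(chunk, conn):
        results = [Triangle(v) for v in chunk]  # stand-in for the real computation
        for tri in results:
            tri.heavy_cache = None              # clear unnecessary fields before sending
        conn.send(results)                      # one bulk send through the pipe
        conn.close()

    if __name__ == "__main__":
        data = [((0, 0), (1, 0), (0, 1))] * 8000
        proc = 8
        chunks = [data[i::proc] for i in range(proc)]  # split evenly across workers
        pipes = []
        processes = []
        for chunk in chunks:
            recv_end, send_end = multiprocessing.Pipe(duplex=False)
            p = multiprocessing.Process(target=calculate_triangles, args=(chunk, send_end))
            p.start()
            pipes.append(recv_end)
            processes.append(p)
        results = [tri for conn in pipes for tri in conn.recv()]  # recv before join
        for p in processes:
            p.join()
        print(len(results), "triangles received")

Note that the parent has to recv() from every pipe before joining the workers, otherwise a worker can block forever on send() when the payload is large.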

Upvotes: 3

Kris Urbanczyk

Reputation: 11

This time is mainly spent on putting objects into the Queue one at a time and hitting the semaphore for each of them. If you can bulk-insert all the data into the Queue at once, you cut the time down to about 1/10 of what it was before.

I've dynamically assigned a new method to Queue, based on the old one. Go to the multiprocessing package for your Python version:

/usr/lib/pythonX.X/multiprocessing/queues.py

Copy the `put` method of the Queue class into your project, e.g. for Python 3.7:

def put(self, obj, block=True, timeout=None):
    assert not self._closed, "Queue {0!r} has been closed".format(self)
    if not self._sem.acquire(block, timeout):
        raise Full

    with self._notempty:
        if self._thread is None:
            self._start_thread()
        self._buffer.append(obj)
        self._notempty.notify()

modify it:

from queue import Full  # the exception raised by the original put()

def put_bla(self, obj, block=True, timeout=None):
    assert not self._closed, "Queue {0!r} has been closed".format(self)

    for el in obj:
        if not self._sem.acquire(block, timeout):  # hit the semaphore once per element
            raise Full
        with self._notempty:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(el)  # self._buffer is a collections.deque
            self._notempty.notify()

The last step is to add the new method to the class. multiprocessing.Queue is a method on the default context which returns a Queue object, so it is easier to inject the new method directly into the class of the created object:

from collections import deque
from multiprocessing import Queue

count = 1000  # illustrative size
queue = Queue()
queue.__class__.put_bulk = put_bla  # injecting the new method
items = (500, 400, 450, 350) * count  # (500, 400, 450, 350, 500, 400, ...)
queue.put_bulk(deque(items))

Unfortunately, multiprocessing.Pool was always faster by 10%, so just stick with that if you don't require everlasting workers to process your tasks. It is based on multiprocessing.SimpleQueue, which is based on multiprocessing.Pipe, and I have no idea why it is faster, because my SimpleQueue solution wasn't, and it is not bulk-injectable. :) Break that and you'll have the fastest worker ever. :)
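
For comparison, here is a minimal Pool sketch of the same kind of fan-out (the `work` function and sizes are illustrative stand-ins, not benchmarked code):

    from multiprocessing import Pool

    def work(chunk):
        # stand-in for the heavy per-item computation
        return [x * x for x in chunk]

    if __name__ == "__main__":
        items = [500, 400, 450, 350] * 1000
        proc = 8
        chunks = [items[i::proc] for i in range(proc)]
        with Pool(processes=proc) as pool:
            parts = pool.map(work, chunks)  # each worker returns its chunk's results
        results = [y for part in parts for y in part]
        print(len(results), "results")

Pool.map pickles each chunk once per task and brings the results back over its internal SimpleQueue, so you get the bulk-transfer behaviour without patching anything.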

Upvotes: 1
