Jared

Reputation: 607

How can I speed up simultaneous reads and writes of multiprocessing queues?

tl;dr - is there a way to increase the speed of simultaneously reading and writing to a multiprocessing queue?

I have an app that processes audit data. Think of it like a syslog relay. It receives data, parses it, then sends the event onward. The event rate could be significant - I'm shooting for 15,000 events per second (EPS).

in_queue = multiprocessing.Queue()

out_queue = multiprocessing.Queue()

I ran tests using Queues - I can put events into, or pull events from, a Queue at 25,000 EPS. The slowdown occurs when multiple parsing processes (four of them) pull data off the queue while it is being written to; the rate dips below 10,000 EPS. I'm guessing the underlying pipes, locks, etc. are the cause of the delay.
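
Roughly, the test that slows down looks like this - a simplified sketch rather than my actual code (the event payload, the counts, and the sentinel shutdown are just placeholders):

import time
from multiprocessing import Process, Queue

NUM_PARSERS = 4
NUM_EVENTS = 100000

def parser(q):
    # keep pulling events off the shared queue until a None sentinel arrives
    while True:
        event = q.get()
        if event is None:
            break

def main():
    in_queue = Queue()
    parsers = [Process(target=parser, args=(in_queue,)) for _ in range(NUM_PARSERS)]
    for p in parsers:
        p.start()

    start = time.time()
    # the single writer and the four readers hit the queue at the same time
    for num in range(NUM_EVENTS):
        in_queue.put({'event': num})
    for _ in parsers:
        in_queue.put(None)   # one sentinel per parser so each one exits
    for p in parsers:
        p.join()

    duration = time.time() - start
    print "EPS: %s" % (NUM_EVENTS / duration)

if __name__ == "__main__":
    main()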

I read up on pipes and it looks like they only support two endpoints. I need to fork off the CPU-intensive parsing to multiple processes. Can alternative methods like multiprocessing shared memory achieve better results? How can I get better simultaneous .put() and .get() performance from a Queue?

Upvotes: 2

Views: 3141

Answers (1)

dano

Reputation: 94881

Given your performance needs, I think you'd be better off using a third-party message broker like ZeroMQ or RabbitMQ for this. I found a benchmark comparing multiprocessing.Queue with ZeroMQ here (though it doesn't quite match your use-case). The difference in performance is enormous:

multiprocessing.Queue Results


python2 ./multiproc_with_queue.py
Duration: 164.182257891
Messages Per Second: 60907.9210414

0mq Results


python2 ./multiproc_with_zeromq.py
Duration: 23.3490710258
Messages Per Second: 428282.563744

I took both of those tests and gave them a more complicated workload, since one of the benefits of multiprocessing.Queue is that it handles serialization for you. Here are the new scripts:

mult_queue.py

import sys
import time
from multiprocessing import Process, Queue

def worker(q):
    # drain one million messages from the queue
    for task_nbr in range(1000000):
        message = q.get()
    sys.exit(1)

def main():
    send_q = Queue()
    Process(target=worker, args=(send_q,)).start()
    msg = {
            'something': "More",
            "another": "thing",
            "what?": range(200),
            "ok": ['asdf', 'asdf', 'asdf']
            }
    for num in range(1000000):
        send_q.put(msg)

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = 1000000 / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

mult_zmq.py

import sys
import time
import zmq
from multiprocessing import Process

def worker():
    context = zmq.Context()
    # PULL socket: receives the objects pushed by the ventilator in main()
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://127.0.0.1:5557")

    for task_nbr in range(1000000):
        message = work_receiver.recv_pyobj()

    sys.exit(1)

def main():
    Process(target=worker, args=()).start()
    context = zmq.Context()
    # PUSH socket: distributes objects to the connected PULL worker
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")
    msg = {
            'something': "More",
            "another": "thing",
            "what?": range(200),
            "ok": ['asdf', 'asdf', 'asdf']
            }
    for num in range(1000000):
        ventilator_send.send_pyobj(msg)

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = 1000000 / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

Output:

dan@dan:~$ ./mult_zmq.py 
Duration: 14.0204648972
Messages Per Second: 71324.3110935
dan@dan:~$ ./mult_queue.py 
Duration: 27.2135331631
Messages Per Second: 36746.4229657
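
For your actual pipeline - one receiver feeding several parsing processes - the same PUSH/PULL pattern handles the fan-out for you, since a PUSH socket round-robins messages across all connected PULL sockets. Here's a rough, unbenchmarked sketch of what that could look like (the four-parser count, the event payload, and the sentinel-based shutdown are placeholders, not something I've measured):

import time
import zmq
from multiprocessing import Process

NUM_PARSERS = 4
NUM_EVENTS = 100000

def parser():
    context = zmq.Context()
    # each parser connects its own PULL socket to the ventilator
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://127.0.0.1:5557")
    while True:
        event = work_receiver.recv_pyobj()
        if event is None:   # sentinel: no more work
            break

def main():
    parsers = [Process(target=parser) for _ in range(NUM_PARSERS)]
    for p in parsers:
        p.start()

    context = zmq.Context()
    # PUSH round-robins outgoing messages across the connected parsers
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")
    for num in range(NUM_EVENTS):
        ventilator_send.send_pyobj({'event': num})

    # shut the parsers down: keep offering sentinels until they have all
    # exited, since round-robin delivery doesn't guarantee exactly one
    # sentinel per parser
    while any(p.is_alive() for p in parsers):
        try:
            ventilator_send.send_pyobj(None, flags=zmq.NOBLOCK)
        except zmq.Again:
            pass
        time.sleep(0.05)
    for p in parsers:
        p.join()

if __name__ == "__main__":
    main()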

Upvotes: 3
