Dan Mahowny

PUB-SUB network with a proxy: how to increase the number of frames per second

I'm a beginner with ZMQ. I implemented a PUB-SUB network with a proxy; the idea is to plug multiple websocket feeds into the proxy as PUBs and aggregate them into a single feed (the SUB). I used the inproc transport because everything runs in the same process.

I wrote the code below and obtain 400 to 500 FPS, which is far too slow.

import time
import random
import threading
import zmq

# Channels from SENDERS (PUB) to PROXY
channels = ["inproc://first", "inproc://second"]

# Channel from PROXY to RECEIVER
outbound = "inproc://to_aggregate"

numrec = 0
ctx = zmq.Context.instance()

lock = threading.Lock()

def sender(context, address):
    socket = context.socket(zmq.PUB)
    socket.connect(address)
    while True:
        twait = random.randint(1,3)
        tosend = f'{twait} from {threading.current_thread()}'.encode()
        socket.send(tosend)

def receiver(context):
    global numrec
    socket = context.socket(zmq.SUB)
    socket.connect(outbound)
    topicfilter = ''  # As string, encoded to bytes later on
    socket.setsockopt(zmq.SUBSCRIBE, topicfilter.encode())
    while True:
        resp = socket.recv()
        with lock:
            # TOOK OUT A PRINT STATEMENT, WAS SLOWING DOWN THE LOOP
            numrec += 1

def middleman(context):
    data_in = context.socket(zmq.XSUB)
    for channel in channels:
        data_in.bind(channel)
    data_out = context.socket(zmq.XPUB)
    data_out.bind(outbound)
    zmq.proxy(data_in, data_out)

exchpub = threading.Thread(target=sender, name='THE_TOPLEVEL_PUBLISHER', args=(ctx, channels[0]), daemon=True)
exchpub2 = threading.Thread(target=sender, name='THE_TOPLEVEL_PUBLISHER', args=(ctx, channels[1]), daemon=True)
exchsub = threading.Thread(target=receiver, name='THE_TOPLEVEL_SUB', args=(ctx,), daemon=True)
proxy = threading.Thread(target=middleman, name='THE_PROXY', args=(ctx,), daemon=True)

threadlist = [exchpub, exchpub2, exchsub, proxy]

for thread in threadlist:
    thread.start()


secwait = 5

t=tzero=time.time()

while t-tzero < secwait: 
    t = time.time()

with lock:
    print(f'exited here and {numrec/secwait} FPS')

Here is my main question:

Why is it so slow?

Follow-up questions:

1) The ZMQ docs state: "INPROC: the server must issue a bind before any client issues a connect." Yet regardless of the initialization order, I observe no failures. Why?

2) Using send_multipart and recv_multipart slows my code down (sending an iterable with len == 2 roughly halves the rate). Why would I want to use them, then? If the speed difference were small, I would like to use them to send (SOURCE, PAYLOAD, TIMESTAMP).

3) How would you go about profiling the speed of code like this? Any suggestions on the implementation?
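For follow-up question 1, one way to check what the installed libzmq actually does is to connect before binding and see whether a message gets through. As far as I know, the bind-first rule is historical: the 4.x zmq_inproc man page says the name had to be bound first only before version 4.0. The sketch below assumes pyzmq is installed; the endpoint `inproc://demo` and the helper name `connect_then_bind` are made up for illustration, and PAIR sockets are used instead of PUB-SUB so the test message is not dropped while the connection is being attached. On an old libzmq, the `connect()` call should raise `zmq.ZMQError` instead.

```python
import zmq


def connect_then_bind(endpoint: str = "inproc://demo", timeout_ms: int = 1000):
    """Connect before bind on inproc; return the received message or None.

    Hypothetical helper for illustration only. PAIR sockets queue the
    message instead of dropping it, unlike PUB-SUB.
    """
    ctx = zmq.Context()
    client = ctx.socket(zmq.PAIR)
    server = ctx.socket(zmq.PAIR)
    try:
        client.connect(endpoint)  # connect first: the order the docs warn about
        server.bind(endpoint)     # bind second
        client.send(b"hello")
        if server.poll(timeout_ms):  # wait up to timeout_ms for the message
            return server.recv()
        return None
    finally:
        client.close(linger=0)
        server.close(linger=0)
        ctx.term()


if __name__ == "__main__":
    print("libzmq", zmq.zmq_version(), "->", connect_then_bind())
```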
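For follow-up question 3, a throughput harness along these lines might help: let the threads run through a warm-up period, take two counter snapshots around a fixed window, and sleep in the main thread instead of spinning on time.time(), since a spinning Python loop competes with the worker threads for the GIL. The sketch uses a `queue.Queue` as a stand-in for the ZMQ sockets so it runs with the standard library alone; `RateCounter`, `producer`, `consumer` and `measure_rate` are hypothetical names, and in your code the counting would happen in `receiver`. For a per-function breakdown across threads, a sampling profiler such as py-spy is another option.

```python
import queue
import threading
import time


class RateCounter:
    """Thread-safe message counter (hypothetical helper)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._count = 0

    def add(self, n: int = 1) -> None:
        with self._lock:
            self._count += n

    def snapshot(self) -> int:
        with self._lock:
            return self._count


def producer(q: queue.Queue, stop: threading.Event) -> None:
    # Stand-in for a PUB sender: push small messages until told to stop.
    while not stop.is_set():
        try:
            q.put(b"x" * 32, timeout=0.1)  # timeout so a full queue cannot block shutdown
        except queue.Full:
            continue


def consumer(q: queue.Queue, stop: threading.Event, counter: RateCounter) -> None:
    # Stand-in for the SUB receiver: count every message that arrives.
    while not stop.is_set():
        try:
            q.get(timeout=0.1)
        except queue.Empty:
            continue
        counter.add()


def measure_rate(warmup: float = 0.5, window: float = 2.0) -> float:
    """Messages per second over `window` seconds, measured after `warmup` seconds."""
    q: queue.Queue = queue.Queue(maxsize=1000)
    stop = threading.Event()
    counter = RateCounter()
    threads = [
        threading.Thread(target=producer, args=(q, stop), daemon=True),
        threading.Thread(target=consumer, args=(q, stop, counter), daemon=True),
    ]
    for t in threads:
        t.start()
    time.sleep(warmup)  # skip start-up effects (e.g. subscriptions still propagating)
    start_count, t0 = counter.snapshot(), time.perf_counter()
    time.sleep(window)  # sleeping releases the GIL; busy-waiting would not
    end_count, t1 = counter.snapshot(), time.perf_counter()
    stop.set()
    for t in threads:
        t.join()
    return (end_count - start_count) / (t1 - t0)


if __name__ == "__main__":
    print(f"{measure_rate():.0f} messages/s")
```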

Thanks.


Answers (1)

user3097732

You are running all the threads in a single process.

That is quite likely the cause of the slow performance, because of Python's GIL (Global Interpreter Lock):

(...) the GIL allows only one thread to execute at a time, even in a multi-threaded architecture with more than one CPU core (...)

See: https://medium.com/python-features/pythons-gil-a-hurdle-to-multithreaded-program-d04ad9c1a63 (and many other sources).

For your program, this means that the sender and receiver loops (and the main thread, which busy-waits on time.time() during the measurement) spend a lot of time waiting for the GIL.
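A minimal way to see this effect with the standard library alone: the sketch below (hypothetical `worker` and `measure` names) counts how many loop iterations a pure-Python worker thread completes while the main thread either sleeps or busy-waits on time.time(), as the measurement loop in the question does. On a standard CPython build with the GIL, the busy-waiting run usually reports a noticeably lower count; the exact numbers depend on the machine and Python version.

```python
import threading
import time


def worker(stop: threading.Event, result: list) -> None:
    # Pure-Python loop: every iteration needs the GIL.
    n = 0
    while not stop.is_set():
        n += 1
    result.append(n)


def measure(seconds: float, busy_wait: bool) -> int:
    """Iterations the worker completes in `seconds` while the main thread waits."""
    stop = threading.Event()
    result: list = []
    thread = threading.Thread(target=worker, args=(stop, result))
    thread.start()
    if busy_wait:
        # Same pattern as the question's main loop: spins in Python bytecode
        # and competes with the worker for the GIL.
        t0 = time.time()
        while time.time() - t0 < seconds:
            pass
    else:
        # time.sleep() releases the GIL, so the worker can use it the whole time.
        time.sleep(seconds)
    stop.set()
    thread.join()
    return result[0]


if __name__ == "__main__":
    print("main thread sleeps:     ", measure(1.0, busy_wait=False))
    print("main thread busy-waits: ", measure(1.0, busy_wait=True))
```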

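To circumvent this, use Python's multiprocessing module, as explained here: https://timber.io/blog/multiprocessing-vs-multithreading-in-python-what-you-need-to-know/ Note that inproc endpoints only work between sockets that share the same ZeroMQ context, so once the senders run in separate processes you need to switch them to the ipc:// or tcp:// transport.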
