I'm a beginner with ZMQ. I implemented a PUB-SUB network with a proxy. The idea is to plug multiple websocket feeds into the proxy as PUBs and aggregate them into a single feed (the SUB). I used the inproc transport because everything runs in the same process.
I wrote the code below. It reaches only 400 to 500 FPS (messages received per second), which is far too slow.
```python
import time
import random
import threading
import zmq

# Channels from SENDERS (PUB) to PROXY
channels = ["inproc://first", "inproc://second"]
# Channel from PROXY to RECEIVER
outbound = "inproc://to_aggregate"

numrec = 0
ctx = zmq.Context.instance()
lock = threading.Lock()


def sender(context, address):
    socket = context.socket(zmq.PUB)
    socket.connect(address)
    while True:
        # twait is only part of the message text; the loop never sleeps
        twait = random.randint(1, 3)
        tosend = f'{twait} from {threading.current_thread()}'.encode()
        socket.send(tosend)


def receiver(context):
    global numrec
    socket = context.socket(zmq.SUB)
    socket.connect(outbound)
    topicfilter = ''  # As string, encoded to bytes later on
    socket.setsockopt(zmq.SUBSCRIBE, topicfilter.encode())
    while True:
        resp = socket.recv()
        with lock:
            # TOOK OUT A PRINT STATEMENT, WAS SLOWING DOWN THE LOOP
            numrec += 1


def middleman(context):
    data_in = context.socket(zmq.XSUB)
    for channel in channels:
        data_in.bind(channel)
    data_out = context.socket(zmq.XPUB)
    data_out.bind(outbound)
    zmq.proxy(data_in, data_out)


exchpub = threading.Thread(target=sender, name='THE_TOPLEVEL_PUBLISHER', args=(ctx, channels[0]), daemon=True)
exchpub2 = threading.Thread(target=sender, name='THE_TOPLEVEL_PUBLISHER', args=(ctx, channels[1]), daemon=True)
exchsub = threading.Thread(target=receiver, name='THE_TOPLEVEL_SUB', args=(ctx,), daemon=True)
proxy = threading.Thread(target=middleman, name='THE_PROXY', args=(ctx,), daemon=True)

threadlist = [exchpub, exchpub2, exchsub, proxy]
for thread in threadlist:
    thread.start()

secwait = 5
t = tzero = time.time()
# Busy-wait for secwait seconds: this pure-Python loop also competes for the GIL
while t - tzero < secwait:
    t = time.time()

with lock:
    print(f'exited here and {numrec/secwait} FPS')
```
Here is my main question:
Why is it so slow?
Follow-up questions:
1) The ZMQ docs state: "INPROC: the server must issue a bind before any client issues a connect." Yet regardless of the initialization order, I observe no failures. Why?
2) Using send_multipart and recv_multipart slows my code down (sending an iterable with len == 2 roughly halves the throughput). So why would I want to use them? If the speed difference were small, I would like to use them as follows: (SOURCE, PAYLOAD, TIMESTAMP).
3) How would you go about profiling the speed of code like this? Any suggestions on the implementation?
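The (SOURCE, PAYLOAD, TIMESTAMP) layout from question 2 could be encoded either as three frames for pyzmq's `Socket.send_multipart()` or, if the per-frame overhead matters, packed into a single frame for a plain `send()`. Below is a minimal standard-library sketch of both encodings. The helper names and the byte layout (network-order double for the timestamp, 2-byte length prefix for the source name) are illustrative assumptions, not anything defined by ZMQ.

```python
import struct

# Assumed single-frame header: 8-byte big-endian float timestamp + 2-byte source length
_HEADER = struct.Struct("!dH")


def to_frames(source, payload, timestamp):
    """Build the frames [SOURCE, PAYLOAD, TIMESTAMP] to pass to send_multipart()."""
    return [source.encode(), payload, struct.pack("!d", timestamp)]


def from_frames(frames):
    """Inverse of to_frames(): decode a list of bytes as returned by recv_multipart()."""
    source, payload, ts = frames
    return source.decode(), payload, struct.unpack("!d", ts)[0]


def pack_single(source, payload, timestamp):
    """Alternative: pack the same triple into one bytes object for a plain send()."""
    src = source.encode()
    return _HEADER.pack(timestamp, len(src)) + src + payload


def unpack_single(frame):
    """Inverse of pack_single(): split the header, source name and payload."""
    timestamp, src_len = _HEADER.unpack_from(frame)
    start = _HEADER.size
    source = frame[start:start + src_len].decode()
    payload = frame[start + src_len:]
    return source, payload, timestamp
```

With a hypothetical PUB socket `sock`, the sender would call `sock.send_multipart(to_frames("first", b"...", time.time()))` and the receiver `from_frames(sock.recv_multipart())`. One practical reason to prefer the multipart form is that SUB subscriptions are matched as a prefix of the first frame, so putting SOURCE first lets subscribers filter by source; the single-frame layout above starts with the timestamp and gives up that ability.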
Thanks.
You are running all the threads in a single process.
This is quite likely why the performance is so poor, and the cause is Python's GIL (Global Interpreter Lock):
(...) the GIL allows only one thread to execute at a time, even in a multi-threaded architecture with more than one CPU core (...)
See: https://medium.com/python-features/pythons-gil-a-hurdle-to-multithreaded-program-d04ad9c1a63 (and many other sources).
For your program, this means that the `sender` and `receiver` loops spend a lot of time waiting for the GIL.
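To see this effect in isolation, here is a small standard-library experiment (the function name `iterations_while_main` is made up for illustration). It counts how much pure-Python work a worker thread gets done while the main thread either busy-waits, as the question's timing loop does, or sleeps. On a regular CPython build with the GIL, the spinning case typically yields noticeably fewer iterations; exact numbers depend on the machine and Python version, and free-threaded builds without a GIL will behave differently.

```python
import threading
import time


def iterations_while_main(duration, spin):
    """Count loop iterations a worker thread completes in `duration` seconds
    while the main thread either spins (busy-waits) or sleeps."""
    stop = threading.Event()
    count = 0

    def worker():
        nonlocal count
        while not stop.is_set():
            count += 1  # pure-Python work: only runs while this thread holds the GIL

    thread = threading.Thread(target=worker)
    thread.start()
    if spin:
        # Busy-wait like the question's timing loop: this thread keeps competing for the GIL
        deadline = time.time() + duration
        while time.time() < deadline:
            pass
    else:
        # time.sleep() releases the GIL, so the worker can run uninterrupted
        time.sleep(duration)
    stop.set()
    thread.join()
    return count
```

Comparing `iterations_while_main(1.0, spin=True)` with `iterations_while_main(1.0, spin=False)` on your machine shows how much one spinning Python thread slows the others; replacing the question's busy-wait with `time.sleep(secwait)` removes that particular competitor.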
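To work around this, use Python's multiprocessing module, as explained here: https://timber.io/blog/multiprocessing-vs-multithreading-in-python-what-you-need-to-know/ Note that inproc:// endpoints only connect sockets that share the same ZMQ context, so once the senders, proxy and receiver run in separate processes, they have to use ipc:// or tcp:// endpoints instead.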
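For follow-up question 3, a simple starting point is to measure each stage's throughput in isolation with `time.perf_counter()`, and to run the hot loop under the standard library's `cProfile` to see where the time goes. A minimal sketch; `measure_rate` and `profile_calls` are hypothetical helper names:

```python
import cProfile
import pstats
import time


def measure_rate(step, duration=1.0):
    """Call step() repeatedly for about `duration` seconds; return calls per second."""
    calls = 0
    start = time.perf_counter()
    deadline = start + duration
    while time.perf_counter() < deadline:
        step()
        calls += 1
    return calls / (time.perf_counter() - start)


def profile_calls(step, n=10_000, top=10):
    """Run step() n times under cProfile, print the `top` entries by cumulative time."""
    profiler = cProfile.Profile()
    profiler.enable()
    for _ in range(n):
        step()
    profiler.disable()
    stats = pstats.Stats(profiler).sort_stats("cumulative")
    stats.print_stats(top)
    return stats
```

For example, with a hypothetical PUB socket `sock`, `measure_rate(lambda: sock.send(b"x"))` gives a send rate to compare against the rate the receiver achieves, and swapping in `send_multipart` answers the cost question from question 2 directly. Because the deadline is only checked between calls, `step()` must not block indefinitely; a blocking `recv()` on a quiet socket would hang the measurement.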