Reputation: 105
I have a pyzmq Publisher which sends around 1000 messages per second. I am trying to start around 10 Subscribers in an asyncio event loop.
It works, but the throughput is roughly 2.5 times lower than with a single Subscriber.
What could possibly be wrong with the code?
import asyncio
import json

import zmq
from zmq.asyncio import Context, Poller


class Client:
    REQUEST_TIMEOUT = 35000                         # poll timeout [ms]
    SERVER_ENDPOINT = "tcp://localhost:6666"

    def __init__(self, id_):
        self.id = id_

    def get_task(self):
        return asyncio.create_task(self.client_coroutine())

    async def client_coroutine(self):
        context = Context.instance()
        socket = context.socket(zmq.SUB)
        socket.connect(self.SERVER_ENDPOINT)
        socket.setsockopt(zmq.SUBSCRIBE, b'4')
        poller = Poller()
        poller.register(socket, zmq.POLLIN)
        while True:
            event = dict(await poller.poll(self.REQUEST_TIMEOUT))
            if event.get(socket) == zmq.POLLIN:
                reply = await socket.recv_multipart(flags=zmq.NOBLOCK)
                if not reply:
                    break
                # NB: eval() of data received from the network is unsafe; kept as in the original
                print(eval(json.loads(reply[1].decode('utf-8'))))
            else:
                print("No response from server, retrying...")
        socket.setsockopt(zmq.LINGER, 0)
        poller.unregister(socket)                   # unregister before closing the socket
        socket.close()


async def tasks():
    _tasks = [Client(id_).get_task() for id_ in range(10)]
    done, pending = await asyncio.wait(_tasks, return_when=asyncio.FIRST_EXCEPTION)


loop = asyncio.get_event_loop()
loop.run_until_complete(tasks())
Upvotes: 2
Views: 491
Reputation: 1
Q : What could possibly be wrong with the code?
Given the code is using the same localhost ( as seen from using the address ), the suspect number one is that, having 10x more work to process, such a workload will always stress the localhost's O/S and the CPU, won't it?
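To see how much each additional Subscriber costs, a minimal measurement sketch ( a hypothetical helper, not part of the original code, assuming an already connected zmq.asyncio SUB socket is passed in ) could count messages received per second inside each subscriber coroutine:

import time

async def measure_throughput(socket, id_):
    # counts messages per second for one SUB socket, so the per-subscriber
    # slowdown can be observed as more coroutines share the same CPU
    count, t0 = 0, time.monotonic()
    while True:
        await socket.recv_multipart()
        count += 1
        elapsed = time.monotonic() - t0
        if elapsed >= 1.0:
            print(f"SUB[{id_}]: {count / elapsed:.0f} msg/s")
            count, t0 = 0, time.monotonic()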
Next comes the choice of the transport-class. Given all the SUB-s are co-located on the same localhost as the PUB, all the L3-stack-based TCP/IP protocol work goes to waste. To compare the relative costs ( the add-on effect of using the tcp:// transport-class for this hardware-singular messaging ), test the very same with the inproc:// transport-class, where none of the protocol-related TCP/IP-stack add-on processing will take place.
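A minimal sketch of such a comparison, assuming ( unlike the original setup ) that the PUB and all the SUB-s live inside one process, which inproc:// requires, and share one Context instance ( the endpoint name is hypothetical; swap it for the tcp:// address to measure the other case ):

import asyncio
import time

import zmq
from zmq.asyncio import Context

# hypothetical endpoint; replace with "tcp://127.0.0.1:6666" to measure the tcp:// case
ENDPOINT = "inproc://speed-test"

async def publish_forever(pub):
    i = 0
    while True:
        await pub.send_multipart([b'4', str(i).encode()])
        i += 1

async def subscribe(ctx, id_, n=10_000):
    sub = ctx.socket(zmq.SUB)
    sub.connect(ENDPOINT)
    sub.setsockopt(zmq.SUBSCRIBE, b'4')
    t0 = time.monotonic()
    for _ in range(n):
        await sub.recv_multipart()
    print(f"SUB[{id_}]: {n} messages in {time.monotonic() - t0:.2f} s")
    sub.close()

async def main():
    ctx = Context.instance()        # one shared Context is mandatory for inproc://
    pub = ctx.socket(zmq.PUB)
    pub.bind(ENDPOINT)              # bind before connect: required for inproc:// on older libzmq
    feeder = asyncio.ensure_future(publish_forever(pub))
    await asyncio.gather(*(subscribe(ctx, i) for i in range(10)))
    feeder.cancel()
    try:
        await feeder
    except asyncio.CancelledError:
        pass
    pub.close()

asyncio.run(main())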
Last, but not least, my code will never mix different event-loops ( using ZeroMQ since v2.11, so someone may consider me a bit old-fashioned in avoiding relying on the async-decorated capabilities available in recent py3.6+ ).
My code will use an explicit, non-blocking, zero-waiting test for a presence of a message per aSocketINSTANCE, as in aSocketINSTANCE.poll( timeout = 0, flags = zmq.POLLIN ), rather than using any "externally" added decoration, which may report the same, but via some additional ( expensive and outside of my code's domain of control ) event-handling. All real-time, low-latency use-cases strive to carry as little latency/overhead as possible, so explicit control will always win in my Projects over any "modern" syntax-sugar sweetened tricks.
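A minimal sketch of that explicit, zero-waiting style, assuming a plain ( non-asyncio ) pyzmq SUB socket, where Socket.poll() takes the timeout first and the flags second:

import zmq

ctx = zmq.Context.instance()
sub = ctx.socket(zmq.SUB)
sub.connect("tcp://localhost:6666")
sub.setsockopt(zmq.SUBSCRIBE, b'4')

while True:
    # explicit zero-wait test: returns immediately, message present or not
    if sub.poll(timeout=0, flags=zmq.POLLIN):
        msg = sub.recv_multipart(zmq.NOBLOCK)   # guaranteed not to block here
        print(msg)
    # else: the loop is free to do other useful work instead of blocking;
    # busy-spinning like this trades CPU cycles for minimum latency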
Anyway, enjoy the Zen-of-Zero.
Upvotes: 3