ice3
ice3

Reputation: 335

ZMQ latency with PUB-SUB (slow subscriber)

I have found a lot of question on a similar topic but they didn't help me to solve my problem.

Using :

TL;DR

Receiver queue in ZMQ SUB socket is growing indefinitely even after HWM are set. This happen when subscriber is slower than publisher. What can I do to prevent it ?

Background

I work in the human computer interaction filed. We have a huge code base to control the mouse cursor, this kind of things. I wanted to "break it" in several module, communicating with ZMQ. It must have as little latency as possible, but dropping (losing) messages is not that important.

An other interesting aspect is the possibility to add "spies" between the nodes. Thus the PUB/SUB sockets seems to be the most adequate.

Something like this :

+----------+                +-----------+                 +------------+
|          | PUB            |           |  PUB            |            |
|  Input   | +----+------>  |  Filter   |  +----+------>  |   Output   |
|          |      |     SUB |           |       |     SUB |            |
+----------+      v         +-----------+       v         +------------+
               +-----+                       +-----+                   
               |Spy 1|                       |Spy 2|                   
               +-----+                       +-----+       

Problem

Everything works fine, except when we add the spies. If we add a spy doing "heavy stuff" like real time visualisations with matplotlib we notice an increasing latency in the plots. IE : on the graph above, filter and output are fast, no latency is seen, but on Spy 2, latency can reach 10 min after running 20 min (!!)

It looks like the queue on the receiver grows indefinitely. We investigated the High Water Mark (HWM) functionalities of ZMQ to set it low to drop older messages, but nothing changed.

Minimal code

Architecture :

+------------+                +-------------+
|            |  PUB           |             |
|   sender   | -------------> |  receiver   |
|            |             SUB|             |
+------------+                +-------------+

The receiver is a slow receiver (acting as a spy in the first graph)

Code :

Sender.py

import time
import zmq

ctx = zmq.Context()

sender = ctx.socket(zmq.PUB)
sender.setsockopt(zmq.SNDBUF, 256)
sender.set_hwm(10)
sender.bind('tcp://127.0.0.1:1500')

print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(sender.get_hwm()) ## 10

i = 0
while True:
    mess = "{} {}".format(i, time.time())
    sender.send_string(mess)
    print("Send : {}".format(mess))
    i+= 1

receiver.py:

import time
import zmq

ctx = zmq.Context()
front_end = ctx.socket(zmq.SUB)

front_end.set_hwm(1)
front_end.setsockopt(zmq.RCVBUF, 8)

front_end.setsockopt_string(zmq.SUBSCRIBE, '')
front_end.connect('tcp://127.0.0.1:1500')

print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(front_end.get_hwm()) ## 1

while True:
    mess = front_end.recv_string()
    i, t = mess.split(" ")
    mess = "{} {}".format(i, time.time() - float(t))
    print("received : {}".format(mess))
    time.sleep(1)  # slow

I don't think that this is a normal behaviour for ZMQ Pub/Sub. I tried to set the HWM in the receiver, in the subscriber, in both, but nothing changed.

What am I missing ?

Edit :

I don't think I was clear when I explained my problem. I made an implementation moving the mouse cursor. The input was the mouse cursor position send in ZMQ at 200Hz (with a .sleep( 1.0 / 200 ) ), some processing was done and the mouse cursor position was updated (I don't have this sleep in my minimal example).

Everything was smooth, even when I launched the spies. The spies nevertheless had a growing latency (because of the slow processing). The latency doesn't appear in the cursor, at the end of the "pipeline".

I think the problem comes from the slow subscriber queuing the messages.

In my example, if we kill the sender and let the receiver alive, messages will continue to be displayed until all (?) the submitted messages are displayed.

The spy is plotting the position of the cursor to provide some feedback, it is still very inconvenient to have such a lag... I just want to get the last message sent, this is why I tried to lower the HWM.

Upvotes: 4

Views: 8057

Answers (2)

user3666197
user3666197

Reputation: 1

A better real-time design / validation is missing

ZeroMQ is a powerful messaging layer.

That said, check how many messages it really sends per second in the original while True: killer-loop

Measure it. Design on facts, not on feelings.

Facts matter.

start_CLK = time.time()                                    # .SET _CLK
time.sleep( 0.001)                                         # .NOP avoid DIV/0!
i = 0                                                      # .SET CTR
while True:                                                # .LOOP
    sender.send_string( "{} {}".format( i, time.time() ) ) # .SND ZMQ-PUB 
    print i / ( time.time() - start_CLK )                  # .GUI perf [msg/sec]
    i+= 1                                                  # .INC CTR

ZeroMQ does its best to populate that avalanche down the scheme.

And it is pretty good at this.

Your [Filter] + [Spy1] + [Output] + [Spy2] pipeline processing, end-to-end, has either

  • be faster, incl. both .send() + .recv_string() overheads than the [Input]-sender

or

  • be the principal blocking sick-slick element, causing the internal PUB/SUB queueing to grow, grow, grow

This chain-of-queues problem can be solved by another architecture design.

Things to re-think:

  1. sub-sample the [Filter].send() cadency ( interleave factor is dependent on stability issues of the real-time process under your control -- be it 1 msec ( btw an O/S timer resolution, so no quantum-physics experiments are possible with COTS O/S timer controls :o) ), 10 msec for bidirectional voice-streaming, 50 msec for TV/GUI streaming, 300 msec for keyboard event-stream et al )

  2. online v/s offline post-processing / visualisation ( you noticed a heavy matplotlib processing, there you typically bear about 800 - 1600 - 3600 msec overheads, even on simple 2D graphing -- measure it before deciding about a change in PUB/SUB-<proc1>-PUB/SUB-<proc2> processing architecture ( you already noticed, that <spy2> cause problems in growing <proc2>-PUB-feeding & sending overheads ).

  3. number of threads vs. number of localhost cores, that execute them -- as seen from the localhost ip, all the processes reside on the same localhost. Plus add +one thread per ZMQ.Context used, plus review Python GIL locking overhead, if all threads were instantiated from the same Python interpreter... Blocking grows. Blocking hurts. A better distributed architecture can improve these performance aspects. However, review [1] and [2] first

n.b. calling a 20 minutes processing pipeline delay ( a real-time system TimeDOMAIN skew ) a latency is a lot euphemistic

Upvotes: 3

ashic
ashic

Reputation: 6495

From http://zguide.zeromq.org/page:all#toc50:

When your socket reaches its HWM, it will either block or drop data depending on the socket type. PUB and ROUTER sockets will drop data if they reach their HWM, while other socket types will block. Over the inproc transport, the sender and receiver share the same buffers, so the real HWM is the sum of the HWM set by both sides.

So SUB sockets don't really drop older messages. You can do some trickery with a router to implement a dropping subscriber, or think about a design that can cater for elements being slow. On of the good things with Zero is that a lot of your core code can remain the same, and you're likely to move around the wrappers that deal with the sockets.

Upvotes: 3

Related Questions