user1318499

Reputation: 1354

How to prevent buffering/latency with PUB/SUB?

I'm sending video as a sequence of images (one image per zmq message), but sometimes, perhaps when the network is slow, they are received more slowly than they are sent and a growing latency appears, seemingly up to about a minute of video, which is hundreds of images or megabytes of data. It usually clears itself eventually, with the subscriber receiving messages faster than the publisher sends them.

Instead, I want it to discard missed messages, the same way it is supposed to when the subscriber is too slow receiving them. I hoped setting zmq.CONFLATE to 1 would do this, but it doesn't. How can I achieve this? I suspect the messages are being buffered at the publisher, which as far as I know is not supposed to have any zmq buffer, or somewhere in the network stack.

Simplified server code

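import io
import zmq
from picamera import PiCamera

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:12345")
camera = PiCamera()
stream = io.BytesIO()
for _ in camera.capture_continuous(stream, 'jpeg', use_video_port=True):
  # Drop leftover bytes from a previous, longer frame
  stream.truncate()
  stream.seek(0)
  # Send one JPEG image per zmq message
  socket.send(stream.read())
  # Rewind so the next capture overwrites the buffer from the start
  stream.seek(0)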

Simplified client code

# Initialization (inside the client class; requires "import zmq")
self.context = zmq.Context()
self.video_socket = self.context.socket(zmq.SUB)
# Keep only the newest message in the receive queue; set before connect()
self.video_socket.setsockopt(zmq.CONFLATE, 1)
self.video_socket.setsockopt(zmq.SUBSCRIBE, b"")
self.video_socket.connect("tcp://" + ip_address + ":12345")

def get_image(self):
  # Return the latest image without blocking, or None if nothing has arrived
  poll_result = self.video_socket.poll(timeout=0)
  if poll_result == zmq.POLLIN:
    return self.video_socket.recv()
  else:
    return None

The publisher is on a Raspberry Pi and the subscriber is on Windows.

Upvotes: 1

Views: 1967

Answers (2)

user1318499

Reputation: 1354

Also set zmq.CONFLATE to 1 on the server, so that only the latest message is kept in the send queue.

Do this before binding the server socket:

socket.setsockopt(zmq.CONFLATE, 1)

For some reason I mistakenly thought the PUB socket didn't have a send queue, but it does.
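
To see the effect of conflating on both ends, here is a minimal sketch that runs on a single machine. The function name latest_frame_demo, the loopback address, the random port, the fake "frame N" payloads, and the sleep durations are all assumptions made for illustration; they are not part of the original setup, which uses a PiCamera and a fixed port.

```python
import time
import zmq


def latest_frame_demo(num_frames=20):
    """Publish num_frames quickly, then return what a slow subscriber gets."""
    context = zmq.Context()

    pub = context.socket(zmq.PUB)
    pub.setsockopt(zmq.CONFLATE, 1)  # must be set before bind
    port = pub.bind_to_random_port("tcp://127.0.0.1")

    sub = context.socket(zmq.SUB)
    sub.setsockopt(zmq.CONFLATE, 1)  # must be set before connect
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    sub.connect("tcp://127.0.0.1:%d" % port)

    # Assumed delay: give the subscription time to reach the publisher,
    # otherwise the first frames are dropped for lack of subscribers.
    time.sleep(0.5)

    # Stand-in for camera frames: send much faster than the reader reads.
    for i in range(num_frames):
        pub.send(b"frame %d" % i)

    # Simulate a slow subscriber that only looks at the socket later.
    time.sleep(0.5)

    # Drain whatever is queued, without blocking.
    received = []
    while sub.poll(timeout=0):
        received.append(sub.recv())

    pub.close(linger=0)
    sub.close(linger=0)
    context.term()
    return received
```

Calling latest_frame_demo() should return a list holding only the newest frame rather than the whole backlog, since each conflating queue keeps a single message. Note that CONFLATE works on single-part messages, which is the case here because each image is sent with one send() call.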

Upvotes: 2

jamesdillonharvey

Reputation: 1042

I am not sure which version of pyzmq you are using, but based on the underlying C++ library, libzmq, you need to:

  • Set the ZMQ_SNDHWM socket option on the server socket.
  • Set the ZMQ_RCVHWM socket option on the client socket.

For PUB/SUB, these options limit the number of messages queued for each established connection. Once a queue reaches the HWM (high water mark), further messages are discarded.

Also turn off ZMQ_CONFLATE, because a socket with that option set ignores ZMQ_SNDHWM and ZMQ_RCVHWM.
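
In pyzmq, the two options above are exposed as zmq.SNDHWM and zmq.RCVHWM. A minimal sketch of how they might be applied follows; the helper names make_publisher and make_subscriber and the default limit of 2 messages are assumptions for illustration, not values taken from the question.

```python
import zmq


def make_publisher(context, endpoint, hwm=2):
    """Create a PUB socket that queues at most hwm messages per subscriber."""
    pub = context.socket(zmq.PUB)
    # Set before bind so the limit applies to connections made afterwards.
    pub.setsockopt(zmq.SNDHWM, hwm)
    pub.bind(endpoint)
    return pub


def make_subscriber(context, endpoint, hwm=2):
    """Create a SUB socket that queues at most hwm received messages."""
    sub = context.socket(zmq.SUB)
    # Set before connect; CONFLATE is deliberately left at its default (off).
    sub.setsockopt(zmq.RCVHWM, hwm)
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    sub.connect(endpoint)
    return sub
```

On the server this would be used as make_publisher(zmq.Context(), "tcp://*:12345"), and on the client as make_subscriber(context, "tcp://" + ip_address + ":12345"). The HWM only limits libzmq's own queues; data already handed to the operating system's TCP buffers is not counted, so some latency can remain with large messages.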

Upvotes: 2
