dfarrell07

Reputation: 3018

Get subscriber filter from a ZMQ PUB socket

I noticed in the FAQ, in the Monitoring section, that it's not possible to get a list of connected peers or to be notified when peers connect/disconnect.

Does this imply that it's also not possible to find out which topics a PUB/XPUB socket has learned it should publish, based on the subscriptions its subscribers send upstream? Or is there some way to access that data?

I know that ZMQ >= 3.0 "supports PUB/SUB filtering at the publisher", but what I really want is to filter at my application code, using the knowledge ZMQ has about which topics are subscribed to.

My use-case is that I want to publish info about the status of a robot. Some topics involve major hardware actions, like switching the select lines on an ADC to read IR values.

I have a publisher thread running on the bot that should only do that "read" to get IR data when there are actually subscribers. However, since I can only feed a string into my pub_sock.send, I always have to do the costly operation, even if ZMQ is about to drop that message when there are no subscribers.

I have an implementation that uses a backchannel REQ/REP socket to send topic information, which my app can check in its publish loop, thereby only collecting data that needs to be collected. This seems very inelegant though, since ZMQ must already have the data I need, as evidenced by its filtering at the publisher.

I noticed that in this mailing list message, the OP seems to be able to see subscribe messages being sent to an XPUB socket.

However, there's no mention of how they did that, and I'm not seeing any such ability in the docs (still looking). Maybe they were just using Wireshark (to see upstream subscribe messages to an XPUB socket).

Upvotes: 14

Views: 3122

Answers (2)

Freek Wiekmeijer

Reputation: 4940

Using the zmq.XPUB socket type, you can detect subscribers joining and leaving. The following code sample shows how:

# Publisher side
import zmq

port_nr = 5556  # example port; the original snippet left port_nr undefined

ctx = zmq.Context.instance()
xpub_socket = ctx.socket(zmq.XPUB)
xpub_socket.bind("tcp://*:%d" % port_nr)
poller = zmq.Poller()
poller.register(xpub_socket, zmq.POLLIN)

events = dict(poller.poll(1000))
if xpub_socket in events:
    msg = xpub_socket.recv()
    # Slice instead of indexing: on Python 3, msg[0] is an int and never equals b'\x01'
    if msg[:1] == b'\x01':
        topic = msg[1:]
        print("Topic %r: new subscriber" % topic)
    elif msg[:1] == b'\x00':
        topic = msg[1:]
        print("Topic %r: subscriber left" % topic)
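
For the use-case in the question (skip the costly read when nobody is listening), the frames received above could feed a small bookkeeping object. This is only a sketch: `parse_subscription`, `ActiveTopics` and `wanted` are hypothetical names, not part of pyzmq, and it assumes the default (non-verbose) XPUB behaviour, where a subscribe frame arrives for the first subscriber of a topic and an unsubscribe frame for the last one.

```python
def parse_subscription(msg):
    """Split an XPUB frame into (is_subscribe, topic); return None for other frames."""
    if msg[:1] == b"\x01":
        return True, msg[1:]
    if msg[:1] == b"\x00":
        return False, msg[1:]
    return None


class ActiveTopics:
    """Hypothetical helper: remembers which topic prefixes currently have subscribers."""

    def __init__(self):
        self._prefixes = set()

    def update(self, msg):
        parsed = parse_subscription(msg)
        if parsed is None:
            return
        is_subscribe, topic = parsed
        if is_subscribe:
            self._prefixes.add(topic)
        else:
            self._prefixes.discard(topic)

    def wanted(self, topic):
        # ZeroMQ matches subscriptions by prefix, so b"" matches every topic.
        return any(topic.startswith(prefix) for prefix in self._prefixes)


topics = ActiveTopics()
topics.update(b"\x01ir")           # frame as it would come from xpub_socket.recv()
print(topics.wanted(b"ir/left"))   # covered by the b"ir" prefix
topics.update(b"\x00ir")           # last subscriber for b"ir" left
print(topics.wanted(b"ir/left"))
```

In the publish loop you would call `topics.update(xpub_socket.recv())` whenever the poller reports the socket readable, and only switch the ADC select lines when `topics.wanted(b"ir")` is true.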

Note that the zmq.XSUB socket type does not subscribe in the same manner as the "normal" zmq.SUB. Code sample:

# Subscriber side
import zmq

port_nr = 5556  # example port; must match the publisher
ctx = zmq.Context.instance()

# Subscribing with a zmq.SUB socket
sub_socket = ctx.socket(zmq.SUB)
sub_socket.setsockopt(zmq.SUBSCRIBE, b"sometopic") # OK
sub_socket.connect("tcp://localhost:%d" % port_nr)

# Subscribing with a zmq.XSUB socket
xsub_socket = ctx.socket(zmq.XSUB)
xsub_socket.connect("tcp://localhost:%d" % port_nr)
# xsub_socket.setsockopt(zmq.SUBSCRIBE, b"sometopic") # NOK, raises zmq.error.ZMQError: Invalid argument
# Send the subscription as ONE frame: command byte \x01 followed by the topic.
# (A separate b'\x01' frame would be read as a subscription to the empty prefix, i.e. every topic.)
xsub_socket.send(b'\x01' + b'sometopic') # OK, triggers the subscribe event on the publisher
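
Since XSUB expects the command byte and the topic together in a single frame, a tiny helper can keep that detail in one place. This is a sketch; `subscription_frame` is a made-up name, not a pyzmq function.

```python
def subscription_frame(topic, subscribe=True):
    """Build the single frame an XSUB socket sends upstream to (un)subscribe."""
    if isinstance(topic, str):
        topic = topic.encode("utf-8")
    # \x01 = subscribe, \x00 = unsubscribe, followed directly by the topic prefix
    return (b"\x01" if subscribe else b"\x00") + topic


print(subscription_frame("sometopic"))
print(subscription_frame(b"sometopic", subscribe=False))
```

With the sockets from the sample above, `xsub_socket.send(subscription_frame("sometopic"))` would subscribe and `xsub_socket.send(subscription_frame("sometopic", subscribe=False))` would unsubscribe.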

I'd also like to point out the zmq.XPUB_VERBOSE socket option. If set, all subscription events are received on the socket. If not set, duplicate subscriptions are filtered. See also the following post: ZMQ: No subscription message on XPUB socket for multiple subscribers (Last Value Caching pattern)
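
To make the difference concrete, here is a toy model in plain Python of which subscription events reach the application. It does not touch libzmq; `delivered_events` is a hypothetical function, and the model reflects my understanding that zmq.XPUB_VERBOSE only stops the filtering of duplicate subscribe messages, while unsubscribes are still reported only when the last subscriber for a topic leaves.

```python
from collections import Counter


def delivered_events(events, verbose=False):
    """Toy model of XPUB filtering.

    events: list of (kind, topic) tuples, kind being "sub" or "unsub",
    in the order the subscribers sent them.
    """
    refcount = Counter()
    delivered = []
    for kind, topic in events:
        if kind == "sub":
            refcount[topic] += 1
            # Non-verbose: only the first subscriber for a topic is reported.
            if verbose or refcount[topic] == 1:
                delivered.append((kind, topic))
        elif kind == "unsub" and refcount[topic] > 0:
            refcount[topic] -= 1
            # Reported only when the last subscriber for the topic is gone.
            if refcount[topic] == 0:
                delivered.append((kind, topic))
    return delivered


stream = [("sub", b"ir"), ("sub", b"ir"), ("unsub", b"ir"), ("unsub", b"ir")]
print(delivered_events(stream))
print(delivered_events(stream, verbose=True))
```

For a "does anyone want this topic" check the default mode is enough; the verbose mode matters for patterns like Last Value Caching, where every new subscriber needs a reaction.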

Upvotes: 8

frans

Reputation: 9758

At least for the XPUB/XSUB socket case, you can keep track of the subscription state by forwarding and handling the messages manually:

import zmq

context = zmq.Context()

xsub_socket = context.socket(zmq.XSUB)
xsub_socket.bind('tcp://*:10000')
xpub_socket = context.socket(zmq.XPUB)
xpub_socket.bind('tcp://*:10001')

poller = zmq.Poller()
poller.register(xpub_socket, zmq.POLLIN)
poller.register(xsub_socket, zmq.POLLIN)

while True:
    try:
        events = dict(poller.poll(1000))
    except KeyboardInterrupt:
        break

    if xpub_socket in events:
        message = xpub_socket.recv_multipart()

        # HERE goes some subscription handle code which inspects
        # message

        xsub_socket.send_multipart(message)
    if xsub_socket in events:
        message = xsub_socket.recv_multipart()
        xpub_socket.send_multipart(message)

(this is Python code but I guess C/C++ looks quite similar)

I'm currently working on this topic and I will add more information as soon as possible.

Upvotes: 1
