Reputation: 3018
I noticed in the FAQ, in the Monitoring section, that it's not possible to get a list of connected peers or to be notified when peers connect/disconnect.
Does this imply that it's also not possible to know which topics a PUB/XPUB socket knows it should publish, from its upstream feedback? Or is there some way to access that data?
I know that ZMQ >= 3.0 "supports PUB/SUB filtering at the publisher", but what I really want is to filter at my application code, using the knowledge ZMQ has about which topics are subscribed to.
My use-case is that I want to publish info about the status of a robot. Some topics involve major hardware actions, like switching the select lines on an ADC to read IR values.
I have a publisher thread running on the bot that should only do that "read" to get IR data when there are actually subscribers. However, since I can only feed a string into my pub_sock.send, I always have to do the costly operation, even if ZMQ is about to drop that message when there are no subscribers.
I have an implementation that uses a backchannel REQ/REP socket to send topic information, which my app can check in its publish loop, thereby only collecting data that needs to be collected. This seems very inelegant though, since ZMQ must already have the data I need, as evidenced by its filtering at the publisher.
I noticed that in this mailing list message, the OP seems to be able to see subscribe messages being sent to an XPUB socket.
However, there's no mention of how they did that, and I'm not seeing any such ability in the docs (still looking). Maybe they were just using Wireshark (to see upstream subscribe messages to an XPUB socket).
Upvotes: 14
Views: 3122
Reputation: 4940
Using zmq.XPUB
socket type, there is a way to detect new and leaving subscribers. The following code sample shows how:
# Publisher side
import zmq
ctx = zmq.Context.instance()
xpub_socket = ctx.socket(zmq.XPUB)
xpub_socket.bind("tcp://*:%d" % port_nr)
poller = zmq.Poller()
poller.register(xpub_socket)
events = dict(poller.poll(1000))
if xpub_socket in events:
msg = xpub_socket.recv()
if msg[0] == b'\x01':
topic = msg[1:]
print "Topic '%s': new subscriber" % topic
elif msg[0] == b'\x00':
topic = msg[1:]
print "Topic '%s': subscriber left" % topic
Note that the zmq.XSUB
socket type does not subscribe in the same manner as the "normal" zmq.SUB
. Code sample:
# Subscriber side
import zmq
ctx = zmq.Context.instance()
# Subscribing of zmq.SUB socket
sub_socket = ctx.socket(zmq.SUB)
sub_socket.setsockopt(zmq.SUBSCRIBE, "sometopic") # OK
sub_socket.connect("tcp://localhost:%d" % port_nr)
# Subscribing zmq.XSUB socket
xsub_socket = ctx.socket(zmq.XSUB)
xsub_socket.connect("tcp://localhost:%d" % port_nr)
# xsub_socket.setsockopt(zmq.SUBSCRIBE, "sometopic") # NOK, raises zmq.error.ZMQError: Invalid argument
xsub_socket.send_multipart([b'\x01', b'sometopic']) # OK, triggers the subscribe event on the publisher
I'd also like to point out the zmq.XPUB_VERBOSE
socket option. If set, all subscription events are received on the socket. If not set, duplicate subscriptions are filtered. See also the following post: ZMQ: No subscription message on XPUB socket for multiple subscribers (Last Value Caching pattern)
Upvotes: 8
Reputation: 9758
At least for the XPUB/XSUB socket case you can save a subscription state by forwarding and handling the packages manually:
context = zmq.Context()
xsub_socket = context.socket(zmq.XSUB)
xsub_socket.bind('tcp://*:10000')
xpub_socket = context.socket(zmq.XPUB)
xpub_socket.bind('tcp://*:10001')
poller = zmq.Poller()
poller.register(xpub_socket, zmq.POLLIN)
poller.register(xsub_socket, zmq.POLLIN)
while True:
try:
events = dict(poller.poll(1000))
except KeyboardInterrupt:
break
if xpub_socket in events:
message = xpub_socket.recv_multipart()
# HERE goes some subscription handle code which inspects
# message
xsub_socket.send_multipart(message)
if xsub_socket in events:
message = xsub_socket.recv_multipart()
xpub_socket.send_multipart(message)
(this is Python code but I guess C/C++ looks quite similar)
I'm currently working on this topic and I will add more information as soon as possible.
Upvotes: 1