user911651

Reputation:

Retrieving subscriber count using zeromq PUB/SUB sockets

Is it possible to get the total count of subscribers from a PUB socket in zeromq?

Thanks!

Upvotes: 17

Views: 6245

Answers (4)

tapin13

Reputation: 51

This is an implementation in Node.js for a REP socket; I think it works the same way for PUB.

As Jakob Möllås said, you need to use the socket monitor.

const zmq = require('zmq')
        , rep = zmq.socket('rep');

let counter = 0;

rep.bind('tcp://*:5560', function (err) {
    if (err) {
        console.log(err);
    } else {
        console.log("Listening on 5560…");
        rep.monitor(500, 0);
    }
});

// Register to monitoring events
rep.on('connect', function (fd, ep) {
    console.log('connect, endpoint:', ep);
});
rep.on('connect_delay', function (fd, ep) {
    console.log('connect_delay, endpoint:', ep);
});
rep.on('connect_retry', function (fd, ep) {
    console.log('connect_retry, endpoint:', ep);
});
rep.on('listen', function (fd, ep) {
    console.log('listen, endpoint:', ep);
});
rep.on('bind_error', function (fd, ep) {
    console.log('bind_error, endpoint:', ep);
});
rep.on('accept', function (fd, ep) {
    console.log('accept, endpoint:', ep);
    counter++;
});
rep.on('accept_error', function (fd, ep) {
    console.log('accept_error, endpoint:', ep);
});
rep.on('close', function (fd, ep) {
    console.log('close, endpoint:', ep);
});
rep.on('close_error', function (fd, ep) {
    console.log('close_error, endpoint:', ep);
});
rep.on('disconnect', function (fd, ep) {
    console.log('disconnect, endpoint:', ep);
    counter--;
});

// Handle monitor error
rep.on('monitor_error', function(err) {
    console.log('Error in monitoring: %s, will restart monitoring in 5 seconds', err);
    setTimeout(function() { rep.monitor(500, 0); }, 5000);
});

rep.on('message', function (msg) {
    console.log(`recieve: `, JSON.parse(msg));
    rep.send(JSON.stringify({ "status": "FAIL", "code": 3666 }));
});

Console output:

recieve:  { method: 'login', login: 'a', password: 'b1' }
accept, endpoint: tcp://0.0.0.0:5560
accept, endpoint: tcp://0.0.0.0:5560
login: a, password: b1
recieve:  { method: 'login', login: 'a', password: 'b1' }
disconnect, endpoint: tcp://0.0.0.0:5560
login: a, password: b1
disconnect, endpoint: tcp://0.0.0.0:5560

Upvotes: 3

Daniel

Reputation: 1418

I encountered a (testing) scenario in which I had to wait for n subscribers before starting to publish messages. Here's the function that did the trick for me (in Python):

def wait_for_n_subscribers(pub_socket: zmq.Socket, n_subscribers: int):
    """
    blocks until pub_socket had n_subscribers connected to it
    """
    connections = 0
    events_socket = pub_socket.get_monitor_socket(events=zmq.EVENT_HANDSHAKE_SUCCEEDED)  # only accept this event
    while connections < n_subscribers:
        recv_monitor_message(events_socket)  # this will block until a handshake was successful
        connections += 1

Explanation:
After creating a PUB socket, we attach a PAIR socket to it, that will monitor the PUB socket for events.
When a SUB socket connects to the PUB socket it generates two events on the PUB (binding) side:
EVENT_ACCEPTED (32) followed by EVENT_HANDSHAKE_SUCCEEDED (4096).

Therefore we monitor for EVENT_HANDSHAKE_SUCCEEDED as the indicator of a successful subscriber connection. Once the specified number of subscribers is connected, the function returns.

Here's a complete toy-example:

import threading
import time
import zmq
from zmq.utils.monitor import recv_monitor_message  # requires libzmq >= 4.0

ep = "ipc:///tmp/test-socket"


def print_events_map():
    "auxiliary function to print all zmq socket events"
    print("Event names:")
    for name in dir(zmq):
        if name.startswith('EVENT_'):
            value = getattr(zmq, name)
            print("%21s : %4i" % (name, value))


context = zmq.Context()


def wait_for_n_subscribers(pub_socket: zmq.Socket, n_subscribers: int):
    """
    blocks until pub_socket had n_subscribers connected to it
    """
    connections = 0
    events_socket = pub_socket.get_monitor_socket(events=zmq.EVENT_HANDSHAKE_SUCCEEDED)  # only accept this event
    while connections < n_subscribers:
        recv_monitor_message(events_socket)  # this will block until a handshake was successful
        connections += 1


def simulate_sender(wait, n):
    s_pub = context.socket(zmq.PUB)
    s_pub.bind(ep)
    if wait:
        wait_for_n_subscribers(s_pub, n)
    for i in range(5):
        s_pub.send_pyobj(i)
        time.sleep(1)


subscribers = 2
s_sub_1 = context.socket(zmq.SUB)
s_sub_1.setsockopt(zmq.RCVTIMEO, 3000)  # wait at most 3 seconds
s_sub_1.subscribe("")

s_sub_2 = context.socket(zmq.SUB)
s_sub_2.subscribe("")

wait = True  # set to false if publisher should not wait
threading.Thread(target=simulate_sender, args=(wait, subscribers,)).start()
time.sleep(1)
print("connecting 1")
s_sub_1.connect(ep)
print("connecting 2")
s_sub_2.connect(ep)
while True:
    try:
        print("received %s" % s_sub_1.recv_pyobj())
    except zmq.error.Again:
        print("no incoming msgs for 3 seconds")
        break

Notes:

  1. Setting wait to False will cause the subscribers to miss the first published message(s), since the subscriber has a 1 second delay before connecting, and the publisher doesn't wait (for the subscriber to connect).
  2. The scenario assumes that the publisher binds and the subscribers connect.
  3. Tested with zmq 4.1.4, pyzmq 20.0.0

Upvotes: 2

Shital Shah

Reputation: 68738

There doesn't seem to be any direct way. Below is Python code that monitors socket events, which can be used to maintain a count:

import zmq
from zmq.eventloop import ioloop, zmqstream
import zmq.utils.monitor

class Publication:
    def start(self, port, host):
        context = zmq.Context()
        self._socket = context.socket(zmq.PUB)
        self._socket.bind("tcp://%s:%d" % (host, port))
        self._mon_socket = self._socket.get_monitor_socket(zmq.EVENT_CONNECTED | zmq.EVENT_DISCONNECTED)
        self._mon_stream = zmqstream.ZMQStream(self._mon_socket)
        self._mon_stream.on_recv(self._on_mon)

    def _on_mon(self, msg):
        ev = zmq.utils.monitor.parse_monitor_message(msg)
        event = ev['event']
        endpoint = ev['endpoint']
        if event == zmq.EVENT_CONNECTED:
            pass
            # print(endpoint)
        elif event == zmq.EVENT_DISCONNECTED:
            pass
            #print(endpoint)

One issue is that, for some reason, the CONNECTED event does not fire. Another is that even when an event fires, you only get the endpoint ID, which is a string like tcp://ip:port. So for multiple clients on the same node you get the same endpoint ID.
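
One possible reason the CONNECTED event stays silent: libzmq raises EVENT_CONNECTED on the side that calls connect(), while a socket that binds reports EVENT_ACCEPTED for each incoming connection. Below is a minimal sketch that counts on the bind side under that assumption. SubscriberCounter and watch are hypothetical names, the numeric constants mirror zmq.EVENT_ACCEPTED and zmq.EVENT_DISCONNECTED, and the count tracks transport connections, which may not match subscriptions exactly.

```python
# Values of the libzmq event constants used here (zmq.EVENT_* in pyzmq).
EVENT_ACCEPTED = 0x0020      # 32: a bound socket accepted a connection
EVENT_DISCONNECTED = 0x0200  # 512: an established connection went away


class SubscriberCounter:
    """Hypothetical helper: keeps a running count from monitor events."""

    def __init__(self):
        self.count = 0

    def handle(self, event: dict) -> int:
        """Update the count from a dict shaped like parse_monitor_message() output."""
        if event["event"] == EVENT_ACCEPTED:
            self.count += 1
        elif event["event"] == EVENT_DISCONNECTED and self.count > 0:
            self.count -= 1
        return self.count


def watch(pub_socket, counter: SubscriberCounter) -> None:
    """Blocking loop that feeds a bound PUB socket's monitor events into the counter."""
    import zmq  # imported lazily so the counter itself has no pyzmq dependency
    from zmq.utils.monitor import recv_monitor_message

    monitor = pub_socket.get_monitor_socket(zmq.EVENT_ACCEPTED | zmq.EVENT_DISCONNECTED)
    while True:
        print("subscribers:", counter.handle(recv_monitor_message(monitor)))
```

Running watch() in a background thread keeps the publisher free to send; counter.count can then be read from the publishing code whenever it is needed.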

Upvotes: 1

Jakob Möllås

Reputation: 4369

Yes, but unfortunately not via any simple property or method.

You need to use the zmq_socket_monitor() function to connect an inproc service socket to the main socket you want to observe. From there you can listen to events regarding connect/disconnect and keep your own count of subscribers. It may not be a trivial task though, since it seems (to me at least) a bit hard to know when to consider a subscriber (or any remote connection) to be up/down (closed/disconnected/retry etc.). You will have to play around a bit.

The link includes samples and event descriptions.
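
For reference, a rough Python sketch of the same wiring, assuming pyzmq's Socket.monitor() wrapper around zmq_socket_monitor() and the two-frame event layout from the libzmq documentation (a 16-bit event ID plus a 32-bit value, then the endpoint string). The function names and the inproc address are made up for illustration.

```python
import struct


def decode_monitor_frames(frames):
    """Decode the two frames of a libzmq monitor message.

    Frame 0 packs a 16-bit event ID and a 32-bit event value;
    frame 1 is the affected endpoint as a string.
    """
    event_id, value = struct.unpack("=hi", frames[0])
    return {"event": event_id, "value": value, "endpoint": frames[1].decode()}


def open_monitor(sock, addr="inproc://pub-monitor"):
    """Attach a monitor to `sock` and return the PAIR socket that receives its events."""
    import zmq  # lazy import: only needed when a real socket is monitored

    sock.monitor(addr, zmq.EVENT_ALL)      # publish all of sock's events on addr
    pair = sock.context.socket(zmq.PAIR)   # the inproc service socket
    pair.connect(addr)
    return pair
```

Each pair.recv_multipart() result can be passed to decode_monitor_frames(); what you do with the accept and disconnect events to maintain a count is then up to you.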

Upvotes: 10
