liv2hak

Reputation: 15010

Why is a simple publish/subscribe setup not working with ZeroMQ?

I want to establish publish/subscribe communication between two machines.

The two machines are ryu-primary and ryu-secondary.

The steps I follow on each machine are as follows.

In the initializer for ryu-primary (IP address is 192.168.241.131)

    self.context    = zmq.Context()
    self.sub_socket = self.context.socket(zmq.SUB)
    self.pub_socket = self.context.socket(zmq.PUB)
    self.pub_port   = 5566
    self.sub_port   = 5566


def establish_zmq_connection(self):                      # Socket to talk to server
    print( "Connection to ryu-secondary..." )
    self.sub_socket.connect( "tcp://192.168.241.132:%s" % self.sub_port )

def listen_zmq_connection(self):
    print( 'Listen to zmq connection' )
    self.pub_socket.bind( "tcp://*:%s" % self.pub_port )

def recieve_messages(self):
    while True:
        try:
            string = self.sub_socket.recv( flags=zmq.NOBLOCK )
            print( 'flow mod messages recieved {}'.format(string) )
            return string
        except zmq.ZMQError:
            break

def push_messages(self,msg):
    self.pub_socket.send( "%s" % (msg) )

From ryu-secondary (IP address - 192.168.241.132)

In the initializer

    self.context    = zmq.Context()
    self.sub_socket = self.context.socket(zmq.SUB)
    self.pub_socket = self.context.socket(zmq.PUB)
    self.pub_port   = 5566
    self.sub_port   = 5566


def establish_zmq_connection(self):                     # Socket to talk to server
     print( "Connection to ryu-primary..." )
     self.sub_socket.connect( "tcp://192.168.241.131:%s" % self.sub_port )

def listen_zmq_connection(self):
     print( 'Listen to zmq connection' )
     self.pub_socket.bind( "tcp://*:%s" % self.pub_port )

def recieve_messages(self):
    while True:
        try:
            string = self.sub_socket.recv( flags=zmq.NOBLOCK )
            print( 'flow mod messages recieved {}'.format(string) )
            return string
        except zmq.ZMQError:
            break

def push_messages(self,msg):
    print( 'pushing message to publish socket' )
    self.pub_socket.send( "%s" % (msg) )

These are the functions that I have.

I am calling on ryu-secondary:

establish_zmq_connection()
push_messages()

But I am not receiving those messages on ryu-primary when I call

listen_zmq_connection()
recieve_messages()

Can someone point out to me what I am doing wrong?

Upvotes: 0

Views: 1336

Answers (1)

user3666197

Reputation: 1

Repair the PUB/SUB messaging pattern setup

There are several important steps in making the PUB/SUB pattern work.

All this is well described in the ZeroMQ documentation.

You do not need to repeat both the PUB and the SUB parts of the code on both sides. Doing so also has a side effect: it hides the case where the PUB and SUB socket addresses, ports, or calls get mixed up in the "opposite" node's code, so such a fundamental collision goes unnoticed.

  1. your code defines the initial form of a PUB-archetype, which is expected to call .push_messages()

  2. your code defines the initial form of a SUB-archetype, which is expected to call .recieve_messages()

  3. your code does not show how you control which side goes first during connection setup, i.e. whether .bind() or .connect() happens at random or in a fixed order relative to the other

  4. your code does not show any subscription setup after the SUB-archetype was instantiated. The default set at socket instantiation needs to be changed via the .setsockopt( zmq.SUBSCRIBE, '' ) method; otherwise a prohibitive filter blocks every ( not yet subscribed ) message, so nothing is ever delivered ( "received" ) on the SUB-side
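The four points above can be sketched in one small script. This is only a minimal illustration under assumptions, not your Ryu code: the function name pubsub_roundtrip, the inproc endpoint, and the retry loop are all made up for the demo, and it assumes Python 3 with pyzmq, where the subscription prefix and the payload are bytes. The retry loop is there because a message published before the subscription has reached the publisher is silently dropped.

```python
import zmq


def pubsub_roundtrip(endpoint="inproc://pubsub-demo", payload=b"flow mod", attempts=50):
    """Send one payload from a PUB socket to a SUB socket and return what arrived.

    The endpoint and function name are hypothetical; on two machines the
    publisher would bind "tcp://*:5566" and the subscriber would connect
    to the publisher's address.
    """
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    sub = context.socket(zmq.SUB)
    try:
        pub.bind(endpoint)                  # publisher side: bind
        sub.setsockopt(zmq.SUBSCRIBE, b"")  # empty prefix: accept every message
        sub.connect(endpoint)               # subscriber side: connect to the publisher

        # Re-send until the subscriber sees a message, since anything sent
        # before the subscription is in place is dropped by the publisher.
        for _ in range(attempts):
            pub.send(payload)
            if sub.poll(timeout=100):       # wait up to 100 ms for input
                return sub.recv()
        return None
    finally:
        sub.close(linger=0)
        pub.close(linger=0)
        context.term()


if __name__ == "__main__":
    print(pubsub_roundtrip())
```

Applied to the question, this would mean the receiving node has to .connect() its SUB socket (with a subscription set) to the node that .bind()s its PUB socket; a bound PUB socket on the receiving side plays no part in receiving.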


The default SUB-side subscription filter is prohibitive and must be modified

You may have noticed from the ZeroMQ documentation that, until configured otherwise, the SUB-side filters out all incoming messages.

http://api.zeromq.org/2-1:zmq-setsockopt

"The ZMQ_SUBSCRIBE option shall establish a new message filter on a ZMQ_SUB socket. Newly created ZMQ_SUB sockets shall filter out all incoming messages, therefore you should call this option to establish an initial message filter.

An empty option_value of length zero shall subscribe to all incoming messages. A non-empty option_value shall subscribe to all messages beginning with the specified prefix. Multiple filters may be attached to a single ZMQ_SUB socket, in which case a message shall be accepted if it matches at least one filter."
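The filter rule quoted above can be modelled in a few lines of plain Python. This is only a sketch of the documented semantics, not how libzmq implements it, and the function name accepts is made up for the illustration.

```python
def accepts(filters, message):
    """Model of the documented ZMQ_SUBSCRIBE rule.

    A message passes if it starts with at least one subscribed prefix.
    With no filters at all (a freshly created SUB socket) nothing passes.
    """
    return any(message.startswith(prefix) for prefix in filters)


if __name__ == "__main__":
    print(accepts([], b"flow mod"))                   # no subscription yet
    print(accepts([b""], b"flow mod"))                # empty prefix matches everything
    print(accepts([b"stats", b"flow"], b"flow mod"))  # one of several prefixes matches
    print(accepts([b"stats"], b"flow mod"))           # no prefix matches
```

The first case is the situation in the question: a SUB socket without any subscription never hands a message to .recv().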


Pre-configuring a Context instance is also possible

There is another possibility for Python code using pyzmq 13.0+. There you may also set this up via the Context method .setsockopt( zmq.SUBSCRIBE, "" ) et al, but such a call has to precede the instantiation of any new socket from the Context instance pre-configured this way.
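A minimal sketch of that Context-level variant, assuming Python 3 and pyzmq 13.0 or later. The function name, the inproc endpoint, and the retry loop are hypothetical and only serve the demo. The PUB socket is created before the default is set, so the default is applied to the SUB socket only.

```python
import zmq


def subscribe_all_via_context(endpoint="inproc://ctx-default-demo", attempts=50):
    """Round-trip one message using a subscription set on the Context."""
    context = zmq.Context()
    pub = context.socket(zmq.PUB)

    # Default for sockets created from this context AFTER this call.
    context.setsockopt(zmq.SUBSCRIBE, b"")
    sub = context.socket(zmq.SUB)  # no per-socket setsockopt() call here

    try:
        pub.bind(endpoint)
        sub.connect(endpoint)
        # Re-send until something arrives; early messages may be dropped
        # before the subscription has reached the publisher.
        for _ in range(attempts):
            pub.send(b"hello")
            if sub.poll(timeout=100):  # wait up to 100 ms for input
                return sub.recv()
        return None
    finally:
        sub.close(linger=0)
        pub.close(linger=0)
        context.term()


if __name__ == "__main__":
    print(subscribe_all_via_context())
```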

Upvotes: 2
