Reputation: 15010
I want to establish publish subscribe communication between to machines.
The two machines, that I have, are ryu-primary
and ryu-secondary
The steps I follow in each of the machines are as follows.
In the initializer for ryu-primary
(IP address is 192.168.241.131)
self.context = zmq.Context()
self.sub_socket = self.context.socket(zmq.SUB)
self.pub_socket = self.context.socket(zmq.PUB)
self.pub_port = 5566
self.sub_port = 5566
def establish_zmq_connection(self): # Socket to talk to server
print( "Connection to ryu-secondary..." )
self.sub_socket.connect( "tcp://192.168.241.132:%s" % self.sub_port )
def listen_zmq_connection(self):
print( 'Listen to zmq connection' )
self.pub_socket.bind( "tcp://*:%s" % self.pub_port )
def recieve_messages(self):
while True:
try:
string = self.sub_socket.recv( flags=zmq.NOBLOCK )
print( 'flow mod messages recieved {}'.format(string) )
return string
except zmq.ZMQError:
break
def push_messages(self,msg):
self.pub_socket.send( "%s" % (msg) )
From ryu-secondary (IP address - 192.168.241.132)
In the initializer
self.context = zmq.Context()
self.sub_socket = self.context.socket(zmq.SUB)
self.pub_socket = self.context.socket(zmq.PUB)
self.pub_port = 5566
self.sub_port = 5566
def establish_zmq_connection(self): # Socket to talk to server
print( "Connection to ryu-secondary..." )
self.sub_socket.connect( "tcp://192.168.241.131:%s" % self.sub_port )
def listen_zmq_connection(self):
print( 'Listen to zmq connection' )
self.pub_socket.bind( "tcp://*:%s" % self.pub_port )
def recieve_messages(self):
while True:
try:
string = self.sub_socket.recv( flags=zmq.NOBLOCK )
print( 'flow mod messages recieved {}'.format(string) )
return string
except zmq.ZMQError:
break
def push_messages(self,msg):
print( 'pushing message to publish socket' )
self.pub_socket.send( "%s" % (msg) )
These are the functions that I have.
I am calling on ryu-secondary
:
establish_zmq_connections()
push_messages()
But I am not recieving those messages on ryu-primary
, when I call
listen_zmq_connection()
recieve_messages()
Can someone point out to me what I am doing wrong?
Upvotes: 0
Views: 1336
Reputation: 1
PUB/SUB
messaging pattern setupThere are several important steps in making the PUB/SUB
pattern work.
All this is well described in the ZeroMQ documentation.
You need not repeat both pub & sub parts of code on both sides, the more that it masks, as A side-effect thereof, the case if you mix the pub and sub socket addresses/ports/calls/etc in an "opposite" node code and you do not see such a principal collision.
your code defines the initial form of PUB
-archetype, that is expected to .push_messages()
your code defines the initial form of SUB
-archetype, that is expected to .receive_messages()
your code does not show, how do you control who goes first on a connection setup -- whether .bind()
or .connect()
appears at random or before/after the other
your code does not show any subscription setup, after the SUB
-archetype was instantiated. A default value upon a socket instantiation does need to be modified via a .setsockopt( zmq.SUBSCRIBE = '')
method, otherwise there is a prohibitive filter that does not allow any ( yet unsubscribed ) message to pass through and got-output ( "received" ) on the SUB
-side
You may have noticed from the ZeroMQ documentation, that until setup otherwise, the sub-side does filter-out all incoming messages.
"The ZMQ_SUBSCRIBE option shall establish a new message filter on a ZMQ_SUB socket. Newly created ZMQ_SUB sockets shall filter out all incoming messages, therefore you should call this option to establish an initial message filter.
An empty option_value
of length zero shall subscribe to all incoming messages. A non-empty option_value
shall subscribe to all messages beginning with the specified prefix. Multiple filters may be attached to a single ZMQ_SUB socket, in which case a message shall be accepted if it matches at least one filter."
There is another possibility for a python code using pyzmq 13.0+
. There you may also setup this via a Context class-method .setsockopt( zmq.SUBSCRIBE, "" )
et al, but such call has to precede the new socket instantiation from a Context-instance pre-configured this way.
Upvotes: 2