Reputation: 95
I am trying to get an acknowledgement from my subscriber back to my publisher using ZeroMQ in Python.
I tried a few code examples, using a zmq.PUSH
and zmq.PULL
code sequence, but to no avail.
My code as it stands:
PUB_SERVER.PY
import zmq
import random
import sys
import time
port = "5556"
if len( sys.argv ) > 1:
port = sys.argv[1]
int( port )
context = zmq.Context()
socket = context.socket( zmq.PUB )
socket.bind( "tcp://*:%s" % port )
while True:
# topic = random.randrange( 9999, 10005 )
topic = 10000
messagedata = random.randrange( 1, 215 ) - 80
print "%d %d" % ( topic, messagedata )
socket.send( "%d %d" % ( topic, messagedata ) )
time.sleep( 1 )
SUB_CLIENT.PY
import sys
import zmq
port = "5556"
if len( sys.argv ) > 1:
port = sys.argv[1]
int( port )
if len( sys.argv ) > 2:
port1 = sys.argv[2]
int( port1 )
# Socket to talk to server
context = zmq.Context()
socket = context.socket( zmq.SUB )
print "Collecting updates from weather server..."
socket.connect( "tcp://192.168.0.21:%s" % port )
if len( sys.argv ) > 2:
socket.connect( "tcp://192.168.0.21:%s" % port1 )
# Subscribe to zipcode, default is NYC, 10001
topicfilter = "10000"
socket.setsockopt( zmq.SUBSCRIBE, topicfilter )
# Process 5 updates
total_value = 0
for update_nbr in range( 5 ):
string = socket.recv()
topic, messagedata = string.split()
total_value += int( messagedata )
print topic, messagedata
print "Average messagedata value for topic '%s' was %dF" % ( topicfilter, total_value / update_nbr )
That code gives me the output of the server in one SSH window (on a Parallella), and the received filtered messages of the client in another SSH window (on a RaspberryPi) which is working great.
Where I am lost is, once the client has gotten a filtered message from the server, how would it acknowledge that filtered message being received, and then have the server log those acknowledged messages?
Eventually, I'd want to do some intelligent decision making of sending a file to the subscriber who acknowledges.
Upvotes: 1
Views: 2057
Reputation: 1
May create a parallel messaging structure for a soft-signalling for that purpose.
Extend PUB_SERVER.PY
with a .SUB
Rx access point:
anAckRxSOCKET = context.socket( zmq.SUB ) # create SUB side
anAckRxSOCKET.bind( "tcp://*:%s" % aServerAckPORT ) ) # .bind()
anAckRxSOCKET.setsockopt( zmq.SUBSCRIBE, "" ) # SUB to *-anything
# ...
anAckRxSTRING = anAckRxSOCKET.recv() # .recv()
Extend SUB_CLIENT.PY
with a .PUB
Tx socket to the Server side access point:
anAckTxSOCKET = context.socket( zmq.PUB ) # create PUB side(s)
anAckTxSOCKET.connect( "tcp://192.168.0.21:%s" % aServerAckPORT ) )
and send ACK(s) with "a-proxy-ID" for any server-side processing you may want or need
anAckTxSOCKET.send( topicfilter ) # ACK with an "identity"-proxy
Upvotes: 2