Reputation: 31
I'm trying to understand my code's behavior. I'm using zeromq to create a server that sends a "ping" and waits for "pong" responses. What i'm seeing is that when I send a ping, only one client receives it. when I run this code and send "ping" for the first time i receive:
pong: A
and when i run it again, i get
pong: B
why is that? I want to send one "ping" and receive two pongs.
here's the code:
from threading import Thread
import zmq
class zmqdealer(object):
def __init__(self, port):
context = zmq.Context()
self.sock = context.socket(zmq.DEALER)
#self.sock.setsockopt(zmq.RCVTIMEO, 1000)
self.sock.bind("tcp://*:%s" % port)
thread = Thread(target=lambda: self.poll())
thread.daemon = True
thread.start()
def poll(self):
while True:
reply = self.sock.recv()
if reply != "":
print(reply)
def ping(self):
self.sock.send_multipart(['', 'ping'])
class zmqrep(object):
def __init__(self, ident,host, port):
context = zmq.Context()
self.sock = context.socket(zmq.REP)
self.sock.connect("tcp://%s:%s" % (host, port))
self.ident = ident
thread = Thread(target=lambda: self.pong())
thread.daemon = True
thread.start()
def pong(self):
while True:
request = self.sock.recv()
if request == "ping":
msg = "pong: %s" % self.ident
self.sock.send(msg)
if __name__ == "__main__":
port = 11112
host = "localhost"
server = zmqdealer(port)
client1 = zmqrep('A',host,port)
client2 = zmqrep('B',host,port)
answer = raw_input('press <ENTER> to exit or type \'ping\' to get a pong\n')
while True:
if answer == "":
break
if answer == "ping":
server.ping()
answer = raw_input()
EDIT
I found a way to make this work. I really hope there is another way because i genuinely hate this one! so it looks like dealer sends to the clients in a round robin fashion. so to make my ping work i had to send it to all the clients. how? i subscribed to the monitor socket and added every connected client to a list. every time i ping, i ping to every client. look:
import threading
import zmq
from zmq.utils import monitor
def threadify(func, daemon=True):
thread = threading.Thread(target=func)
thread.daemon = daemon
thread.start()
class zmqdealer(object):
def __init__(self, port):
context = zmq.Context()
self.sock = context.socket(zmq.DEALER)
self.monitor_sock = self.sock.get_monitor_socket()
self.sock.bind("tcp://*:%s" % port)
self.connected_clients = {}
threadify(func=self.poll)
threadify(func=self.monitor)
def poll(self):
while True:
reply = self.sock.recv()
if reply != "":
print reply
def add_client(self, event):
endpoint = event['endpoint']
value = event['value']
if endpoint in self.connected_clients:
self.connected_clients[endpoint].append(value)
else:
self.connected_clients[endpoint] = [value]
def remove_client(self, event):
endpoint = event['endpoint']
value = event['value']
if endpoint in self.connected_clients \
and value in self.connected_clients[endpoint]:
self.connected_clients[endpoint].remove(value)
def monitor(self):
options = {zmq.EVENT_ACCEPTED: lambda e: self.add_client(e),
zmq.EVENT_DISCONNECTED: lambda e: self.remove_client(e)}
while True:
event = monitor.recv_monitor_message(self.monitor_sock)
event_type = event['event']
if event_type in options:
options[event_type](event)
event['event'] = event_types[event_type]
print event
def ping(self):
connected_clients_amount = sum([len(clients) for clients in self.connected_clients.values()])
for i in xrange(connected_clients_amount):
self.sock.send_multipart(['', 'ping'])
if connected_clients_amount <= 0:
print "there are no connected clients!"
class zmqrep(object):
def __init__(self, ident, host, port):
context = zmq.Context()
self.sock = context.socket(zmq.REP)
self.sock.connect("tcp://%s:%s" % (host, port))
self.identity = ident
self.stopped = threading.Event()
threadify(self.pong)
def pong(self):
while not self.stopped.isSet():
request = self.sock.recv()
if request == "ping":
msg = "pong: %s" % self.identity
self.sock.send(msg)
self.sock.close()
def stop(self):
self.stopped.set()
if __name__ == "__main__":
port = 11112
host = "localhost"
num = 5
server = zmqdealer(port)
clients = [zmqrep(i.__str__(), host, port) for i in xrange(num)]
answer = raw_input('press <ENTER> to exit or type \'ping\' to get a pong\n')
while True:
if answer == "":
break
if answer == "ping":
server.ping()
if answer == "kill":
if len(clients) > 0:
die = clients[0]
clients.remove(die)
die.stop()
else:
print "there are no connected clients!\n"
answer = raw_input()
Upvotes: 1
Views: 1579
Reputation: 3175
Router/Dealer sockets are best used for distributing tasks. Say you have 10 tasks and 2 workers, you do not care who does what. Dealer/Router will distribute in a round robin fashion.
Maybe Pub/Sub or Push/Pull sockets would fit your usecase better? They are both broadcast sockets.
Here's an example of Push/Pull used in a similar fashion as what you're doing.
You often end up doing pairs of sockets, one to transmit and one other to receive results. You could for example do a PUSH with a ping message + random identifier, and ask clients to answer on PUB/SUB where you subscribe to this random identifier. This way you can match requests and responses.
Upvotes: 1