Reputation: 151
I have implemented a "network service" superclass in Python, like so:
class NetworkService (threading.Thread):
"""
Implements a multithreaded network service
"""
# Class Globals (ie, C++ class static)
ZmqContext = zmq.Context.instance()
# ==========================================================================================
# Class Mechanics
# ==========================================================================================
def __init__(self, name, port, conc=4):
"""
Network service initilization
"""
self.service_name = name
self.service_port = port
self.concurrency = conc
self.handler_url = "inproc://" + name
self.client_url = "tcp://*:" + str(port)
self.shutdown = True # Cleared in run()
self.thread = {}
super(NetworkService, self).__init__()
# ==========================================================================================
# Class Operation
# ==========================================================================================
def run(self): # Called [only] by threading.Thread.start()
self.shutdown = False
clients = NetworkService.ZmqContext.socket(zmq.ROUTER)
clients.bind(self.client_url)
handlers = NetworkService.ZmqContext.socket(zmq.DEALER)
handlers.bind(self.handler_url)
for i in range(self.concurrency):
self.thread[i] = threading.Thread(target = self.handler, name = self.service_name + str(i))
self.thread[i].daemon = True
self.thread[i].start()
zmq.proxy(clients, handlers)
clients.close()
handlers.close()
def terminate(self):
self.shutdown = True
def handler(self):
socket = NetworkService.ZmqContext.socket(zmq.REP)
socket.connect(self.handler_url)
iam = repr(get_pids()[2])
log.info("nsh@%s is up", iam)
while not self.shutdown:
string = socket.recv()
toe = datetime.utcnow()
command = pickle.loads(string)
reply = self.protocol(command)
string = pickle.dumps(reply)
socket.send(string)
def protocol(self, command): # Override this in subclass
reply = {}
reply["success"] = False
reply["detail"] = "No protocol defined (NetworkService.protocol(...) not overridden)"
if "ident" in command:
reply["ident"] = command["ident"]
return reply
The problem is with the line "zmq.proxy(clients, handlers)
": I can't seem to get it to end. Ever. If all of the handlers terminate, still zmq.proxy()
does not return. I don't mind creating an independent thread to run the proxy, but this is in a daemon that I'd like to be able to shut down cleanly.
I read in the documentation that this is correct behavior for zmq.proxy
, but it doesn't seem so correct to me ;-}.
Can anyone recommend an approximate equivalent that can be shut down once the handler threads have terminated?
Upvotes: 4
Views: 1995
Reputation: 142
The API basically implies that you have to terminate the context. You can run your proxy in a separate thread with a shared context then terminate it and except zmq.ContextTerminated.
try:
zmq.proxy(self.frontend, self.backend)
except zmq.ContextTerminated:
# cleanup if needed
Upvotes: 3