Don Doerner

Reputation: 151

How to shut down ZeroMQ zmq.proxy in Python?

I have implemented a "network service" superclass in Python, like so:

import pickle
import threading
from datetime import datetime

import zmq

class NetworkService(threading.Thread):

    """
        Implements a multithreaded network service
    """

    # Class globals (i.e., like a C++ class static)
    ZmqContext = zmq.Context.instance()

    # ==========================================================================================
    # Class Mechanics
    # ==========================================================================================

    def __init__(self, name, port, conc=4):
        """
            Network service initialization
        """
        self.service_name = name
        self.service_port = port
        self.concurrency = conc
        self.handler_url = "inproc://" + name
        self.client_url = "tcp://*:" + str(port)
        self.shutdown = True # Cleared in run()
        self.thread = {}
        super(NetworkService, self).__init__()

    # ==========================================================================================
    # Class Operation
    # ==========================================================================================

    def run(self): # Called [only] by threading.Thread.start()
        self.shutdown = False

        clients = NetworkService.ZmqContext.socket(zmq.ROUTER)
        clients.bind(self.client_url)

        handlers = NetworkService.ZmqContext.socket(zmq.DEALER)
        handlers.bind(self.handler_url)

        for i in range(self.concurrency):
            self.thread[i] = threading.Thread(target = self.handler, name = self.service_name + str(i))
            self.thread[i].daemon = True
            self.thread[i].start()

        zmq.proxy(clients, handlers)
        clients.close()
        handlers.close()

    def terminate(self):
        self.shutdown = True

    def handler(self):
        socket = NetworkService.ZmqContext.socket(zmq.REP)
        socket.connect(self.handler_url)
        iam = repr(get_pids()[2])
        log.info("nsh@%s is up", iam)
        while not self.shutdown:
            string = socket.recv()
            toe = datetime.utcnow()
            command = pickle.loads(string)
            reply = self.protocol(command)
            string = pickle.dumps(reply)
            socket.send(string)

    def protocol(self, command): # Override this in subclass
        reply = {}
        reply["success"] = False
        reply["detail"] = "No protocol defined (NetworkService.protocol(...) not overridden)"
        if "ident" in command:
            reply["ident"] = command["ident"]
        return reply

The problem is with the line "zmq.proxy(clients, handlers)": I can't seem to get it to end. Ever. If all of the handlers terminate, still zmq.proxy() does not return. I don't mind creating an independent thread to run the proxy, but this is in a daemon that I'd like to be able to shut down cleanly.

I read in the documentation that this is correct behavior for zmq.proxy, but it doesn't seem so correct to me ;-}.

Can anyone recommend an approximate equivalent that can be shut down once the handler threads have terminated?

Upvotes: 4

Views: 1995

Answers (1)

Alex M

Reputation: 142

The API basically implies that you have to terminate the context. You can run your proxy in a separate thread with a shared context, then terminate that context from another thread and catch zmq.ContextTerminated around the zmq.proxy() call.

try:
    zmq.proxy(self.frontend, self.backend)
except zmq.ContextTerminated:
    # cleanup if needed
    pass
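
For a fuller picture, here is a minimal sketch of the same idea with one echo worker behind the proxy. It assumes a private zmq.Context() rather than zmq.Context.instance(), so that terminating it does not affect other users of the shared instance. The names proxy_loop, worker_loop and run_and_stop and the inproc:// URLs are made up for illustration and are not from the question. Note that term() blocks until every socket on the context has been closed, so each thread closes its own sockets after catching zmq.ContextTerminated.

```python
import threading

import zmq


def proxy_loop(ctx, front_url, back_url, ready):
    """Run a ROUTER/DEALER proxy until ctx is terminated (hypothetical helper)."""
    frontend = ctx.socket(zmq.ROUTER)
    backend = ctx.socket(zmq.DEALER)
    # LINGER=0 so close() does not wait on undelivered messages.
    frontend.setsockopt(zmq.LINGER, 0)
    backend.setsockopt(zmq.LINGER, 0)
    frontend.bind(front_url)
    backend.bind(back_url)
    ready.set()
    try:
        zmq.proxy(frontend, backend)  # blocks until the context is terminated
    except zmq.ContextTerminated:
        pass
    finally:
        # ctx.term() cannot return until these sockets are closed.
        frontend.close()
        backend.close()


def worker_loop(ctx, back_url):
    """Echo requests until ctx is terminated (stands in for the handler)."""
    sock = ctx.socket(zmq.REP)
    sock.setsockopt(zmq.LINGER, 0)
    sock.connect(back_url)
    try:
        while True:
            sock.send(sock.recv())
    except zmq.ContextTerminated:
        pass
    finally:
        sock.close()


def run_and_stop():
    """Start proxy and worker, do one round trip, then shut everything down."""
    ctx = zmq.Context()  # private context: assumption, see note above
    front_url, back_url = "inproc://front", "inproc://back"
    ready = threading.Event()
    threads = [
        threading.Thread(target=proxy_loop, args=(ctx, front_url, back_url, ready)),
        threading.Thread(target=worker_loop, args=(ctx, back_url)),
    ]
    threads[0].start()
    ready.wait()  # make sure both proxy sockets are bound first
    threads[1].start()

    client = ctx.socket(zmq.REQ)
    client.setsockopt(zmq.LINGER, 0)
    client.connect(front_url)
    client.send(b"ping")
    reply = client.recv()
    client.close()  # must be closed too, or term() would block

    ctx.term()  # wakes zmq.proxy() and recv() with ContextTerminated
    for t in threads:
        t.join(timeout=5)
    return reply, all(not t.is_alive() for t in threads)


if __name__ == "__main__":
    print(run_and_stop())
```

In the question's class, the equivalent would be for terminate() to call term() on the context that the proxy and handler sockets were created from, with run() and handler() each wrapping their blocking call in try/except as above.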

Upvotes: 3
