Tushar Tarkas

Reputation: 1612

ZMQ: Message gets lost in Dealer Router Dealer pattern implementation

I have a working setup where multiple clients send messages to multiple servers. Each message targets only one server. The client knows the IDs of all possible servers and only sends a message if that server is actually connected. Each server connects to the socket on startup. There are multiple server workers which bind to an inproc router socket. Communication is always initiated by the client, and messages are sent to each server asynchronously.

This is achieved using a DEALER->ROUTER->DEALER pattern. My problem is that when the number of client & server workers increases, the "ack" sent by the server to the client (step 7 below) is never delivered to the client. Thus, the client is stuck waiting for the acknowledgement while the server is waiting for more messages from the client. Both systems hang and never come out of this condition unless restarted. Details of the configuration and communication flow are given below.

I've checked the system logs and nothing evident comes out of them. Any help or guidance on triaging this further would be appreciated.

At startup, the client connects as a DEALER to the broker's IP:port: "requester, _ := zmq.NewSocket(zmq.DEALER)". The dealers connect to the broker, which forwards messages between the frontend (client workers) and the backend (server workers). The frontend is bound to a TCP socket while the backend is bound over inproc.
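
For reference, here is a minimal sketch of that client-side setup, assuming the pebbe/zmq4 binding; the broker address and the identity string are placeholders, not the real configuration:

    package client

    import (
        zmq "github.com/pebbe/zmq4"
    )

    // newRequester creates the client's DEALER socket and connects it to the
    // broker's frontend endpoint (sketch; address and identity are placeholders).
    func newRequester() (*zmq.Socket, error) {
        requester, err := zmq.NewSocket(zmq.DEALER)
        if err != nil {
            return nil, err
        }
        // A stable identity lets the broker route replies back to this client.
        if err := requester.SetIdentity("client-1"); err != nil {
            requester.Close()
            return nil, err
        }
        if err := requester.Connect("tcp://broker-host:5559"); err != nil {
            requester.Close()
            return nil, err
        }
        return requester, nil
    }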

    // Frontend dealer workers
    frontend, _ := zmq.NewSocket(zmq.DEALER)
    defer frontend.Close()

    // For workers local to the broker
    backend, _ := zmq.NewSocket(zmq.DEALER)
    defer backend.Close()

    // Frontend should always use TCP
    frontend.Bind("tcp://*:5559")

    // Backend should always use inproc
    backend.Bind("inproc://backend")

    // Initialize Broker to transfer messages
    poller := zmq.NewPoller()
    poller.Add(frontend, zmq.POLLIN)
    poller.Add(backend, zmq.POLLIN)

    // Switching messages between sockets
    for {
        sockets, _ := poller.Poll(-1)
        for _, socket := range sockets {
            switch s := socket.Socket; s {
            case frontend:
                for {
                    msg, _ := s.RecvMessage(0)
                    // Get server workerID from message for which it is intended
                    workerID := findWorker(msg[0])
                    log.Println("Forwarding Message:", msg[1], "From Client:", msg[0], "To Worker:", workerID)
                    if more, _ := s.GetRcvmore(); more {
                        backend.SendMessage(workerID, msg, zmq.SNDMORE)
                    } else {
                        backend.SendMessage(workerID, msg)
                        break
                    }
                }
            case backend:
                for {
                    msg, _ := s.RecvMessage(0)
                    // Register new workers as they come and go
                    fmt.Println("Message from backend worker:", msg)
                    // Get client workerID from message for which it is intended
                    clientID := findClient(msg[0])
                    log.Println("Returning Message:", msg[1], "From Worker:", msg[0], "To Client:", clientID)
                    frontend.SendMessage(clientID, msg, zmq.SNDMORE)
                }
            }
        }
    }

Once the connection is established,

  1. The client sends a set of messages on the frontend socket. These messages contain metadata about all the messages to follow: requester.SendMessage(msg)

  2. Once these messages are sent, the client waits for an acknowledgement from the server (see the sketch after this list): reply, _ := requester.RecvMessage(0)

  3. The router transfers these messages from the frontend to the backend workers based on the logic defined above

  4. The backend dealers process these messages & respond back over the backend socket asking for more messages

  5. The broker then transfers the message from the backend inproc socket to the frontend socket

  6. The client processes this message and sends the required messages to the server. The messages are sent as a group (batch) asynchronously

  7. The server receives and processes all of the messages sent by the client

  8. After processing all the messages, the server sends an "ack" back to the client to confirm that all the messages were received

  9. Once all the messages have been sent by the client and processed by the server, the server sends a final message indicating that the transfer is complete

  10. The communication ends here
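
For illustration, here is a minimal sketch of the client side of steps 1, 2 and 8, assuming the pebbe/zmq4 binding; the message framing (worker ID as the first frame) and the "ACK" literal are assumptions based on the description above, not the actual protocol:

    package client

    import (
        "fmt"

        zmq "github.com/pebbe/zmq4"
    )

    // sendBatch sends a batch of messages for one server worker and then
    // blocks until the server's acknowledgement arrives (steps 1, 2 and 8).
    // Sketch only: the framing and the "ACK" literal are assumptions.
    func sendBatch(requester *zmq.Socket, workerID string, payloads []string) error {
        // Step 1: send the messages destined for the target server worker.
        for _, p := range payloads {
            if _, err := requester.SendMessage(workerID, p); err != nil {
                return err
            }
        }
        // Steps 2/8: wait for the acknowledgement; this is where the client
        // hangs when the "ack" never arrives.
        reply, err := requester.RecvMessage(0)
        if err != nil {
            return err
        }
        if len(reply) == 0 || reply[len(reply)-1] != "ACK" {
            return fmt.Errorf("unexpected reply: %v", reply)
        }
        return nil
    }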

This works well when there is a limited number of workers and messages transferred. The implementation has multiple dealers (clients) sending messages to a router. The router in turn sends these messages to another set of dealers (servers), which process the respective messages. Each message contains the client & server worker IDs for identification.

We have configured the following limits for the send & receive queues (a sketch of applying them follows the list).

  1. Broker HWM: 10000
  2. Dealer HWM: 1000
  3. Broker Linger Limit: 0
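
For illustration, a sketch of how these limits might be applied to the sockets, assuming the pebbe/zmq4 binding; note that high-water marks have to be set before Bind/Connect to apply to new connections:

    package broker

    import (
        zmq "github.com/pebbe/zmq4"
    )

    // configureQueues applies the queue limits listed above (sketch;
    // pebbe/zmq4 binding assumed, socket names are placeholders).
    func configureQueues(brokerSock, dealerSock *zmq.Socket) error {
        // Broker HWM: 10000 messages per pipe in each direction.
        if err := brokerSock.SetSndhwm(10000); err != nil {
            return err
        }
        if err := brokerSock.SetRcvhwm(10000); err != nil {
            return err
        }
        // Dealer HWM: 1000.
        if err := dealerSock.SetSndhwm(1000); err != nil {
            return err
        }
        if err := dealerSock.SetRcvhwm(1000); err != nil {
            return err
        }
        // Broker linger: 0, i.e. drop pending messages immediately on Close.
        return brokerSock.SetLinger(0)
    }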

Some more findings:

  1. This issue is prominent when server processing (step 7 above) takes more than 10 minutes.
  2. The client and server run on different machines; both are Ubuntu 20.04 LTS with ZMQ version 4.3.2.


Upvotes: 0

Views: 417

Answers (1)

Tushar Tarkas

Reputation: 1612

Eventually, it turned out to be a matter of configuring heartbeats for the ZMQ sockets. I referred to the documentation here: http://api.zeromq.org/4-2:zmq-setsockopt and configured the following parameters:

  1. ZMQ_HANDSHAKE_IVL: Set maximum handshake interval
  2. ZMQ_HEARTBEAT_IVL: Set interval between sending ZMTP heartbeats
  3. ZMQ_HEARTBEAT_TIMEOUT: Set timeout for ZMTP heartbeats

Configure the above parameters appropriately to ensure that there is a constant liveness check between the client and server dealers. That way, even if one side is delayed in its processing, the other side does not time out abruptly.
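
For illustration, a minimal sketch of setting these options, assuming the pebbe/zmq4 binding (which exposes them as duration setters) and libzmq >= 4.2; the interval and timeout values below are illustrative and need to be tuned to your actual processing times:

    package client

    import (
        "time"

        zmq "github.com/pebbe/zmq4"
    )

    // configureHeartbeats sets ZMTP handshake/heartbeat options on a socket
    // (sketch; pebbe/zmq4 binding assumed, values are illustrative).
    func configureHeartbeats(sock *zmq.Socket) error {
        // ZMQ_HANDSHAKE_IVL: fail the connection if the ZMTP handshake
        // does not complete within this interval.
        if err := sock.SetHandshakeIvl(30 * time.Second); err != nil {
            return err
        }
        // ZMQ_HEARTBEAT_IVL: send a ZMTP PING this often.
        if err := sock.SetHeartbeatIvl(10 * time.Second); err != nil {
            return err
        }
        // ZMQ_HEARTBEAT_TIMEOUT: close the connection if no traffic (including
        // the PONG reply) is seen within this interval after sending a PING.
        return sock.SetHeartbeatTimeout(60 * time.Second)
    }

Like the queue limits, these options should be set before Connect/Bind so that they apply to the connections the sockets create.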

Upvotes: 1
