ice3

Reputation: 335

Get data from ZMQ PULL socket. How to sync computation?

I have a producer sending data using PULL / PUSH to multiple workers. All the workers need to receive all their data before performing a computation task.

I tried a sync using a PUB / SUB socket sending a "go", but as the PUSH socket is non-blocking, the "go" is received before the end of the data stream...


Sender :

context = zmq.Context()

push_socket = context.socket(zmq.PUSH)
push_socket.bind("tcp://127.0.0.1:5557")

pull_socket = context.socket(zmq.PULL)
pull_socket.bind("tcp://127.0.0.1:5558")

pub_socket = context.socket(zmq.PUB)
pub_socket.bind("tcp://127.0.0.1:5559")

for index in range(100):
    push_socket.send_json({"data": ["hi"], "id": index})
pub_socket.send_json({"command": "map"})

Receiver :

# receive work
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://127.0.0.1:5557")

# receive commands
consumer_command = context.socket(zmq.SUB)
consumer_command.subscribe("")
consumer_command.connect("tcp://127.0.0.1:5559")

poller = zmq.Poller()
poller.register(consumer_receiver, zmq.POLLIN)
poller.register(consumer_command, zmq.POLLIN)

while True:
    events = dict(poller.poll(100))
    if consumer_command in events:
        received = consumer_command.recv_json()
        command = received["command"]
        print("received command : ", command)

    if consumer_receiver in events:
        received = consumer_receiver.recv_json()
        print("received data", received)

Receiver output :

received data {'data': ['Hi'], 'id': 0}
received command :  map   
received data {'data': ['hi'], 'id': 1}
...

I would like to have:

received data {'data': ['Hi'], 'id': 0}
received data {'data': ['hi'], 'id': 1}
...
received command :  map   

I tried to set a HWM of 1 for the PUSH socket but it didn't work.

How can I send a synchronization message to all workers after the PUSH is finished ?

Upvotes: 0

Views: 1726

Answers (2)

bazza
bazza

Reputation: 8414

You are seeking to implement a barrier.

ZeroMQ is all about Actor model programming, and one characteristic is that there is no explicit rendezvous implied in sending and receiving messages. That is, a send will return regardless of whether or not the other end has read the message.

So this means that a barrier (a type of rendezvous) has to be synthesised on top of ZeroMQ's Actor model.

  1. Use a PUSH / PULL socket pair to get the data to the workers.
  2. Use a separate PUSH / PULL socket pair for the workers to send back a "I have the data and am ready to proceed" message to the producer.
  3. Have the producer wait for these "I can proceed" messages.
  4. When it has received one from every worker, send a "go" message on the PUB / SUB socket to the workers.
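
The four steps can be sketched end to end. This is a minimal self-contained demo, not the answer's own code: it uses inproc transport and threads standing in for worker processes, plus a per-worker "last" sentinel (one sentinel message per worker, relying on PUSH round-robin) so each worker knows its batch is complete — the same pattern works over tcp across machines.

```python
import threading
import zmq

N_WORKERS = 2      # the producer must know the worker count
N_MESSAGES = 10

context = zmq.Context()
results = []       # (worker_id, messages_received), filled in by workers

def worker(worker_id):
    data_in = context.socket(zmq.PULL)        # step 1: data from producer
    data_in.connect("inproc://data")
    ready_out = context.socket(zmq.PUSH)      # step 2: report back
    ready_out.connect("inproc://ready")
    go_in = context.socket(zmq.SUB)           # step 4: wait for "go"
    go_in.subscribe("")
    go_in.connect("inproc://go")

    ready_out.send_json({"hello": worker_id})  # check in before data flows

    count = 0
    while True:
        msg = data_in.recv_json()
        if msg.get("last"):                    # per-worker end-of-batch marker
            break
        count += 1

    ready_out.send_json({"ready": worker_id})  # "I have the data"
    go_in.recv_json()                          # block until the broadcast "go"
    results.append((worker_id, count))

# --- producer ---
push = context.socket(zmq.PUSH)
push.bind("inproc://data")
ready = context.socket(zmq.PULL)
ready.bind("inproc://ready")
pub = context.socket(zmq.PUB)
pub.bind("inproc://go")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(N_WORKERS)]
for t in threads:
    t.start()

for _ in range(N_WORKERS):        # wait until every worker is connected,
    ready.recv_json()             # so the sentinels round-robin one per worker

for index in range(N_MESSAGES):   # step 1: push the data
    push.send_json({"id": index, "data": ["hi"]})
for _ in range(N_WORKERS):        # one end-of-batch sentinel per worker
    push.send_json({"last": True})

for _ in range(N_WORKERS):        # step 3: wait for every "ready"
    ready.recv_json()
pub.send_json({"command": "map"}) # step 4: everyone checked in -> "go"

for t in threads:
    t.join()
print(len(results), "workers finished,",
      sum(c for _, c in results), "messages delivered")
```

No worker sees the "map" command before it has received its whole batch, because the producer holds the PUB send back until every worker has checked in.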

Communicating Sequential Processes

Simply out of interest, you may wish to compare Actor model programming with Communicating Sequential Processes (which, in Rust, Erlang, and (I think?) Go, is making something of a comeback). In CSP, sending / receiving a message is a rendezvous. This has several benefits:

  • the sender knows that a message has been received, not just queued;
  • it forces one to properly address architecture and resource allocation if one has performance and latency goals. You can't hide messages in transit, so if one has not supplied enough workers, the producer will very obviously be unable to offload messages; the deficiency cannot be temporarily hidden by increased latency;
  • if you have managed to construct an architecture that can deadlock, livelock, etc., it always will, whereas an Actor model architecture can appear to be perfectly fine for years until that one day when the network gets a little busier.

To do what you want with CSP, you'd be able to omit steps 2 and 3 above. The Producer would know that every worker had received its data when the send to the last worker returned, and the "go" can be sent out immediately.

Personally speaking, I really wish ZeroMQ had the option to be CSP, not Actor. Then it would be fabulous, instead of just pretty tremendous. What makes it really good is that it doesn't matter whether the transport is tcp, ipc, inproc, etc.; it all behaves the same (with speed variations, obviously).

AFAIK Rust, Erlang and Go CSP channels go no further than the process boundary. ZMQ can be inter- and/or intra-process and/or inter-computer, which makes it highly suitable for developing systems that may outgrow one computer. Need to offload a thread to another computer? Change the connection string; no other code changes required. Very nice.

Upvotes: 3

meteor

Reputation: 51

You are using separate streams for commands and data, which will always invite synchronization problems. On the recipient side you have two stream buffers: the first with a lot of data to handle, the second with only the command. poll() only tells you that both are ready to be read; it says nothing about ordering across the two streams.

I see two ways to handle this problem:

1) Keep it simple: use only one stream. Everything you send on one end will be received, in order, on the other end; TCP guarantees that. If you're using JSON, you can simply add 'type': 'command' or 'type': 'data' to discriminate between message types.
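
A minimal sketch of the single-stream idea (my own illustration, using inproc transport and a single worker for brevity; note that with several PULL workers, PUSH round-robins messages, so you would send one tagged command message per worker):

```python
import zmq

context = zmq.Context()
sender = context.socket(zmq.PUSH)
sender.bind("inproc://work")
receiver = context.socket(zmq.PULL)
receiver.connect("inproc://work")

# One stream for everything, tagged by type: ZeroMQ delivers messages
# in order on a connection, so the command can never overtake the data.
for index in range(3):
    sender.send_json({"type": "data", "id": index, "data": ["hi"]})
sender.send_json({"type": "command", "command": "map"})

received = []
while True:
    msg = receiver.recv_json()
    received.append(msg)
    if msg["type"] == "command":
        print("received command:", msg["command"])
        break
    print("received data:", msg)
```

Because everything travels down the same pipe, the "map" command is guaranteed to arrive only after all the data messages that were sent before it.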

2) If, for some reason, you really need two streams (e.g. you really want to use the publisher / subscriber pattern), the receiver should acknowledge reception of the last data batch to the sender before the sender sends its command. This would also be the choice if all workers need to receive their data before any of them starts on the command.

Upvotes: 0
