I have a producer sending data over PUSH / PULL sockets to multiple workers. All the workers need to receive all of their data before performing a computation task.
I tried to synchronize them by sending a "go" over a PUB / SUB socket, but as PUSH sockets are non-blocking, the "go" is received before the end of the data stream...
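Sender:
import zmq

context = zmq.Context()
push_socket = context.socket(zmq.PUSH)
push_socket.bind("tcp://127.0.0.1:5557")
pull_socket = context.socket(zmq.PULL)
pull_socket.bind("tcp://127.0.0.1:5558")
# PUB socket for commands (the receiver connects its SUB socket to port 5559)
pub_socket = context.socket(zmq.PUB)
pub_socket.bind("tcp://127.0.0.1:5559")
# dataset stands for the items to distribute (not shown in the original post)
for index, data in enumerate(dataset):
    push_socket.send_json({"data": data, "id": index})
pub_socket.send_json({"command": "map"})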
Receiver:
import zmq

context = zmq.Context()
# receive work
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://127.0.0.1:5557")
# receive commands
consumer_command = context.socket(zmq.SUB)
consumer_command.subscribe("")
consumer_command.connect("tcp://127.0.0.1:5559")
# poll both sockets so that neither stream blocks the other
poller = zmq.Poller()
poller.register(consumer_receiver, zmq.POLLIN)
poller.register(consumer_command, zmq.POLLIN)
while True:
    events = dict(poller.poll(100))
    if consumer_command in events:
        received = consumer_command.recv_json()
        command = received["command"]
        print("received command : ", command)
    if consumer_receiver in events:
        received = consumer_receiver.recv_json()
        print("received data", received)
Receiver output :
received data {'data': ['Hi'], 'id': 0}
received command : map
received data {'data': ['hi'], 'id': 1}
...
I would like to have:
received data {'data': ['Hi'], 'id': 0}
received data {'data': ['hi'], 'id': 1}
...
received command : map
I tried to set a high-water mark (HWM) of 1 on the PUSH socket, but it didn't work.
How can I send a synchronization message to all workers after the PUSH is finished?
Upvotes: 0
Views: 1726
Reputation: 8414
You are seeking to implement a barrier.
ZeroMQ is all about Actor model programming, and one characteristic is that there is no explicit rendezvous implied in sending and receiving messages. That is, a send will return regardless of whether or not the other end has read the message.
So this means that a barrier (a type of rendezvous) has to be synthesised on top of ZeroMQ's Actor model.
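One way such a barrier might be synthesised is sketched below, as a minimal illustration rather than a definitive implementation. It assumes that each worker acknowledges every data message on a separate channel (for instance a PUSH socket connected to the PULL socket that the question's sender binds on port 5558), and that the producer publishes the command only once every id has been acknowledged. The function names and the `{"id": ...}` acknowledgement format are hypothetical; the sockets are passed in and only need `send_json` / `recv_json`, which pyzmq sockets provide.

```python
def send_all_then_release(push_socket, ack_socket, pub_socket, items, command="map"):
    """Producer side: send every item, wait for all acks, then publish the command."""
    pending = set()
    for index, data in enumerate(items):
        # send_json returns once the message is queued, not once it is read.
        push_socket.send_json({"data": data, "id": index})
        pending.add(index)
    # Barrier: block until some worker has acknowledged every id.
    while pending:
        ack = ack_socket.recv_json()
        pending.discard(ack["id"])
    # At this point every data message has been received by a worker.
    pub_socket.send_json({"command": command})


def handle_data(message, ack_socket, store):
    """Worker side: keep the payload, then acknowledge it by id (hypothetical format)."""
    store.append(message["data"])
    ack_socket.send_json({"id": message["id"]})
```

Because the command is published only after the last acknowledgement, a worker cannot see "map" while some of its data is still in flight. The SUB sockets still have to be connected and subscribed before the command is published, otherwise PUB silently drops it.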
Communicating Sequential Processes
Simply out of interest, you may wish to compare Actor model programming with Communicating Sequential Processes (which in Rust, Erlang, and (I think?) Go is making something of a comeback). In CSP, sending / receiving a message is a rendezvous. This has several benefits.
To do what you want with CSP, you'd be able to omit steps 2 and 3 above. The Producer would know that every worker had received its data when the send to the last worker returned, and the "go" can be sent out immediately.
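For comparison, a rendezvous-style send can be imitated inside a single Python process with the standard library, as a rough sketch only: `queue.Queue.join()` blocks the sender until the receiver has called `task_done()`. This is neither ZeroMQ nor a real CSP implementation; the names `rendezvous_send`, `worker` and `run_demo` are made up for illustration, and a `None` item is assumed to mean "stop".

```python
import queue
import threading


def rendezvous_send(channel, item):
    """Return only after the receiver has taken and handled the item."""
    channel.put(item)
    channel.join()  # unblocks once task_done() has been called for every put()


def worker(channel, received):
    """Store items until a None sentinel arrives (the sentinel is an assumption)."""
    while True:
        item = channel.get()
        if item is not None:
            received.append(item)
        channel.task_done()  # completes the rendezvous for this item
        if item is None:
            return


def run_demo(n_workers=3):
    channels = [queue.Queue() for _ in range(n_workers)]
    stores = [[] for _ in range(n_workers)]
    threads = [threading.Thread(target=worker, args=(c, s))
               for c, s in zip(channels, stores)]
    for thread in threads:
        thread.start()
    for index, channel in enumerate(channels):
        rendezvous_send(channel, {"data": ["hi"], "id": index})
    # Every send has returned, so every worker already holds its data:
    # a "go" could be issued here without any acknowledgement round trip.
    snapshot = [list(store) for store in stores]
    for channel in channels:
        rendezvous_send(channel, None)  # shut the workers down
    for thread in threads:
        thread.join()
    return snapshot
```

Calling `run_demo()` returns what each worker held at the moment the last send returned, which is the point at which the producer could safely release the workers.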
Personally speaking I really wish ZeroMQ would have the option to be CSP, not Actor. Then it would be fabulous, instead of being just pretty tremendous. What makes it really good is that it doesn't matter whether it's tcp, ipc, inproc, etc. it all behaves the same (with speed variations obviously).
AFAIK Rust, Erlang and Go CSP channels go no further than the process. ZMQ can be inter and/or intra process and/or inter computer, which makes it highly suitable for developing systems that may outgrow one computer. Need to offload a thread to another computer? Change the connection string, no other code changes required. Very nice.
Upvotes: 3
Reputation: 51
You are using separate streams for commands and data, which will always cause synchronization problems. On the recipient side, you will have two stream buffers: the first with a lot of data to handle, the second with only the command. poll() will notify you that both are ready to be read, regardless of the order in which they were sent.
I see two ways to handle this problem:
1) Keep it simple: use only one stream. Everything you send on one end will be received in the same order on the other end. TCP guarantees that. If you're using JSON, you can just add 'type': 'command' or 'type': 'data' to each message to discriminate between message types.
2) If, for some reason, you really need two streams (e.g. you really want to play with the publisher/subscriber pattern), the receiver should acknowledge reception of the last data batch to the sender before the sender sends its command. This option would also be the choice if all workers need to receive their data before any of them is started with the command.
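Option 1 might look like the sketch below on the worker side, assuming every message carries a 'type' field as suggested. The function name `collect_until_command` is hypothetical, and the socket is passed in and only needs `recv_json` (as pyzmq sockets have). One caveat with several workers: a PUSH socket hands each message to a single worker, so the producer would have to get one command message to each of them.

```python
def collect_until_command(receiver):
    """Read one stream in order: gather data messages until a command arrives."""
    batch = []
    while True:
        message = receiver.recv_json()
        if message["type"] == "data":
            batch.append(message["data"])
        elif message["type"] == "command":
            # Messages on one connection arrive in order, so everything sent
            # before the command is already in the batch.
            return message["command"], batch
        # Any other message type is ignored in this sketch.
```

The worker would call this once per round and start its computation on the returned batch when the command comes back.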
Upvotes: 0