covariantmonkey

Reputation: 223

How to synchronize data between multiple workers

I have the following problem that seems to call for a zmq solution. I have time-series data:

A,B,C,D,E,...

I need to perform an operation, Func, on each point.

It makes good sense to parallelize the task across multiple workers via zmq. However, what trips me up is how to synchronize the results: they must come out in exactly the same time order as the input data. So the end result should look like:

Func(A), Func(B), Func(C), Func(D),...

I should also point out that the time to complete, say, Func(A) will be slightly different from that of Func(B). This may require me to block for a while.

Any suggestions would be greatly appreciated.

Upvotes: 0

Views: 851

Answers (1)

Tisho

Reputation: 8482

You will always need to block for a while in order to synchronize things. You can send requests to a pool of workers and, when a response is received, buffer it if it is not the next one in sequence. One simple workflow could be described in a pseudo-language as follows:

socket receiver;  # zmq.PULL, receives the incoming time-series packets
socket workers;   # zmq.DEALER; each worker thread's socket is a zmq.DEALER too
poller = poller(receiver, workers);

next_id_req = 0                   # id given to the most recently dispatched packet
out_queue = queue;
out_queue.last_id = next_id_req   # id of the last result pushed in order
buffer = sorted_queue;            # out-of-order results, sorted by id

loop forever:
    sock = poller.poll()
    if sock is receiver:
        packet_N = receiver.recv()
        # tag N with the next sequence id and send it for processing
        workers.send(packet_N, ++next_id_req)

    else if sock is workers:
        # get a processed response Func(N)
        func_N_response, id = workers.recv()
        if id != out_queue.last_id + 1:
            # not the next id in sequence, buffer it
            buffer.push(id, func_N_response)
        else:
            # in order, push to out queue (this sets out_queue.last_id = id)
            out_queue.push(id, func_N_response)

            # also consume all buffered results that are now in sequence
            while not buffer.empty() and buffer.min_id() == out_queue.last_id + 1:
                id, buffered_N_resp = buffer.pop()
                out_queue.push(id, buffered_N_resp)
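To check the ordering logic without any zmq setup, here is a stdlib-only Python sketch. It is a simulation under stated assumptions, not a zmq implementation: `queue.Queue` objects stand in for the PULL/DEALER sockets, `func` is a hypothetical stand-in for Func with a random delay, and a `heapq` min-heap plays the role of `sorted_queue`.

```python
import heapq
import queue
import random
import threading
import time


def func(x):
    # Hypothetical stand-in for Func: processing time varies per item.
    time.sleep(random.uniform(0, 0.01))
    return x.lower()


def worker(tasks, results):
    # Each worker takes (seq_id, item) and returns (seq_id, func(item)).
    while True:
        task = tasks.get()
        if task is None:  # sentinel: no more work
            break
        seq_id, item = task
        results.put((seq_id, func(item)))


def process_in_order(items, n_workers=4):
    # queue.Queue replaces the zmq sockets in this simulation.
    tasks, results = queue.Queue(), queue.Queue()
    threads = [threading.Thread(target=worker, args=(tasks, results))
               for _ in range(n_workers)]
    for t in threads:
        t.start()

    # Dispatcher: tag each packet with an increasing sequence id.
    for seq_id, item in enumerate(items, start=1):
        tasks.put((seq_id, item))
    for _ in threads:
        tasks.put(None)

    # Collector: emit results in id order, buffering any that arrive early.
    out, pending, next_id = [], [], 1
    for _ in range(len(items)):
        heapq.heappush(pending, results.get())  # blocks until a result arrives
        while pending and pending[0][0] == next_id:
            out.append(heapq.heappop(pending)[1])
            next_id += 1

    for t in threads:
        t.join()
    return out


print(process_in_order(["A", "B", "C", "D", "E"]))
# ['a', 'b', 'c', 'd', 'e']
```

Because the heap is only drained while its smallest id equals `next_id`, a slow Func(A) holds back the results for B, C, ... until it completes, which is the blocking described above.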

But here comes the problem: what happens if a packet is lost in a processing thread (the worker pool)? One option is to skip it after a certain timeout (flush the buffer into the out queue), continue filling the out queue, and reorder when the packet arrives later, if it ever does.
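A minimal sketch of the timeout option, under stated assumptions: `TimedReorderBuffer` is a hypothetical helper, the current time is passed in explicitly so the behaviour can be tested deterministically (a real loop would use `time.monotonic()`), and a packet that arrives after its id was skipped is simply handed back tagged `"late"` so the caller can decide how to reorder it.

```python
import heapq


class TimedReorderBuffer:
    """Hypothetical reorder buffer that stops waiting for a missing id after `timeout`."""

    def __init__(self, timeout, first_id=1):
        self.timeout = timeout
        self.next_id = first_id  # next id allowed into the out queue
        self._heap = []          # (seq_id, arrival_time, result) waiting for earlier ids
        self.skipped = []        # ids we gave up waiting for

    def push(self, seq_id, result, now):
        if seq_id < self.next_id:
            # Arrived after its id was skipped (or is a duplicate): report it as late.
            return [("late", seq_id, result)]
        heapq.heappush(self._heap, (seq_id, now, result))
        return self.poll(now)

    def poll(self, now):
        ready = []
        while self._heap:
            seq_id, arrived, result = self._heap[0]
            if seq_id == self.next_id:
                heapq.heappop(self._heap)
                ready.append(("ok", seq_id, result))
                self.next_id += 1
            elif now - arrived >= self.timeout:
                # The lowest buffered id has waited too long: skip the gap before it.
                self.skipped.extend(range(self.next_id, seq_id))
                self.next_id = seq_id
            else:
                break
        return ready


buf = TimedReorderBuffer(timeout=1.0)
print(buf.push(2, "Func(B)", now=0.0))  # []  (still waiting for id 1)
print(buf.poll(now=1.0))                # [('ok', 2, 'Func(B)')]  (id 1 skipped)
print(buf.push(1, "Func(A)", now=1.2))  # [('late', 1, 'Func(A)')]
```

Calling `poll` periodically (for example on a poller timeout) is what flushes the buffer when no new results arrive.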

Upvotes: 1
