Ivan Gromov
Ivan Gromov

Reputation: 4415

0mq one-to-many connection

What is the most correct way to establish a two-way communication between processes using 0mq? I need to create several background processes that will wait for commands from the main process, perform some calculations and return the result back to the main process.

Upvotes: 4

Views: 4219

Answers (2)

Zach Kelling
Zach Kelling

Reputation: 53819

There are a few ways to do this. The most straight forward approach might be to use REQ/REP sockets. Each background process/worker would have a REP socket, and you would use a REQ socket to communicate with them:

import zmq

def worker(addr):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(addr)
    while True:
        # get message from boss
        msg = socket.recv()
        # ...do smth
        # send back results
        socket.send(msg)

if __name__ == '__main__':
    # spawn 5 workers
    from multiprocessing import Process
    for i in range(5):
        Process(target=worker, args=('tcp://127.0.0.1:500%d' % i,)).start()

You'd have to connect to each worker to send them a message, and get back results:

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(worker_addr)
socket.send('message')
msg = socket.recv()

Another approach would be to use PUB/SUB to fire off messages to the workers and PUSH/PULL to harvest results:

import zmq

def worker(worker_id, publisher_addr, results_addr):
    context = zmq.Context()
    sub = context.socket(zmq.SUB)
    sub.connect(publisher_addr)
    sub.setsockopt(zmq.SUBSCRIBE, worker_id)
    push = context.socket(zmq.PUSH)
    push.connect(results_addr)

    while True:
        msg = sub.recv_multipart()[1]
        # do smth, send off results
        push.send_multipart([worker_id, msg])

if __name__ == '__main__':
    publisher_addr = 'tcp://127.0.0.1:5000'
    results_addr = 'tcp://127.0.0.1:5001'

    # launch some workers into space
    from multiprocessing import Process
    for i in range(5):
        Process(target=worker, args=('worker-%d' % i, publisher_addr, results_addr,)).start()

To broadcast a command to a specific worker, you'd do something like:

context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind(publisher_addr)
# send message to worker-1
pub.send_multipart(['worker-1', 'hello'])

Pull in results:

context = zmq.Context()
pull = context.socket(zmq.PULL)
pull.bind(results_addr)

while True:
    worker_id, result = pull.recv_multipart()
    print worker_id, result

Upvotes: 7

Michal Sznajder
Michal Sznajder

Reputation: 9406

Consider using Request Reply Broker but exchange REQ socket into DEALER. DEALER is non blocking for sending and will automatically load balance traffic towards your workers.

In picture Client would be your main process and Service A/B/C are your background processes (workers). Main process should bind to an endpoint. Workers should connect to main process's endpoint to receive work items.

In main process keep list of work items and send time. If there is no answer for some time just resend work item again since worker probably died.

Request Reply Broker

Upvotes: 3

Related Questions