reptilicus
reptilicus

Reputation: 10397

ZeroMQ: load balance many workers and one master

Suppose I have one master process that divides up data to be processed in parallel. Let's say there are 1000 chunks of data and 100 nodes on which to run the computations.

Is there some way to use REQ/REP and still keep all the workers busy? I've tried to use the load balancer pattern from the guide, but with a single client, sock.recv() blocks until it receives its response from the worker, so only one request is ever in flight.

Here is the code, slightly modified from the zmq guide's load balancer example. It starts up one client, 10 workers, and a load balancer/broker in the middle. How can I get all those workers working at the same time?

from __future__ import print_function
from multiprocessing import Process
import zmq
import time
import uuid
import random

def client_task():
    """Basic request-reply client using a REQ socket.

    Connects to the broker's frontend endpoint and sends 100 ``WORK``
    requests one after another, printing every reply.  Each ``recv()``
    blocks until the reply to the request just sent has arrived, so this
    client never has more than one request in flight.
    """
    socket = zmq.Context().socket(zmq.REQ)
    # Socket identities and message frames are raw bytes; pyzmq rejects
    # text (unicode) strings on Python 3, so encode/use bytes explicitly.
    socket.identity = str(uuid.uuid4()).encode("ascii")
    socket.connect("ipc://frontend.ipc")
    # Send request, get reply
    for i in range(100):
        print("SENDING: ", i)
        socket.send(b"WORK")
        msg = socket.recv()
        print(msg)

def worker_task():
    """Worker task, using a REQ socket to do load-balancing.

    Announces itself to the broker's backend with ``READY`` and then loops
    forever: receive ``[client address, empty, request]``, sleep 1-4 seconds
    to simulate work, and answer with ``[client address, empty, reply]``
    where the reply is ``OK : `` followed by this worker's identity.
    """
    socket = zmq.Context().socket(zmq.REQ)
    # Keep the identity as bytes: it is both the socket option value and
    # part of the reply frame, and bytes + str fails on Python 3.
    identity = str(uuid.uuid4()).encode("ascii")
    socket.identity = identity
    socket.connect("ipc://backend.ipc")
    # Tell broker we're ready for work
    socket.send(b"READY")
    while True:
        address, empty, request = socket.recv_multipart()
        time.sleep(random.randint(1, 4))
        socket.send_multipart([address, b"", b"OK : " + identity])


def broker():
    """Load-balancing broker between clients (frontend) and workers (backend).

    Both sides are ROUTER sockets.  Every message from a worker marks that
    worker as available again; if the message carries a client reply it is
    forwarded to the frontend.  Client requests are only polled for while at
    least one worker is available, and each request goes to the worker that
    has been waiting longest.

    Runs until interrupted; the sockets and context are released on the way
    out.
    """
    from collections import deque

    context = zmq.Context()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("ipc://frontend.ipc")
    backend = context.socket(zmq.ROUTER)
    backend.bind("ipc://backend.ipc")
    # Initialize main loop state: FIFO queue of idle worker identities
    workers = deque()
    poller = zmq.Poller()
    # Only poll for requests from backend until workers are available
    poller.register(backend, zmq.POLLIN)

    try:
        while True:
            sockets = dict(poller.poll())
            if backend in sockets:
                # Handle worker activity on the backend.  Frames are
                # [worker id, empty, b"READY"] or
                # [worker id, empty, client id, empty, reply].
                request = backend.recv_multipart()
                worker, empty, client = request[:3]
                if not workers:
                    # Poll for clients now that a worker is available
                    poller.register(frontend, zmq.POLLIN)
                workers.append(worker)
                if client != b"READY" and len(request) > 3:
                    # If client reply, send rest back to frontend
                    empty, reply = request[3:]
                    frontend.send_multipart([client, b"", reply])

            if frontend in sockets:
                # Get next client request, route to the least recently
                # used worker (the one at the head of the queue)
                client, empty, request = frontend.recv_multipart()
                worker = workers.popleft()
                backend.send_multipart([worker, b"", client, b"", request])
                if not workers:
                    # Don't poll clients if no workers are available
                    poller.unregister(frontend)
    finally:
        # Clean up.  The loop above never exits normally, so this has to
        # live in a finally clause to run at all (e.g. on Ctrl-C).
        # linger=0 drops undelivered messages so term() cannot block on them.
        backend.close(linger=0)
        frontend.close(linger=0)
        context.term()

def main():
    """Start the broker, the client(s) and the workers, each in its own process."""
    client_count = 1
    worker_count = 10

    # Launch order matters only in that it mirrors the original script:
    # broker first, then every client, then every worker.
    launch_plan = [broker]
    launch_plan += [client_task] * client_count
    launch_plan += [worker_task] * worker_count

    for target in launch_plan:
        Process(target=target, args=()).start()


    # Process(target=broker).start()




# Launch the demo only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()

Upvotes: 5

Views: 975

Answers (1)

mgc
mgc

Reputation: 5443

I guess there are different ways to do this:

-you can, for example, use the threading module to launch all your requests from your single client, with something like:

import threading

# Shared by all client threads: each one appends its (index, reply) pair here.
result_list = []
# Held while appending to result_list so the threads do not interleave.
rlock = threading.RLock()

def client_thread(client_url, request, i):
    """Send one request over its own REQ socket and record the reply.

    Args:
        client_url: endpoint the socket connects to.
        request: text payload; it is encoded to bytes before sending.
        i: index of the task, used as the socket identity and stored next
            to the reply in ``result_list``.
    """
    context = zmq.Context.instance()
    socket = context.socket(zmq.REQ)
    try:
        socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i))
        socket.connect(client_url)

        socket.send(request.encode())
        reply = socket.recv()
    finally:
        # One socket is created per task; close it even if the exchange
        # fails so sockets do not pile up across many tasks.
        socket.close()

    with rlock:
        result_list.append((i, reply))

def client_task():
    """Fan every task out to its own client thread and wait for all replies.

    Expects a module-level ``tasks`` sequence of text requests.  One thread
    per task is started immediately; the function returns only after every
    thread has finished, so ``result_list`` is complete at that point.
    """
    # tasks = list with all your tasks
    url_client = "ipc://frontend.ipc"
    threads = []
    for i, task in enumerate(tasks):
        thread = threading.Thread(target=client_thread,
                                  args=(url_client, task, i))
        thread.start()
        threads.append(thread)

    # Without joining, the caller could read result_list before the
    # replies have arrived.
    for thread in threads:
        thread.join()

-you can take advantage of an event-driven library like asyncio (there is a submodule zmq.asyncio and another library, aiozmq; the latter offers a higher level of abstraction). In this case you still send your requests to the workers sequentially, but without blocking on each response (and so without keeping the main loop busy), and you collect the results as they come back to the main loop. This could look like this:

import asyncio
import zmq.asyncio

async def client_async(request, context, i, client_url):
    """Basic client sending a request (REQ) to a ROUTER (the broker).

    Args:
        request: text payload; it is encoded to bytes before sending.
        context: context used to create the socket (``run`` passes a
            ``zmq.asyncio.Context``).
        i: index of the task, used as the socket identity.
        client_url: endpoint the socket connects to.

    Returns:
        The reply received for this request.
    """
    socket = context.socket(zmq.REQ)
    try:
        socket.setsockopt_string(zmq.IDENTITY, '{}'.format(i))
        socket.connect(client_url)
        await socket.send(request.encode())
        return await socket.recv()
    finally:
        # Close on every path, including errors and task cancellation.
        socket.close()


async def run(loop):
    """Schedule one ``client_async`` call per task and await all replies.

    Expects a module-level ``tasks`` sequence of text requests.  ``loop`` is
    accepted but not used by this function.

    Returns:
        What ``asyncio.gather`` yields for the scheduled requests.
    """
    # tasks = list full of tasks
    endpoint = "ipc://frontend.ipc"
    zmq_context = zmq.asyncio.Context()
    pending = [
        asyncio.ensure_future(
            client_async(payload, zmq_context, index, endpoint))
        for index, payload in enumerate(tasks)
    ]
    return await asyncio.gather(*pending)

# NOTE(review): install() must stay ahead of get_event_loop() here; newer
# pyzmq releases reportedly no longer need (and deprecate) this call --
# confirm against the installed version.
zmq.asyncio.install()
loop = asyncio.get_event_loop()
# Drive run() to completion; results holds whatever run() returned.
results = loop.run_until_complete(run(loop))

I haven't tested these two snippets, but both are adapted (with modifications to fit the question) from code I have that uses zmq in a configuration similar to the one in your question.

Upvotes: 1

Related Questions