Guillaume Vincent
Guillaume Vincent

Reputation: 14791

ZeroMQ round robin and workers subscription

I got some clients connecting to a frontend broker and some workers doing some job.

zeromq pattern I use :

zeromq pattern

How can I have a round-robin distribution for my workers AND a worker selection base on event name ?

I used PUB/SUB pattern for the subscription filtering but I don't want my broker to send the same message to workers.

Here some code (python3, zmq):

client.py

context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.identity = b'frontend'
socket.connect('tcp://127.0.0.1:4444')

while True:
    event = random.choice([b'CreateUser', b'GetIndex', b'GetIndex', b'GetIndex'])
    socket.send(event)
    print('Emit %s event' % event)
    time.sleep(1)

broker.py

context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.identity = b'broker'
frontend.bind("tcp://127.0.0.1:4444")

backend = context.socket(zmq.DEALER)
backend.identity = b'broker'
backend.bind("tcp://127.0.0.1:5555")

poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)

id = 0
while True:
    id += 1
    sockets = dict(poller.poll())

    if frontend in sockets:
        event, message = frontend.recv_multipart()
        print('Event %s from %s' % (message.decode('utf-8'), event.decode('utf-8')))
        backend.send_multipart([message,str(id).encode('utf-8')])

create_user_worker.py

context = zmq.Context()
worker = context.socket(zmq.DEALER)
worker.identity = b'create-user-worker'
worker.connect("tcp://127.0.0.1:5555")

while True:
    message, id = worker.recv_multipart()
    if message == b'CreateUser':
        print(message, id)

get_index_worker.py

context = zmq.Context()
worker = context.socket(zmq.DEALER)
worker.identity = b'get-index-worker'
worker.connect("tcp://127.0.0.1:5555")

while True:
    message, id = worker.recv_multipart()
    if message == b'GetIndex':
        print(message, id)

The output of the following code:

get_index_worker.py

b'GetIndex' b'1'
b'GetIndex' b'2'
b'GetIndex' b'4'
b'GetIndex' b'6'

create_user_worker.py

b'CreateUser' b'3'

The task for the event with the id 5 is lost

github repo: https://github.com/guillaumevincent/tornado-zeromq

Upvotes: 3

Views: 1267

Answers (1)

user3666197
user3666197

Reputation: 1

Status Quo : As-is

ROUTER/DEALER Device is agnostic to any other logic, than it's internal design dictates ( listen on client side, dispatch any incoming message on a round-robin basis down the line, towards a worker side & keep internal records so as to be able to return answer messages from workers back towards the respective client, nothing more )


How to get more?

Try to imagine another possible approach.

Each client can have more sockets and may get .connect()-ed to more Device-s.

Each Device will receive just the "specialised" type of messages and will handle these appropriately with a standard, round-robin "primitive-load-balancing" Merry-Go-Round behaviour

This way both your design objectives ( distribute messages towards a pool of otherwise load-balanced handlers I. while keeping an event-specific direction principle II. ) are met with still using the most primitive ZeroMQ entities.

Upvotes: 2

Related Questions