Greg

Reputation: 8945

How to have multiple publishers and subscribers in ZMQ/0MQ?

How can I create a network that allows multiple publishers and multiple subscribers to those publishers?

Or is it absolutely necessary to use a message broker?

import time
import zmq
from multiprocessing import Process

def bind_pub(sleep_seconds, max_messages, pub_id):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5556")

    message = 0
    while True:
        socket.send_string("1 sending_func=bind_pub message_number=%s pub_id=%s" % (message, pub_id))
        message += 1
        if message >= max_messages:
            break
        time.sleep(sleep_seconds)

def bind_sub(sleep_seconds, max_messages, sub_id):
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.bind("tcp://*:5556")
    socket.setsockopt_string(zmq.SUBSCRIBE, '1')

    message_n = 0
    while True:
        message = socket.recv_string()
        print(message + " receiving_func=bind_sub sub_id=%s" % sub_id)
        message_n += 1
        if message_n >= max_messages - 1:
            break
        time.sleep(sleep_seconds)

def conect_pub(sleep_seconds, max_messages, pub_id):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.connect("tcp://localhost:5556")

    message = 0
    while True:
        socket.send_string("1 sending_func=conect_pub message_number=%s pub_id=%s" % (message, pub_id))
        message += 1
        if message >= max_messages:
            break
        time.sleep(sleep_seconds)

def connect_sub(sleep_seconds, max_messages, sub_id):
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:5556")
    socket.setsockopt_string(zmq.SUBSCRIBE, '1')

    message_n = 0
    while True:
        message = socket.recv_string()
        print(message + " receiving_func=connect_sub sub_id=%s" % sub_id)
        message_n += 1
        if message_n >= max_messages - 1:
            break
        time.sleep(sleep_seconds)

When trying a bind_pub, connect_pub, connect_sub, connect_sub network architecture:

# bind_pub, connect_pub, connect_sub, connect_sub
n_messages = 4
p1 = Process(target=bind_pub, args=(1,n_messages,1))
p2 = Process(target=conect_pub, args=(1,n_messages,2))
p3 = Process(target=connect_sub, args=(0.1,n_messages,1))
p4 = Process(target=connect_sub, args=(0.1,n_messages,2))
p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()

Results in pub_id=2 messages going missing:

1 sending_func=bind_pub message_number=1 pub_id=1 receiving_func=connect_sub sub_id=2
1 sending_func=bind_pub message_number=1 pub_id=1 receiving_func=connect_sub sub_id=1
1 sending_func=bind_pub message_number=2 pub_id=1 receiving_func=connect_sub sub_id=2
1 sending_func=bind_pub message_number=2 pub_id=1 receiving_func=connect_sub sub_id=1
1 sending_func=bind_pub message_number=3 pub_id=1 receiving_func=connect_sub sub_id=1
1 sending_func=bind_pub message_number=3 pub_id=1 receiving_func=connect_sub sub_id=2

Similarly running a connect_pub, connect_pub, connect_sub, bind_sub architecture:

# connect_pub, connect_pub, connect_sub, bind_sub
n_messages = 4
p1 = Process(target=conect_pub, args=(1,n_messages,1))
p2 = Process(target=conect_pub, args=(1,n_messages,2))
p3 = Process(target=bind_sub, args=(0.1,n_messages,1))
p4 = Process(target=connect_sub, args=(0.1,n_messages,2))
p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()

Results in no messages being received by sub_id=2:

1 sending_func=conect_pub message_number=1 pub_id=1 receiving_func=bind_sub sub_id=1
1 sending_func=conect_pub message_number=1 pub_id=2 receiving_func=bind_sub sub_id=1
1 sending_func=conect_pub message_number=2 pub_id=1 receiving_func=bind_sub sub_id=1

Upvotes: 4

Views: 7338

Answers (2)

user3666197

Reputation: 1

Well,
it is fair to mention that ZeroMQ is, in principle, a broker-less framework.

This means the second question is answered a priori: no, a broker is not necessary. ZeroMQ does not even provide one, so any broker, with or without (semi-)persistence, would have to be implemented as an extra add-on layer built from the standard Zen-of-Zero ZeroMQ tools.


Next,
ZeroMQ tools are by far not "sockets" as you know them:

This is an often repeated misconception, so let me repeat it in bold.

Beware:
A ZeroMQ Socket() instance is not a TCP socket as you know it. It is best to read about the main conceptual differences in "ZeroMQ hierarchy in less than a five seconds" or in other posts and discussions here.


Yet,
more importantly,
there seems to be no stated need that is not covered:

ZeroMQ can serve any of:

many-PUB-s : many-SUB-s           -or-  
 one-PUB   : many-SUB-s           -or- even  
many-PUB-s :  one-SUB

where all or some of those "many" can still .connect() to one or more access points, so the resulting topologies can indeed go wild (for details, kindly check the "five seconds" read mentioned above). One's own imagination seems to be the only ceiling here.
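As a rough illustration of the many-PUB : many-SUB case without a broker, here is a minimal sketch in which every publisher binds its own endpoint and every subscriber connects to all of them. The function name run_mesh, the random ports on 127.0.0.1 and the 0.5 s settling delay are assumptions made for this example, not something taken from the question. Everything runs in one thread only to keep the sketch short.

```python
import time
import zmq


def run_mesh(n_pubs=2, n_subs=2, n_messages=3, settle_seconds=0.5):
    """Hypothetical demo: each PUB binds its own port, each SUB connects to all."""
    ctx = zmq.Context()

    # Every publisher binds its own endpoint (one port per publisher).
    pubs, endpoints = [], []
    for _ in range(n_pubs):
        pub = ctx.socket(zmq.PUB)
        port = pub.bind_to_random_port("tcp://127.0.0.1")
        pubs.append(pub)
        endpoints.append("tcp://127.0.0.1:%d" % port)

    # Every subscriber connects to every publisher endpoint.
    subs = []
    for _ in range(n_subs):
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt_string(zmq.SUBSCRIBE, "1")
        sub.setsockopt(zmq.RCVTIMEO, 2000)  # fail instead of blocking forever
        for endpoint in endpoints:
            sub.connect(endpoint)
        subs.append(sub)

    # Assumed delay so connections and subscriptions are in place before
    # sending; PUB sockets drop messages that nobody is subscribed to yet.
    time.sleep(settle_seconds)

    for n in range(n_messages):
        for pub_id, pub in enumerate(pubs, start=1):
            pub.send_string("1 message_number=%s pub_id=%s" % (n, pub_id))

    # Each subscriber should see every message from every publisher.
    received = [
        [sub.recv_string() for _ in range(n_pubs * n_messages)] for sub in subs
    ]

    for sock in pubs + subs:
        sock.close(linger=0)
    ctx.term()
    return received


if __name__ == "__main__":
    for sub_id, messages in enumerate(run_mesh(), start=1):
        for message in messages:
            print(message + " sub_id=%s" % sub_id)
```

A single SUB socket may connect to many endpoints and fair-queues whatever arrives from them. The price of this layout is that each subscriber has to know the address of every publisher.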

Upvotes: 0

WallStProg

Reputation: 401

It is certainly not necessary to use a broker in order to implement a many-to-many network, but a broker does simplify configuration since each node only needs to know the broker's address, not all of its peers.
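To sketch the brokered variant: one common way to build such a broker from ZeroMQ's own parts is an XSUB/XPUB pair joined by zmq.proxy. Publishers and subscribers then only connect, each to a single well-known address. The names start_broker and run_brokered, the random local ports and the settling delay are assumptions made for this example.

```python
import threading
import time
import zmq


def start_broker(ctx):
    """Bind an XSUB/XPUB pair and forward between them in a background thread."""
    frontend = ctx.socket(zmq.XSUB)  # publishers connect here
    backend = ctx.socket(zmq.XPUB)   # subscribers connect here
    pub_port = frontend.bind_to_random_port("tcp://127.0.0.1")
    sub_port = backend.bind_to_random_port("tcp://127.0.0.1")

    def forward():
        try:
            # Blocks, passing messages downstream and subscriptions upstream,
            # until the broker's context is terminated.
            zmq.proxy(frontend, backend)
        except zmq.ContextTerminated:
            pass
        finally:
            frontend.close(linger=0)
            backend.close(linger=0)

    thread = threading.Thread(target=forward, daemon=True)
    thread.start()
    pub_endpoint = "tcp://127.0.0.1:%d" % pub_port
    sub_endpoint = "tcp://127.0.0.1:%d" % sub_port
    return pub_endpoint, sub_endpoint, thread


def run_brokered(n_pubs=2, n_subs=2, n_messages=3, settle_seconds=1.0):
    """Hypothetical demo: all peers connect to the broker, none of them bind."""
    broker_ctx = zmq.Context()
    pub_endpoint, sub_endpoint, thread = start_broker(broker_ctx)

    ctx = zmq.Context()
    subs = []
    for _ in range(n_subs):
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt_string(zmq.SUBSCRIBE, "1")
        sub.setsockopt(zmq.RCVTIMEO, 2000)  # fail instead of blocking forever
        sub.connect(sub_endpoint)
        subs.append(sub)
    pubs = []
    for _ in range(n_pubs):
        pub = ctx.socket(zmq.PUB)
        pub.connect(pub_endpoint)
        pubs.append(pub)

    # Assumed delay so subscriptions can travel through the broker to the PUBs.
    time.sleep(settle_seconds)

    for n in range(n_messages):
        for pub_id, pub in enumerate(pubs, start=1):
            pub.send_string("1 message_number=%s pub_id=%s" % (n, pub_id))
    received = [
        [sub.recv_string() for _ in range(n_pubs * n_messages)] for sub in subs
    ]

    for sock in pubs + subs:
        sock.close(linger=0)
    ctx.term()
    broker_ctx.term()  # makes zmq.proxy raise ContextTerminated in the thread
    thread.join(timeout=2)
    return received


if __name__ == "__main__":
    for sub_id, messages in enumerate(run_brokered(), start=1):
        for message in messages:
            print(message + " sub_id=%s" % sub_id)
```

In a real deployment the broker would normally be its own process on fixed, published ports. It runs in a thread here only so the sketch fits in one file.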

Another possibility is a hybrid approach -- using a broker to exchange address information among peers so they can connect to each other directly. You can find an example here: https://github.com/nyfix/OZ/blob/master/doc/Naming-Service.md
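A toy version of the hybrid idea might look like the sketch below: a small REQ/REP registry hands out publisher addresses, and the data itself flows directly from PUB to SUB. This is not the protocol used by the linked OZ project. The JSON message format and the names serve_registry, ask and run_hybrid are hypothetical and exist only for illustration.

```python
import threading
import time
import zmq


def serve_registry(rep, n_requests):
    """Answer n_requests requests on a REP socket, then close it."""
    endpoints = []
    for _ in range(n_requests):
        request = rep.recv_json()
        if request["op"] == "register":
            endpoints.append(request["endpoint"])
        # Every reply carries the full list of known publisher endpoints.
        rep.send_json({"endpoints": endpoints})
    rep.close()  # default linger, so the last reply is not discarded


def ask(ctx, registry, request):
    """Send one request to the registry and return the endpoint list."""
    req = ctx.socket(zmq.REQ)
    req.setsockopt(zmq.RCVTIMEO, 2000)  # fail instead of blocking forever
    req.connect(registry)
    req.send_json(request)
    endpoints = req.recv_json()["endpoints"]
    req.close(linger=0)
    return endpoints


def run_hybrid(n_pubs=2, settle_seconds=0.5):
    """Hypothetical demo: look up publishers via a registry, then connect directly."""
    ctx = zmq.Context()
    rep = ctx.socket(zmq.REP)
    registry = "tcp://127.0.0.1:%d" % rep.bind_to_random_port("tcp://127.0.0.1")
    # The registry serves one request per publisher plus one lookup.
    server = threading.Thread(
        target=serve_registry, args=(rep, n_pubs + 1), daemon=True
    )
    server.start()

    # Each publisher binds its own port and registers that address.
    pubs = []
    for _ in range(n_pubs):
        pub = ctx.socket(zmq.PUB)
        port = pub.bind_to_random_port("tcp://127.0.0.1")
        ask(ctx, registry, {"op": "register", "endpoint": "tcp://127.0.0.1:%d" % port})
        pubs.append(pub)

    # The subscriber asks the registry once, then talks to the publishers directly.
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt_string(zmq.SUBSCRIBE, "1")
    sub.setsockopt(zmq.RCVTIMEO, 2000)
    for endpoint in ask(ctx, registry, {"op": "list"}):
        sub.connect(endpoint)

    time.sleep(settle_seconds)  # assumed delay for subscriptions to propagate
    for pub_id, pub in enumerate(pubs, start=1):
        pub.send_string("1 pub_id=%s" % pub_id)
    received = sorted(sub.recv_string() for _ in range(n_pubs))

    server.join()
    for sock in pubs + [sub]:
        sock.close(linger=0)
    ctx.term()
    return received


if __name__ == "__main__":
    print(run_hybrid())
```

Only the registry address has to be known in advance. A subscriber that starts before a publisher registers would need to ask again later, which this sketch does not handle.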

Upvotes: 1
