Reputation: 8945
How to create a network which allows for multiple publishers and multiple subscribers to those publishers?
Or is it absolutely necessary for a message broker to be used?
import time
import zmq
from multiprocessing import Process
def bind_pub(sleep_seconds, max_messages, pub_id):
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")
message = 0
while True:
socket.send_string("1 sending_func=bind_pub message_number=%s pub_id=%s" % (message, pub_id))
message += 1
if message >= max_messages:
break
time.sleep(sleep_seconds)
def bind_sub(sleep_seconds, max_messages, sub_id):
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.bind("tcp://*:5556")
socket.setsockopt_string(zmq.SUBSCRIBE, '1')
message_n = 0
while True:
message = socket.recv_string()
print(message + " receiving_func=bind_sub sub_id=%s" % sub_id)
message_n += 1
if message_n >= max_messages - 1:
break
time.sleep(sleep_seconds)
def conect_pub(sleep_seconds, max_messages, pub_id):
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect("tcp://localhost:5556")
message = 0
while True:
socket.send_string("1 sending_func=conect_pub message_number=%s pub_id=%s" % (message, pub_id))
message += 1
if message >= max_messages:
break
time.sleep(sleep_seconds)
def connect_sub(sleep_seconds, max_messages, sub_id):
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")
socket.setsockopt_string(zmq.SUBSCRIBE, '1')
message_n = 0
while True:
message = socket.recv_string()
print(message + " receiving_func=connect_sub sub_id=%s" % sub_id)
message_n += 1
if message_n >= max_messages - 1:
break
time.sleep(sleep_seconds)
When trying a bind_pub, connect_pub, connect_sub, connect_sub network architecture:
# bind_pub, connect_pub, connect_sub, connect_sub
n_messages = 4
p1 = Process(target=bind_pub, args=(1,n_messages,1))
p2 = Process(target=conect_pub, args=(1,n_messages,2))
p3 = Process(target=connect_sub, args=(0.1,n_messages,1))
p4 = Process(target=connect_sub, args=(0.1,n_messages,2))
p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()
Results in pub_id=2
messages going missing:
1 sending_func=bind_pub message_number=1 pub_id=1 receiving_func=connect_sub sub_id=2
1 sending_func=bind_pub message_number=1 pub_id=1 receiving_func=connect_sub sub_id=1
1 sending_func=bind_pub message_number=2 pub_id=1 receiving_func=connect_sub sub_id=2
1 sending_func=bind_pub message_number=2 pub_id=1 receiving_func=connect_sub sub_id=1
1 sending_func=bind_pub message_number=3 pub_id=1 receiving_func=connect_sub sub_id=1
1 sending_func=bind_pub message_number=3 pub_id=1 receiving_func=connect_sub sub_id=2
Similarly running a connect_pub, connect_pub, connect_sub, bind_sub architecture:
# connect_pub, connect_pub, connect_sub, bind_sub
n_messages = 4
p1 = Process(target=conect_pub, args=(1,n_messages,1))
p2 = Process(target=conect_pub, args=(1,n_messages,2))
p3 = Process(target=bind_sub, args=(0.1,n_messages,1))
p4 = Process(target=connect_sub, args=(0.1,n_messages,2))
p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()
Results in no messages being received by sub_id=2
:
1 sending_func=conect_pub message_number=1 pub_id=1 receiving_func=bind_sub sub_id=1
1 sending_func=conect_pub message_number=1 pub_id=2 receiving_func=bind_sub sub_id=1
1 sending_func=conect_pub message_number=2 pub_id=1 receiving_func=bind_sub sub_id=1
Upvotes: 4
Views: 7338
Reputation: 1
this means the 2nd question is solved a priori - no, it is not only not absolutely necessary, it is also principally impossible ( if one does not implement a Broker-(semi-)persistence as a Zen-of-Zero standard ZeroMQ tools based layer an extra add-on ).
This is an often re-articulated misconception, so let me repeat it in bold.
Beware:
ZeroMQ Socket()
-instance is not a tcp-socket-as-you-know-it. Best read about the main conceptual differences in ZeroMQ hierarchy in less than a five seconds or other posts and discussions here.
ZeroMQ can either serve all of :
many-PUB-s : many-SUB-s -or-
one-PUB : many-SUB-s -or- even
many-PUB-s : one-SUB
where all or part of those "many" could still get .connect()
-ed to a single or more AccessPoints, so the produced topologies could go indeed wild ( for details kindly check the above offered link to a "five seconds" read ) so, one's own imagination seems to be the only ceiling in doing this.
Upvotes: 0
Reputation: 401
It is certainly not necessary to use a broker in order to implement a many-to-many network, but a broker does simplify configuration since each node only needs to know the broker's address, not all of its peers.
Another possibility is a hybrid approach -- using a broker to exchange address information among peers so they can connect to each other directly. You can find an example here: https://github.com/nyfix/OZ/blob/master/doc/Naming-Service.md
Upvotes: 1