Ian

Reputation: 21

Why are ZeroMQ SUBs missing messages?

I create about 12,000 subscribers per computer, one thread each, as follows:

subscriber side:

import threading
import zmq

def client(id):
    context = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    subscriber.connect('ip:port')
    # each subscriber filters on its own topic: its id followed by 'e'
    subscriber.setsockopt(zmq.SUBSCRIBE, (id + 'e').encode())

    while True:
        signal = subscriber.recv_multipart()
        # write logs...

# j is an arbitrary int, unique per computer
for i in range(12000):
    threading.Thread(target=client, args=(str(i + j * 12000),)).start()

publisher side:

publisher = zmq.Context().socket(zmq.PUB)
publisher.bind('tcp://*:port')
while True:
    for id in client_id:
        # first frame is the topic, second frame is the payload
        publisher.send_multipart([(id + 'e').encode(), message])

When I use more than one computer (with a different j on each) to create subscribers, some subscribers sometimes receive no messages at all.

If I restart the subscribers, those that could not receive messages start working normally, but some that were working normally stop receiving messages.

The problem does not raise any errors; it only shows up in my logs.

Can an excessive number of connections cause this problem?

Upvotes: 1

Views: 1097

Answers (2)

user3666197

Reputation: 1

As the number of connections, the number of messages and the message sizes grow, some default guesstimates typically stop being sufficient. Try raising some otherwise workable defaults in the PUB-side configuration, where the choking seems to start. (Do not forget that since v3.?+ the subscription-list processing has moved from the SUB side(s) to the central PUB side. That reduces the volume of data sent, yet at an additional cost on the PUB side, which here grows to remarkable amounts: RAM for buffers plus CPU for TOPIC-list filtering.)

So, let's start with these steps on the PUB-side :

aSock2SUBs = zmq.Context( _tweak_nIOthreads ).socket( zmq.PUB ) # MORE CPU POWER
aSock2SUBs.setsockopt( zmq.SNDBUF, _tweak_SIZE_with_SO_SNDBUF ) # ROOM IN SNDBUF

And last but not least, a PUB socket silently drops any message that does not "fit" under its current high-water mark, so let's tweak this one too:

aSock2SUBs.setsockopt( zmq.SNDHWM, _tweak_HWM_till_no_DROPs )   # TILL NO DROPS
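Put together, the PUB-side tweaks above might look like the following minimal sketch. The function name `make_publisher` and all three default values are illustrative assumptions, not recommendations; the right numbers have to be found by testing under your own load. Options are set before `bind()` so that they apply to the connections accepted afterwards.

```python
import zmq


def make_publisher(endpoint, io_threads=4,
                   sndbuf_bytes=4 * 1024 * 1024, sndhwm=100_000):
    """Create a PUB socket with larger-than-default limits.

    All default values here are placeholders (assumptions) to be tuned.
    """
    # More I/O threads in the context to serve many connections.
    context = zmq.Context(io_threads)
    publisher = context.socket(zmq.PUB)

    # Set options before bind() so they apply to new connections.
    # SNDBUF: size of the kernel transmit buffer, in bytes.
    publisher.setsockopt(zmq.SNDBUF, sndbuf_bytes)
    # SNDHWM: per-subscriber queue limit, in messages; beyond it a PUB
    # socket drops messages for that subscriber without raising an error.
    publisher.setsockopt(zmq.SNDHWM, sndhwm)

    publisher.bind(endpoint)
    return context, publisher
```

It could be called as `context, publisher = make_publisher('tcp://*:5556')`, where the port number is hypothetical. Note that a higher SNDHWM trades dropped messages for memory, since the limit applies to each connected subscriber separately.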

Other low-level parameter settings { TCP_* | TOS | RECONNECT_IVL* | BACKLOG | IMMEDIATE | HEARTBEAT_* | ... } may further help your herd of 12k+ SUBs live in peace side by side with other traffic (both friendly and hostile) and make your application more robust than it would be relying on pre-cooked API defaults alone.

Consult the ZeroMQ API documentation together with your O/S defaults, as many of these low-level ZeroMQ attributes also depend on the actual O/S configuration values.

Be warned, too, that creating 12k+ threads in Python still leaves the Python code executing purely [SERIAL]ly. Ownership of the central GIL lock is exclusive, which prevents (yes, in principle prevents) any form of [CONCURRENT] co-execution of Python bytecode and re-[SERIAL]-ises any number of threads into a waiting queue, executed as a plain sequence of chunks. By default, Python 2 switches threads every 100 bytecode instructions. Since Python 3.2, the GIL is by default released after 5 milliseconds ( 5,000 [us] ) so that another thread gets a chance to acquire it. You can change these defaults if the war among 12k+ threads over GIL ownership ends up "almost blocking" the TCP/IP machinery for message buffering, queuing, sending and re-transmitting until reception is confirmed in time. One may push this to the bleeding edge, yet choosing a safer ceiling might help once the other parameters have been adjusted for robustness.
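On Python 3.2+, the switching default mentioned above can be read and changed through `sys.getswitchinterval()` and `sys.setswitchinterval()`. The helper below is only a sketch; its name and the 1 ms value in the usage note are assumptions for illustration, and whether a shorter or longer interval helps with 12k+ threads has to be measured.

```python
import sys


def set_gil_switch_interval(seconds):
    """Set the interpreter's thread switch interval and return the old one.

    Hypothetical helper: it only wraps the two standard sys calls so the
    previous value can be restored after an experiment.
    """
    previous = sys.getswitchinterval()  # 0.005 s by default on Python 3.2+
    sys.setswitchinterval(seconds)
    return previous
```

For example, `old = set_gil_switch_interval(0.001)` asks the interpreter to consider a thread switch every millisecond, and `sys.setswitchinterval(old)` restores the earlier setting.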


Last but not least, enjoy the Zen-of-Zero, the masterpiece of Martin SUSTRIK for distributed computing: a well-crafted, ultimately scalable, almost zero-latency, very comfortable and widely ported signalling & messaging framework.

Upvotes: 1

bazza

Reputation: 8394

Further to user3666197's answer, you may also have to consider the time taken for all of those clients to connect. The PUBlisher has no idea how many SUBscribers there are supposed to be, and will simply get on with the job of sending out messages to those SUBscribers presently connected, from when the very first connection is made. The PUBlisher socket does not hang on to messages it's sent just in case more SUBscribers connect at some undefined time in the future. Once a message has been transferred to one or more SUBscribers, it's dropped from the PUBlisher's queue. Also, the connections are not made instantaneously, and 12,000 is quite a few to get through.

It doesn't matter whether you start your PUBlisher or SUBscriber program first; your 12,000 connections will be made over a period of time once both programs are running, and this happens asynchronously with respect to your own thread(s). Some SUBscribers will start getting messages whilst others will still be unknown to the PUBlisher. When, finally, all 12,000 connections are made, it will smooth out.
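If the publisher needs to know when the subscribers have actually arrived, one option (a different technique from the plain PUB socket in the question, named here as such) is to bind an XPUB socket instead. An XPUB socket sends like a PUB socket but also lets the application read the incoming subscription messages: a first byte of 0x01 means subscribe, 0x00 means unsubscribe, and the rest is the topic. The sketch below assumes each subscriber uses a unique topic, as in the question; `wait_for_subscriptions` and its parameters are hypothetical names.

```python
import zmq


def wait_for_subscriptions(xpub, expected, timeout_ms=5000):
    """Collect subscribed topics from an XPUB socket.

    Returns the set of topics seen once `expected` distinct topics have
    subscribed, or whatever was seen if no subscription message arrives
    within `timeout_ms`.
    """
    seen = set()
    poller = zmq.Poller()
    poller.register(xpub, zmq.POLLIN)

    while len(seen) < expected:
        if not poller.poll(timeout_ms):
            break  # nothing new arrived in time; give up waiting
        frame = xpub.recv()
        if frame[:1] == b"\x01":      # subscribe notification
            seen.add(frame[1:])
        elif frame[:1] == b"\x00":    # unsubscribe notification
            seen.discard(frame[1:])
    return seen
```

The publisher would create its socket with `context.socket(zmq.XPUB)`, bind it, call `wait_for_subscriptions(xpub, len(client_id))` and only then start its send loop. Comparing the returned set against the expected topics also shows which subscribers never connected.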

Upvotes: 0
