silgon
silgon

Reputation: 7211

ZeroMQ Pub/Sub action last element in queue an other elements

I started using zeromq with python with the Publisher/Subscriber reference. However, I don't find any documentation about how to treat messages in the queue. I want to treat the last received message different as the rest of the elements of the queue.

Example

publisher.py

import zmq
import random
import time

port = "5556"
topic = "1"

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)

while True:
    messagedata = random.randrange(1,215)
    print "%s %d" % (topic, messagedata)
    socket.send("%s %d" % (topic, messagedata))
    time.sleep(.2)

subscriber.py

import zmq

port = "5556"
topic = "1"

context = zmq.Context()
socket = context.socket(zmq.SUB)

print "Connecting..."
socket.connect ("tcp://localhost:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE,topic)

while True:
    if isLastMessage(): # probably based on socket.recv()
         analysis_function() # time consuming function
    else:
         simple_function()  # something simple like print and save in memory

I just want to know how to create the isLastMessage() function described in the subscriber.py file. If there's something directly in zeromq or a workaround.

Upvotes: 1

Views: 1855

Answers (3)

silgon
silgon

Reputation: 7211

Sorry, I will keep the question for reference. I just found the answer, in the documentation there is a NOBLOCK flag that you can add to the receiver. With this the recv command doesn't block. A simple workaround, extracted from a part of an answer, is the following:

while True:
    try:
        #check for a message, this will not block
        message = socket.recv(flags=zmq.NOBLOCK)

        #a message has been received
        print "Message received:", message

    except zmq.Again as e:
        print "No message received yet"

As for the real implementation, one is not sure that it is the last call you use the flag NOBLOCK and once you have entered the exception block. Wich translates to something like the following:

msg = subscribe(in_socket)
is_last = False
while True:
    if is_last:
        msg = subscribe(in_socket)
        is_last = False
    else:
        try:
            old_msg = msg
            msg = subscribe(in_socket,flags=zmq.NOBLOCK)
            # if new message was received, then process the old message
            process_not_last(old_msg)
        except zmq.Again as e:
            process_last(msg)
            is_last = True  # it is probably the last message

Upvotes: 0

user3666197
user3666197

Reputation: 1

Welcome to the world of non-blocking messaging / signalling

this is a cardinal feature for any serious distributed-system design.

If you assume a "last" message via a not having another one in the pipe, then a Poller() instance may help your main event-loops, where you may control the amount of time to "wait"-a-bit before considering the pipe "empty", not to devastate your IO-resources with zero-wait spinning-loops.

Explicit signalling is always better ( if you can design the remote end behaviour )

There is Zero-knowledge on the receiver-side, what is the context of the "last"-message received ( and explicit signalling is advised to be rather broadcast from the message sender-side ), however there is a reversed feature to this -- that instructs ZeroMQ archetypes to "internally"-throw away all such messages, that are not the "last"-message, thus reducing the receiver-side processing to right the "last"-message available.

aQuoteStreamMESSAGE.setsockopt( zmq.CONFLATE, 1 )

If you may like to read more on ZeroMQ patterns and anti-patterns, do not miss Pieter HINTJENS' fabulous book "Code Connected, Volume 1" ( also in pdf ) and may like a broader view on using principally a non-blocking ZeroMQ approach

Upvotes: 2

Ralf Stubner
Ralf Stubner

Reputation: 26823

If isLastMessage() is meant to identify the last message within the stream of messages produced by publisher.py, than this is impossible since there is no last message. publisher.py produces an infinite amount of messages!

However, if publisher.py knows its last "real" message, i.e. no while True:, it could send a "I am done" message afterwards. Identifying that in subscriber.py is trivial.

Upvotes: 1

Related Questions