Reputation: 7211
I started using zeromq with Python, following the Publisher/Subscriber reference. However, I can't find any documentation about how to handle messages in the queue. I want to treat the last received message differently from the rest of the messages in the queue.
    # publisher.py
    import zmq
    import random
    import time

    port = "5556"
    topic = "1"

    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:%s" % port)

    while True:
        messagedata = random.randrange(1, 215)
        print "%s %d" % (topic, messagedata)
        socket.send("%s %d" % (topic, messagedata))
        time.sleep(.2)

    # subscriber.py
    import zmq

    port = "5556"
    topic = "1"

    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    print "Connecting..."
    socket.connect("tcp://localhost:%s" % port)
    socket.setsockopt(zmq.SUBSCRIBE, topic)

    while True:
        if isLastMessage():  # probably based on socket.recv()
            analysis_function()  # time consuming function
        else:
            simple_function()  # something simple like print and save in memory
I just want to know how to implement the isLastMessage() function used in the subscriber.py file, either with something built into zeromq or with a workaround.
Upvotes: 1
Views: 1855
Reputation: 7211
Sorry, I will keep the question for reference. I just found the answer: the documentation describes a NOBLOCK flag that you can pass to the receiver. With this flag the recv call doesn't block. A simple workaround, extracted from part of another answer, is the following:
    while True:
        try:
            # check for a message, this will not block
            message = socket.recv(flags=zmq.NOBLOCK)
            # a message has been received
            print "Message received:", message
        except zmq.Again as e:
            print "No message received yet"
As for the real implementation, you cannot be sure that a message is the last one until you call recv with the NOBLOCK flag and end up in the exception block. Which translates to something like the following:
    msg = subscribe(in_socket)
    is_last = False
    while True:
        if is_last:
            msg = subscribe(in_socket)
            is_last = False
        else:
            try:
                old_msg = msg
                msg = subscribe(in_socket, flags=zmq.NOBLOCK)
                # if new message was received, then process the old message
                process_not_last(old_msg)
            except zmq.Again as e:
                process_last(msg)
                is_last = True  # it is probably the last message
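
The same idea can also be written as "drain the queue, then treat the newest item differently". The following is only a rough sketch in Python 3, not tested against a real socket: `drain` and `handle_batch` are hypothetical helper names, and a standard-library queue stands in for the ZeroMQ socket so the snippet runs on its own. With pyzmq one would presumably pass `lambda: socket.recv(flags=zmq.NOBLOCK)` and `zmq.Again` instead.

```python
import queue


def drain(recv_nowait, empty_exc):
    """Return every message that is queued right now, oldest first.

    recv_nowait: a callable that returns one message or raises empty_exc
    when nothing is waiting (for pyzmq, presumably
    lambda: socket.recv(flags=zmq.NOBLOCK) together with zmq.Again).
    """
    pending = []
    while True:
        try:
            pending.append(recv_nowait())
        except empty_exc:
            return pending


def handle_batch(pending, process_not_last, process_last):
    """Call process_last on the newest message and process_not_last on the rest."""
    if not pending:
        return
    for msg in pending[:-1]:
        process_not_last(msg)
    process_last(pending[-1])


# Stand-in demo: a stdlib queue plays the role of the subscriber socket.
q = queue.Queue()
for n in (3, 1, 4):
    q.put(n)

seen = []
handle_batch(
    drain(q.get_nowait, queue.Empty),
    lambda m: seen.append(("not last", m)),
    lambda m: seen.append(("last", m)),
)
print(seen)
```

As in the loop above, "last" here only means "nothing else was queued at the moment of the check"; a new message may arrive right afterwards.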
Upvotes: 0
Reputation: 1
This is a cardinal feature for any serious distributed-system design.
If you infer a "last" message from there not being another one in the pipe, then a Poller() instance may help your main event loop: it lets you control how long to wait before considering the pipe "empty", so that you do not devastate your IO resources with zero-wait spinning loops.
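
A minimal sketch of that polling idea, in Python 3 and under some assumptions: `recv_until_quiet` and `quiet_ms` are made-up names, and the function only relies on `poller.poll(timeout)` returning `(socket, event)` pairs and on `socket.recv()`, which is how pyzmq's `zmq.Poller` and sockets behave as far as I know. The poller is assumed to have been set up by the caller with `poller = zmq.Poller()` and `poller.register(socket, zmq.POLLIN)`.

```python
def recv_until_quiet(socket, poller, quiet_ms=100):
    """Collect messages until none arrives for quiet_ms milliseconds.

    The final element of the returned list is then the "last" message in the
    sense used above: nothing followed it within the waiting period.
    An empty list means nothing arrived at all during the first wait.
    """
    batch = []
    while True:
        # poll() waits up to quiet_ms and returns a list of (socket, event) pairs
        events = dict(poller.poll(quiet_ms))
        if socket not in events:
            return batch  # pipe considered "empty"
        batch.append(socket.recv())
```

The caller would process `batch[:-1]` with the cheap function and `batch[-1]` with the expensive one. The right value for `quiet_ms` depends on how quickly the publisher sends, so treat 100 as a placeholder.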
The receiver side has zero knowledge of the context of the "last" message received (explicit signalling is better broadcast from the sender side). There is, however, a reverse feature: an option that instructs ZeroMQ sockets to internally throw away all messages that are not the "last" one, thus reducing the receiver-side processing to just the "last" message available.
aQuoteStreamMESSAGE.setsockopt( zmq.CONFLATE, 1 )
If you would like to read more on ZeroMQ patterns and anti-patterns, do not miss Pieter Hintjens' fabulous book "Code Connected, Volume 1" (also available as a PDF). You may also like a broader view of distributed computing using a principally non-blocking ZeroMQ approach.
Upvotes: 2
Reputation: 26823
If isLastMessage() is meant to identify the last message within the stream of messages produced by publisher.py, then this is impossible, since there is no last message: publisher.py produces an infinite number of messages!
However, if publisher.py knows its last "real" message, i.e. it has no while True: loop, it could send an "I am done" message afterwards. Identifying that in subscriber.py is trivial.
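
To illustrate, here is a small sketch in Python 3. The `DONE` payload and the `consume` helper are inventions for this example, not anything ZeroMQ provides. It assumes messages look like `b"<topic> <payload>"`, as in the question, and that the publisher sends something like `socket.send(b"1 DONE")` once after its real messages. The subscriber keeps one message in hand so that it can tell which one was the last when the marker arrives.

```python
DONE = b"DONE"  # hypothetical end-of-stream payload agreed on by both sides


def consume(recv, process_not_last, process_last):
    """Read "<topic> <payload>" messages until the DONE marker arrives.

    recv: a blocking callable returning one message as bytes
    (with pyzmq, presumably socket.recv).
    The payload received just before DONE goes to process_last,
    every earlier payload goes to process_not_last.
    """
    previous = None
    while True:
        _topic, _sep, payload = recv().partition(b" ")
        if payload == DONE:
            if previous is not None:
                process_last(previous)
            return
        if previous is not None:
            process_not_last(previous)
        previous = payload


# Stand-in demo: an iterator plays the role of the subscriber socket.
incoming = iter([b"1 10", b"1 20", b"1 30", b"1 DONE"])
consume(
    incoming.__next__,
    lambda p: print("not last:", p),
    lambda p: print("last:", p),
)
```

Note that with PUB/SUB a subscriber that connects late, or drops messages, could miss the marker, so some timeout around `recv` may still be worth having.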
Upvotes: 1