Reputation: 3869
So Redis 5.0 freshly introduced a new feature called Streams. It seems to be perfect for distributing messages for inter-process communication.
However, since this feature is quite new, there are barely any Python (or even general redis) manuals out there and I don't really get how to adapt the stream system to my use case.
I want to have one publisher program that pushes messages to the stream, each containing recipient information (like `recipient: "user1"`). Then I will have several receiving processes that should all check for new stream messages and compare whether they are the targeted recipient. If they are, they should process the message and mark it as processed (acknowledged).
However, I don't really get the idea of consumer groups, pending state and so on. Can anybody give me a real world example for my little pseudo-code?
sender.py
```
db = Redis(...)
db.the_stream.add({"recipient": "user1", "task": "be a python"})
```
recipient.py (there will be many instances of them running, each with a unique recipient id)
```
recipient_id = "user1"  # you get the idea...
db = Redis(...)
while True:
    message = db.the_stream.blocking_read("$")  # "$" somehow means: just receive new messages
    if message.recipient == recipient_id:
        perform_task(message.task)
        message.acknowledge()  # let the stream know it was processed
    else:
        pass  # do nothing here since it's not our message; another recipient instance should do the job
```
Upvotes: 3
Views: 962
Reputation: 5931
With the example and pseudo code you've given, let's imagine that:

- `recipient.user1` is getting 60 messages a minute
- the `perform_task()` method takes 2 seconds to execute

What will happen here is obvious: the latency between a new message coming in and having it processed will only grow over time, drifting further and further from "real-time processing".

system throughput = 30 messages/minute
To get around this, you might want to create a consumer group for `user1`. Here you could have 4 distinct Python processes running in parallel, all 4 joined in the same group for `user1`. Now when a message comes in for `user1`, one of the 4 workers will pick it up and run `perform_task()`.

system throughput = 120 messages/minute
In your example, `message.acknowledge()` doesn't actually exist, because your stream reader is on its own (the `XREAD` command).

If it were a group, acknowledging messages becomes essential: that's how Redis knows that one of the group's members did in fact handle that message, so it can "move on" (it can forget the fact that that message was pending acknowledgement). When you're using groups, there's a little bit of server-side logic in place to ensure that every message is delivered to one of the consumer group's workers once (the `XREADGROUP` command). When the client has finished, it issues an acknowledgement of that message (the `XACK` command) so that the server-side "consumer group buffer" can delete it and move on.
Imagine if a worker died and never acknowledged the message. With a consumer group, you're able to watch out for this situation (using the `XPENDING` command) and act upon it, for example by retrying to process the same message in another consumer.
When you're not using groups, the Redis server doesn't need to "move on"; the "acknowledgement" becomes 100% client-side/business logic.
Upvotes: 5