shani

Reputation: 43

How to define TTL for redis streams?

I have two micro services and I need to implement reliable notifications between them. I thought about using redis streams - serviceA will send a request to serviceB with an identifier X. Once serviceB is done doing the work serviceA asked for, it'll create/add to a stream (the stream is specific for X) a new item to let it know it's done.

ServiceA can send multiple requests, each request may have a different identifier. So it'll block for new elements in different streams.

My question is how can I delete streams which are no longer needed, depending on their age. For example I'd like to have streams that were created over a day ago deleted. Is this possible?

If it's not, I'd love to hear any ideas you have as to how not to have unneeded streams in redis.

Thanks

Upvotes: 4

Views: 8277

Answers (2)

Boris Chervenkov

Reputation: 650

I think the question is about deleting unused streams, not the messages in the streams, as described in the other answer.

You can specify a TTL for a stream, just as you can for any other data element.

Example

Create the streams & add some messages:

XADD stream1 * field1 value-101 field2 value-201
XADD stream1 * field1 value-102 field2 value-202

XADD stream2 * field1 value-301 field2 value-401
XADD stream2 * field1 value-302 field2 value-402

Set the stream to expire after 120 seconds:

EXPIRE stream1 120  
(integer) 1

TTL stream1 
(integer) 116    

Adding new messages to the stream does not affect the expiration:

XADD stream1 * field1 value-501 field2 value-501

TTL stream1 
(integer) 98

One way to keep the stream alive is to issue an EXPIRE command every time you send a message through it.

XADD stream2 * field1 value-301 field2 value-401
EXPIRE stream2 120

XADD stream2 * field1 value-302 field2 value-402
EXPIRE stream2 120

This keeps the stream alive as long as it is in use, and lets Redis remove it once 120 seconds pass without a new message.
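The XADD + EXPIRE pairing above could be wrapped in a small helper so the two commands are always sent together. This is only a sketch: it assumes the redis-py client, and the helper name add_with_ttl and its parameters are made up for illustration.

```python
def add_with_ttl(client, stream, fields, ttl_seconds=120):
    """Append an entry and refresh the stream's TTL in one MULTI/EXEC batch.

    `client` is assumed to be a redis-py `redis.Redis` instance;
    `add_with_ttl` itself is a hypothetical helper, not part of redis-py.
    """
    pipe = client.pipeline(transaction=True)
    pipe.xadd(stream, fields)          # XADD stream * field value ...
    pipe.expire(stream, ttl_seconds)   # EXPIRE stream ttl_seconds
    entry_id, _expire_set = pipe.execute()
    return entry_id
```

With a real connection this might be called as add_with_ttl(redis.Redis(), "stream2", {"field1": "value-301"}). Sending both commands in one transaction avoids leaving a freshly created stream without a TTL if the writer dies between the two calls.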

Upvotes: 2

sonus21

Reputation: 5398

There's no straightforward way to delete older entries based on TTL/age. You can use a combination of XTRIM/XDEL with other commands to trim the stream.

Let's see how we can use XTRIM:

XTRIM stream MAXLEN ~ SIZE

XTRIM trims the stream to a given number of items, evicting older items (items with lower IDs) if needed.

Record the stream size with the XLEN command, daily or at whatever interval your deletion policy calls for, and store it somewhere.

Then run a periodic job that calls XTRIM as:

XTRIM x-stream MAXLEN ~ (NEW_SIZE - PREVIOUS_SIZE)

For example, if the stream size was 500 yesterday and is 600 now, we need to delete the 500 old entries, so we can just run:

XTRIM x-stream MAXLEN ~ 100

You can use different deletion policies, for example daily, weekly, or twice a week.
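As a rough sketch of that periodic job, assuming the redis-py client (the function name trim_to_new_entries is hypothetical, and where the previous size is stored is left to the caller):

```python
def trim_to_new_entries(client, stream, previous_size):
    """Keep roughly the entries added since the last run.

    `client` is assumed to be a redis-py `redis.Redis` instance.
    Returns the stream length after trimming, which the caller would
    store as PREVIOUS_SIZE for the next run.
    """
    current_size = client.xlen(stream)
    # NEW_SIZE - PREVIOUS_SIZE, never negative if the stream shrank meanwhile.
    keep = max(current_size - previous_size, 0)
    # approximate=True sends "MAXLEN ~ keep"; Redis may retain a few more entries.
    client.xtrim(stream, maxlen=keep, approximate=True)
    return client.xlen(stream)
```

Because of the ~ modifier the trim is approximate, so the sketch stores the length measured after trimming instead of assuming it equals the requested MAXLEN.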


XDEL stream ID [ID...]

Removes the specified entries from a stream, and returns the number of entries deleted, that may be different from the number of IDs passed to the command in case certain IDs do not exist.

So whenever Service B consumes an event, it can delete that stream entry itself, since it knows the entry ID. This stops working as soon as you start using a consumer group, though. In that case I would use a Redis set or hash to track the acknowledged entry IDs and run a periodic sweep job to clean up the stream.

For example

Service A sends a stream item with ID1 to service B. After consuming the item, service B records the acknowledgement in the map: ack_stream = { ID1: true }. You can track other data as well, e.g. a count when using a consumer group.

A sweep job runs periodically, say daily at 1 AM, reads all the elements of ack_stream, and picks out the items that need deleting. It can then call XDEL in batches with that set of stream IDs.
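A sweep job along these lines might look like the sketch below. It assumes the redis-py client, a hash named ack_stream whose fields are the acknowledged entry IDs, and auto-generated IDs of the form <unix-ms>-<seq>, so that age can be read from the ID. The age-based filter and the names sweep and entry_age_ms are illustrative choices, not something the approach prescribes.

```python
import time


def entry_age_ms(entry_id, now_ms):
    """Age of an auto-generated stream ID ('<unix-ms>-<seq>') in milliseconds."""
    return now_ms - int(entry_id.split("-")[0])


def sweep(client, stream, ack_key="ack_stream",
          max_age_ms=24 * 60 * 60 * 1000, now_ms=None):
    """Delete acknowledged entries older than max_age_ms; return the XDEL count.

    `client` is assumed to be a redis-py `redis.Redis` instance.
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    # HKEYS returns bytes unless the client was created with decode_responses=True.
    acked = [k.decode() if isinstance(k, bytes) else k
             for k in client.hkeys(ack_key)]
    stale = [i for i in acked if entry_age_ms(i, now_ms) >= max_age_ms]
    if not stale:
        return 0
    deleted = client.xdel(stream, *stale)   # XDEL stream ID [ID ...]
    client.hdel(ack_key, *stale)            # forget the swept acknowledgements
    return deleted
```

For a large ack_stream hash, iterating with HSCAN and deleting in fixed-size chunks would likely be gentler on the server than a single HKEYS call.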

Upvotes: 1
