Boon

Reputation: 1173

Kafka Streams and writing to the state store

I am working on a Kafka Streams application built with Spring Cloud Stream. In this application I need to:

  1. Consume a continuous stream of messages that can be retrieved at a later time.
  2. Persist a list of the message IDs matching some criteria.
  3. In a separate thread, run a scheduler which reads out the message IDs at a regular interval, retrieves the messages that correspond to those IDs, and performs an action with those messages.
  4. Remove the processed message IDs from the list so that work is not duplicated.

I have considered implementing this as follows:

  1. Consume the incoming stream of messages as a materialized KTable so that I can look up and retrieve messages by key at a later time.
  2. Materialize the list of message IDs in another state store.
  3. Use Spring's scheduling mechanism to run a separate thread which reads from the state store via the InteractiveQueryService bean.

The problem I hit is that the InteractiveQueryService provides read-only access to the state store, so I cannot remove entries from the other thread. I have decided not to use Kafka Streams' punctuate capability since the semantics are different; my scheduling thread must always run at a regular interval, irrespective of the processing of the inbound messages.

An alternative might be to use the low-level Processor API and pass a reference to the writable state store to my scheduler thread. I would then need to synchronize write operations. But I'm not sure whether this is doable, or whether there are other constraints on accessing the state store like this from a separate thread.

Any input or advice would be appreciated!

Upvotes: 0

Views: 1477

Answers (1)

Peyman

Reputation: 385

my scheduling thread must always run at a regular interval, irrespective of the processing of the inbound messages

Well, punctuation based on WALL_CLOCK_TIME does exactly what you described above: it fires on system time, whether or not new records arrive.

The problem I hit is that the InteractiveQueryService provides read-only access to the state store

With the Processor API and punctuation, you can obtain the state stores in init() via ProcessorContext#getStateStore() and remove entries from them inside the Punctuator callback that you register with ProcessorContext#schedule(). The advantage of this solution is that the processor and the punctuator run in the same thread, so you don't need any synchronisation between them.
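To make the shape of that concrete, here is a rough plain-Java sketch of the logic only. It deliberately does not use the Kafka Streams classes: the two in-memory collections are stand-ins for the two state stores, and PendingIdDrainer, matches() and the "urgent" criterion are all made-up names for illustration. In a real topology, process() would be your Processor's process() method, and the body of punctuate() would go into the Punctuator you register in init() with ProcessorContext#schedule() and PunctuationType.WALL_CLOCK_TIME.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the processor + punctuator pair; not a Kafka Streams class.
final class PendingIdDrainer {
    // Stand-in for the state store that keeps every message by ID.
    private final Map<String, String> messages = new HashMap<>();
    // Stand-in for the state store that keeps IDs still waiting to be handled.
    private final Set<String> pendingIds = new LinkedHashSet<>();

    // Plays the role of Processor#process(): called once per inbound record.
    void process(String id, String payload) {
        messages.put(id, payload);
        if (matches(payload)) {
            pendingIds.add(id);
        }
    }

    // Assumed criterion, purely for illustration.
    private boolean matches(String payload) {
        return payload.startsWith("urgent");
    }

    // Plays the role of Punctuator#punctuate(long): called on the wall-clock schedule.
    // Returns the payloads it handled so the effect is easy to observe.
    List<String> punctuate(long timestampMs) {
        List<String> handled = new ArrayList<>();
        Iterator<String> it = pendingIds.iterator();
        while (it.hasNext()) {
            String id = it.next();
            String payload = messages.get(id);
            if (payload != null) {
                handled.add(payload); // "perform an action" with the message
            }
            it.remove(); // drop the ID so the work is not repeated
        }
        return handled;
    }

    int pendingCount() {
        return pendingIds.size();
    }
}
```

Because process() and punctuate() are only ever called from one thread, no locking is needed around the two collections. With real KeyValueStore instances the loop would iterate with all() (closing the returned iterator) and call delete(key) instead of it.remove().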

Upvotes: 1
