user11994

Reputation: 431

timeouts for Kafka streams

I am expecting n messages on a Kafka topic, and once all n messages have arrived I emit a message on a new topic. I'm using the Streams API to do this and it is straightforward. However, because the system is unreliable, I may never receive all n messages. In that case I still want to emit the message if at least x% (e.g. 95%) of the n messages have been received and no new message has arrived for y seconds. Is this possible with Kafka Streams, or do I need to write a consumer for it?

If Kafka Streams had a timeout concept similar to Rx's timeout operator (http://reactivex.io/documentation/operators/timeout.html), I think this would be possible, but I haven't been able to find one in the Streams API yet.
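The emit rule in the question can be written down as a small predicate, independent of how it is wired into Kafka Streams. This is a minimal sketch: `EmitRule` and its parameters are hypothetical names, times are plain epoch milliseconds, and the percentage check uses integer arithmetic so that exactly 95 of 100 counts as 95%.

```java
/**
 * Hypothetical helper (not a Kafka Streams API) encoding the emit rule:
 * emit once all n messages arrived, or once at least x% arrived and
 * nothing new has come in for y milliseconds.
 */
final class EmitRule {
    private final long expected;         // n: number of messages expected
    private final int thresholdPercent;  // x, e.g. 95
    private final long idleTimeoutMs;    // y seconds, expressed in milliseconds

    EmitRule(long expected, int thresholdPercent, long idleTimeoutMs) {
        this.expected = expected;
        this.thresholdPercent = thresholdPercent;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    boolean shouldEmit(long received, long lastRecordMs, long nowMs) {
        if (received >= expected) {
            return true; // all n messages arrived
        }
        // Integer math avoids floating-point rounding right at the threshold.
        boolean enough = received * 100 >= expected * thresholdPercent;
        boolean idle = nowMs - lastRecordMs >= idleTimeoutMs;
        return enough && idle;
    }
}
```

Something still has to evaluate this rule when no record arrives, which is exactly the missing timeout piece.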

Upvotes: 1

Views: 910

Answers (1)

Matthias J. Sax

Reputation: 62350

There is no timeout concept, but you can use punctuations to do what you want. You will need Kafka 1.0.0, which adds wall-clock-time punctuations and also allows you to cancel a punctuation schedule.

Thus, each time you receive a record once you have hit the x% mark, you can register a schedule with the timeout you want. If you receive the next message before the timeout, you can cancel the schedule and register a new one. If the punctuation triggers, you can emit the result and cancel the current schedule.
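The register/cancel/re-register approach can be sketched in plain Java. To keep the sketch runnable without a broker, `Cancellable` and `FakeScheduler` here are hypothetical stand-ins driven by a manual clock; in a 1.0.0 processor the corresponding call would be `context.schedule(timeoutMs, PunctuationType.WALL_CLOCK_TIME, punctuator)`, which returns a `Cancellable`, and `emit` would call `context.forward(...)` instead of appending to a list. `CountingProcessor` is an illustrative name, and the sketch tracks a single batch in memory; a real processor would likely keep the count in a state store.

```java
import java.util.ArrayList;
import java.util.List;

class TimeoutSketch {

    /** Hypothetical stand-in for org.apache.kafka.streams.processor.Cancellable. */
    interface Cancellable {
        void cancel();
    }

    /** Hypothetical manual-clock scheduler mimicking a periodic wall-clock punctuation. */
    static final class FakeScheduler {
        private static final class Task implements Cancellable {
            final long intervalMs;
            final Runnable callback;
            long nextFireMs;
            boolean cancelled;

            Task(long intervalMs, Runnable callback, long nextFireMs) {
                this.intervalMs = intervalMs;
                this.callback = callback;
                this.nextFireMs = nextFireMs;
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        }

        private final List<Task> tasks = new ArrayList<>();
        private long now = 0;

        /** Registers a callback that fires every intervalMs until cancelled. */
        Cancellable schedule(long intervalMs, Runnable callback) {
            Task task = new Task(intervalMs, callback, now + intervalMs);
            tasks.add(task);
            return task;
        }

        /** Moves the clock forward and runs every non-cancelled task that has come due. */
        void advanceTo(long timeMs) {
            now = timeMs;
            for (Task task : new ArrayList<>(tasks)) {
                while (!task.cancelled && task.nextFireMs <= timeMs) {
                    task.callback.run();
                    task.nextFireMs += task.intervalMs;
                }
            }
            tasks.removeIf(t -> t.cancelled);
        }
    }

    /** Counts one batch and emits exactly once: at n, or on an idle timeout after x%. */
    static final class CountingProcessor {
        private final FakeScheduler scheduler;
        private final long expected;         // n
        private final int thresholdPercent;  // x, e.g. 95
        private final long timeoutMs;        // y seconds, in milliseconds
        private final List<String> emitted = new ArrayList<>(); // stands in for context.forward(...)
        private Cancellable timeout;         // schedule currently registered, if any
        private long received;
        private boolean done;

        CountingProcessor(FakeScheduler scheduler, long expected, int thresholdPercent, long timeoutMs) {
            this.scheduler = scheduler;
            this.expected = expected;
            this.thresholdPercent = thresholdPercent;
            this.timeoutMs = timeoutMs;
        }

        void process(String value) {
            if (done) {
                return; // batch already emitted
            }
            received++;
            if (received >= expected) {
                emit("complete");
            } else if (received * 100 >= expected * thresholdPercent) {
                // Past the x% mark: cancel the old schedule and start a fresh timeout.
                if (timeout != null) {
                    timeout.cancel();
                }
                timeout = scheduler.schedule(timeoutMs, this::onTimeout);
            }
        }

        private void onTimeout() {
            if (!done) {
                emit("timeout");
            }
        }

        private void emit(String reason) {
            done = true;
            if (timeout != null) {
                timeout.cancel(); // a periodic punctuation would otherwise keep firing
                timeout = null;
            }
            emitted.add(reason + ":" + received);
        }

        List<String> emitted() {
            return emitted;
        }
    }
}
```

Cancelling and re-registering on every record past the threshold is what turns a periodic punctuation into an idle timeout: the schedule can only fire if no new record arrives within `timeoutMs`.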

There is not much documentation about this at the moment, as Confluent Open-Source 4.0 has not been released yet (it uses Kafka 1.0.0 internally). But you can look into the design proposal for some details: https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics

Note: you don't need to upgrade your brokers; you can upgrade just your Streams library to 1.0.0 if you want. Cf. https://docs.confluent.io/current/streams/upgrade-guide.html#compatibility (1.0.0 has the same backward compatibility with older brokers as 0.11.0.x).

Upvotes: 2
