Reputation: 87
I learnt from This blog and this tutorial that in order to test suppression with event time semantics, one should send dummy records to advance stream time. I've tried to advance time by doing just that. But this does not seem to work unless time is advanced for a particular key.
I have a custom TimestampExtractor
which associates my preferred "stream-time" with the records.
My stream topology pseudocode is as follows (I use the Kafka Streams DSL API):
source.mapValues(someProcessingLambda)
.flatMap(flattenRecordsLambda)
.groupByKey(Grouped.with(Serdes.ByteArray(), Serdes.ByteArray()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(Duration.ZERO))
.aggregate(()->null, aggregationLambda)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
My input is of the following format:
1 - {"stream_time":"2019-04-09T11:08:36.000-04:00", id:"1", data:"..."}
2 - {"stream_time":"2019-04-09T11:09:36.000-04:00", id:"1", data:"..."}
3 - {"stream_time":"2019-04-09T11:18:36.000-04:00", id:"2", data:"..."}
4 - {"stream_time":"2019-04-09T11:19:36.000-04:00", id:"2", data:"..."}
.
.
Now records 1
and 2
belong to a 10 minute window according to stream_time
and 3
and 4
belong to another.
Within that window, records are aggregated as per id
.
I expected that record 3
would signal that the stream has advanced and cause suppress to emit the data corresponding to 1st window.
However, the data is not emitted until I send a dummy record with id:1
to advance the stream time for that key.
Have I understood the testing instruction incorrectly? Is this expected behavior? Does the key of the dummy record matter?
Upvotes: 3
Views: 906
Reputation: 46
I’m sorry for the trouble. This is indeed a tricky problem. I have some ideas for adding some operations to support this kind of integration testing, but it’s hard to do without breaking basic stream processing time semantics.
It sounds like you’re testing a “real” KafkaStreams application, as opposed to testing with TopologyTestDriver. My first suggestion is that you’ll have a much better time validating your application semantics with TopologyTestDriver, if it meets your needs.
It sounds to me like you might have more than one partition in your input topic (and therefore your application). In the event that key 1 goes to one partition, and key 3 goes to another, you would see what you’ve observed. Each partition of your application tracks stream time independently. TopologyTestDriver works nicely because it only uses one partition, and also because it processes data synchronously. Otherwise, you’ll have to craft your “dummy” time advancement messages to go to the same partition as the key you’re trying to flush out.
This is going to be especially tricky because your “flatMap().groupByKey()” is going to repartition the data. You’ll have to craft the dummy message so that it goes into the right partition after the repartition. Or you could experiment with writing your dummy messages directly into the repartition topic.
If you do need to test with KafkaStreams instead of TopologyTestDriver, I guess the easiest thing is just to write a “time advancement” message per key, as you were suggesting in your question. Not because it’s strictly necessary, but because it’s the easiest way to meet all these caveats. I’ll also mention that we are working on some general improvements to stream time handling in Kafka Streams that should simplify the situation significantly, but that doesn’t help you right now, of course.
Upvotes: 3