Reputation: 313
My application uses Kafka Streams suppress logic.
I want to test a Kafka Streams topology that uses suppress.
When I run the unit test, my topology does not emit any result.
Kafka Streams logic:
...
.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(5), Suppressed.BufferConfig.maxBytes(1_000_000_000L).emitEarlyWhenFull()))
...
My test case code:
After creating the input data, the test case can't read the output record of the suppress logic.
readOutput just returns null.
testDriver.pipeInput(recordFactory.create("input", key, dummy, 0L));
System.out.println(testDriver.readOutput("streams-result", Serdes.String().deserializer(), serde.deserializer()));
Can I test my suppress logic?
Upvotes: 2
Views: 1247
Reputation: 1067
The simple answer is yes.
Some good references are the Confluent example tests; one of them in particular tests the suppression feature, and the many other examples there are always a good place to check first. Here is another example of mine, written in Kotlin.
An explanation of the feature and how to test it can be found in post 3 of this blog.
Some key points:
For testing I recommend setting the following to disable the cache and shorten the commit interval.
props[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 0
props[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = 5
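One more point that is easy to miss: as far as I know, Suppressed.untilTimeLimit is driven by stream time (the highest record timestamp seen so far), not by the wall clock. In the question only one record is piped, with timestamp 0L, so stream time never reaches the 5 second limit and readOutput returns null. The sketch below is a toy model of that behaviour in plain Java. It is not Kafka's implementation; the class name SuppressBufferModel, its process method, and the "flush" key are made up for illustration, and eviction order is simplified.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a time-limited suppression buffer driven by stream time.
// Hypothetical class for illustration only; this is NOT Kafka Streams code.
final class SuppressBufferModel {

    // One buffered record: the time it entered the buffer and its latest value.
    private static final class Entry {
        final long bufferTime;
        String value;

        Entry(long bufferTime, String value) {
            this.bufferTime = bufferTime;
            this.value = value;
        }
    }

    private final long limitMs;
    private long streamTime = -1L; // no record seen yet
    private final Map<String, Entry> buffer = new LinkedHashMap<>();

    SuppressBufferModel(long limitMs) {
        this.limitMs = limitMs;
    }

    // Feeds one record and returns whatever the buffer emits as "key=value" strings.
    List<String> process(String key, String value, long timestamp) {
        // Stream time only moves forward, and only when a record arrives.
        streamTime = Math.max(streamTime, timestamp);

        Entry existing = buffer.get(key);
        if (existing == null) {
            buffer.put(key, new Entry(timestamp, value));
        } else {
            // A newer value replaces the old one without restarting the timer.
            existing.value = value;
        }

        // Emit every entry that has waited at least limitMs in stream time.
        long expiry = streamTime - limitMs;
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Entry>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Entry> e = it.next();
            if (e.getValue().bufferTime <= expiry) {
                emitted.add(e.getKey() + "=" + e.getValue().value);
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        SuppressBufferModel model = new SuppressBufferModel(5_000L);
        // Same situation as the question: a single record at timestamp 0.
        System.out.println(model.process("key", "dummy", 0L));      // prints []
        // A later record advances stream time and releases the first one.
        System.out.println(model.process("flush", "ignored", 5_000L)); // prints [key=dummy]
    }
}
```

Applied to the test in the question, this suggests piping a second record with a later timestamp before reading the output, for example recordFactory.create("input", otherKey, dummy, 5000L), where otherKey is a placeholder for any other key. The second record itself stays buffered, so only the first result should appear in "streams-result".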
Hope this helps.
Upvotes: 3