Jeahyun Kim

Reputation: 313

Can I test kafka-streams suppress logic?

My application uses Kafka Streams suppress logic.

I want to test a Kafka Streams topology that uses suppress.

When I run a unit test, my topology does not emit any result.

Kafka streams logic

...
.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(5), Suppressed.BufferConfig.maxBytes(1_000_000_000L).emitEarlyWhenFull()))
...

My test case code: after creating the input data, the test case can't read the output record of the suppress logic. It just returns null.

testDriver.pipeInput(recordFactory.create("input", key, dummy, 0L));

System.out.println(testDriver.readOutput("streams-result", Serdes.String().deserializer(), serde.deserializer()));

Can I test my suppress logic?

Upvotes: 2

Views: 1247

Answers (1)

perkss

Reputation: 1067

The simple answer is yes.

Some good references are the Confluent example tests; one example in particular tests the suppression feature. The many other examples there are always a good place to check first. Here is another example of mine, written in Kotlin.

An explanation of the feature and how to test it can be found in post 3 on this blog.

Some key points:

  • The window will only emit the final result, as expected from the documentation.
  • To flush the final results you will need to send an extra dummy event, as seen in examples such as Confluent's.
  • You will need to manipulate the event time to test it, because suppression works off event time. The timestamp can be provided through the test input topic API or by a custom TimestampExtractor.
  • For testing I recommend setting the following to disable the cache and reduce the commit interval.

    props[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 0
    props[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = 5
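To make the event-time and dummy-event points above concrete, here is a small plain-Java sketch. It is a toy model, not the Kafka Streams API: the class SuppressModel and its process method are hypothetical names made up for illustration. It assumes that a time-limit suppression buffer remembers the timestamp of the first buffered record per key and releases the latest value once the highest timestamp seen so far ("stream time") has advanced by the time limit. Real suppression has more moving parts (buffer size limits, windowed keys, ordering), so treat this only as an aid to understanding why the single record in the question produces no output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of time-limit suppression. Hypothetical class, NOT part of Kafka Streams. */
final class SuppressModel {
    private final long limitMs;
    // Stream time: the highest record timestamp observed so far (wall-clock time is never used).
    private long streamTime = Long.MIN_VALUE;
    // Timestamp of the first buffered record per key (assumption: later updates keep it).
    private final Map<String, Long> firstSeen = new LinkedHashMap<>();
    // Latest buffered value per key.
    private final Map<String, String> latest = new HashMap<>();

    SuppressModel(long limitMs) {
        this.limitMs = limitMs;
    }

    /** Buffers the record, then returns every "key=value" whose time limit has expired. */
    List<String> process(String key, String value, long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        firstSeen.putIfAbsent(key, timestampMs);
        latest.put(key, value);

        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = firstSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            // Release only when stream time is at least limitMs past the buffered timestamp.
            if (entry.getValue() <= streamTime - limitMs) {
                emitted.add(entry.getKey() + "=" + latest.remove(entry.getKey()));
                it.remove();
            }
        }
        return emitted;
    }
}
```

With a 5000 ms limit, process("k", "v1", 0L) returns an empty list, which mirrors the null read in the question. A later record for any key with timestamp 5000 or more, for example process("dummy", "x", 5000L), advances stream time and releases the buffered result for "k". In a real test the same sequence would go through TopologyTestDriver, giving each piped record an explicit timestamp, with the last one acting as the dummy event.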

Hope this helps.

Upvotes: 3
