I am working on writing unit tests for a Kafka Streams app that uses a time window to aggregate messages and return the maximum value seen in those messages during that window. Below is the definition for the stream.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, CustomObject> stream = builder.stream(inputTopicName,
        Consumed.with(Serdes.String(), outputValueSerde)
                .withTimestampExtractor(new CustomTimestampExtractor()));
stream.groupBy((key, value) -> String.valueOf(value.getMessageKey()),
                Grouped.with(Serdes.String(), outputValueSerde))
        // tumbling window with no grace period
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(AGGREGATION_PERIOD)))
        // emit a single final result per window once it closes
        .emitStrategy(EmitStrategy.onWindowClose())
        .aggregate(
                CustomObject::new,
                (key, value, aggregate) ->
                {
                    // aggregation logic
                },
                Materialized.<String, CustomObject, WindowStore<Bytes, byte[]>>
                        as("state-store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(outputValueSerde)
                        .withStoreType(Materialized.StoreType.IN_MEMORY)
                        .withCachingDisabled()
                        .withRetention(Duration.ofSeconds(AGGREGATION_PERIOD * RETENTION_BUFFER_RATE))
        )
        // unwrap the windowed key back to the plain String key
        .toStream((windowedKey, aggregate) -> windowedKey.key())
        .to(outputTopicName, Produced.with(Serdes.String(), outputValueSerde));
When running, this stream behaves as desired. However, the unit test does not. Below is the test in question.
@Test
public void testStreamsValidInput()
{
    // two messages within window
    CustomObject value = new CustomObject("<timestamp>", <integerKey>, <doubleValueBeingAggregated>);
    CustomObject aggregate = new CustomObject("<timestamp>", <integerKey>, <doubleValueBeingAggregated>);
    // send message timed for after window of first two messages, advances stream time and closes previous window
    CustomObject advance = new CustomObject("<timestampAfterWindowOfFirstTwoMessages>", <integerKey>, <doubleValueBeingAggregated>);

    // send first two
    inputTopic.pipeInput(aggregate);
    inputTopic.pipeInput(value);
    // send third to close window
    inputTopic.pipeInput(advance);

    CustomObject output = null;
    try
    {
        output = outputTopic.readValue();
    } catch (NoSuchElementException e)
    {
        fail("No message was produced to output topic!", e);
    }

    assertEquals(<timestampOfWindowTruncatedToSpecifiedTime>, output.getTimestamp().toString());
    assertEquals(1, output.getIntegerKey());
    assertEquals(2.0, output.getDoubleValueBeingAggregated());
    // confirm only one message was produced to output topic
    assertThrows(NoSuchElementException.class, () -> outputTopic.readValue());
}
When running this test, it fails on outputTopic.readValue() because the stream did not produce a value to the output topic.
I ran the real stream and pushed the exact same messages into the input topic as I am in the test. The real stream behaved as expected: after sending the third message, the first window closed and the correct aggregate message was produced.
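For reference, here is a minimal stdlib-only sketch of the event-time behavior the test relies on. It is not Kafka Streams code: the class `TumblingWindowSketch` and its methods are hypothetical names made up for illustration, and the 10-second window size stands in for AGGREGATION_PERIOD. It assumes, as I understand the documented semantics, that tumbling windows are aligned to the epoch and that a window with no grace period is closed once stream time reaches the window end.

```java
import java.time.Duration;

/** Stdlib-only model of tumbling-window bookkeeping; hypothetical, not Kafka Streams code. */
final class TumblingWindowSketch {
    private final long sizeMs;
    private final long graceMs;
    // Highest record timestamp observed so far ("stream time").
    private long streamTimeMs = Long.MIN_VALUE;

    TumblingWindowSketch(Duration size, Duration grace) {
        this.sizeMs = size.toMillis();
        this.graceMs = grace.toMillis();
    }

    /** Start of the epoch-aligned window containing the timestamp (assumes timestampMs >= 0). */
    long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Stream time only moves forward and is driven by record timestamps. */
    void observe(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
    }

    /** A window counts as closed once stream time reaches its (exclusive) end plus the grace period. */
    boolean isClosed(long windowStartMs) {
        return streamTimeMs >= windowStartMs + sizeMs + graceMs;
    }

    public static void main(String[] args) {
        TumblingWindowSketch w = new TumblingWindowSketch(Duration.ofSeconds(10), Duration.ZERO);
        long start = w.windowStart(12_000L);   // window [10_000, 20_000)
        w.observe(12_000L);                    // first message
        w.observe(15_000L);                    // second message, same window
        System.out.println(w.isClosed(start)); // prints "false"
        w.observe(21_000L);                    // third message, past the window end
        System.out.println(w.isClosed(start)); // prints "true"
    }
}
```

Under this model the third message is enough to close the first window, which is why the test expects exactly one output record after it is piped in.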
I then switched .emitStrategy(EmitStrategy.onWindowClose()) to .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) and the test passed.
The CustomTimestampExtractor uses the message's embedded timestamp for stream time.
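The extractor itself is not shown here, so the following is only a rough stdlib-only sketch of what its core logic might look like. The class `EmbeddedTimestampSketch` and the method `extractMillis` are hypothetical names, and the sketch assumes the embedded timestamp is an ISO-8601 instant string; the real format may differ. In an actual Kafka Streams `TimestampExtractor`, logic like this would sit inside `extract(ConsumerRecord<Object, Object> record, long partitionTime)`.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

/** Hypothetical helper: turns an embedded timestamp string into epoch milliseconds. */
final class EmbeddedTimestampSketch {

    /**
     * Returns the embedded timestamp as epoch millis.
     * Falls back to the supplied partition time when the value is missing or unparseable
     * (the fallback policy is an assumption, not taken from the original extractor).
     */
    static long extractMillis(String embeddedTimestamp, long partitionTimeMs) {
        if (embeddedTimestamp == null) {
            return partitionTimeMs;
        }
        try {
            // Assumes ISO-8601 input such as "1970-01-01T00:00:12Z".
            return Instant.parse(embeddedTimestamp).toEpochMilli();
        } catch (DateTimeParseException e) {
            return partitionTimeMs;
        }
    }

    public static void main(String[] args) {
        System.out.println(extractMillis("1970-01-01T00:00:12Z", -1L)); // prints "12000"
        System.out.println(extractMillis("not-a-timestamp", 42L));      // prints "42"
    }
}
```

Because stream time is derived from these values, the timestamps embedded in the three test messages (not the wall clock) decide which window each message lands in.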
If anyone has any insight into why the test passes with suppress() but not with emitStrategy(), and how to fix the test so that it works with emitStrategy(), it would be much appreciated.
Thanks in advance.