Hartimer

Reputation: 545

How to unit test Kafka Streams

While exploring how to unit test a Kafka Streams topology, I came across ProcessorTopologyTestDriver. Unfortunately, this class seems to have been broken since version 0.10.1.0 (KAFKA-4408).

Is there a workaround available for the KTable issue?

I saw the "Mocked Streams" project, but first, it uses version 0.10.2.0 while I'm on 0.10.1.1, and second, it is written in Scala while my tests are Java/Groovy.

Any help on how to unit test a stream without having to bootstrap ZooKeeper/Kafka would be great.

Note: I do have integration tests that use embedded servers. This question is about unit tests, i.e. fast, simple tests.

EDIT

Thank you to Ramon Garcia

For people arriving here from Google searches, please note that the test driver class is now org.apache.kafka.streams.TopologyTestDriver.

This class is in the Maven artifact with groupId org.apache.kafka and artifactId kafka-streams-test-utils.

Upvotes: 4

Views: 3747

Answers (2)

For people arriving here from Google searches, please note that the test driver class is now org.apache.kafka.streams.TopologyTestDriver.

This class is in the Maven artifact with groupId org.apache.kafka and artifactId kafka-streams-test-utils.
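
For reference, a minimal sketch of what the dependency declaration might look like in a pom.xml. The ${kafka.version} property is an assumption (define it yourself to match the kafka-streams version you already use), and the test scope is only a suggestion, since the driver is normally needed in tests only.

```xml
<!-- Sketch only: kafka.version is a hypothetical property; set it to your own Kafka Streams version -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>${kafka.version}</version>
    <scope>test</scope>
</dependency>
```

As far as I know, this artifact only exists for Kafka 1.1.0 and later, so it will not help if you are stuck on a 0.10.x release like the one in the question.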

Upvotes: 2

Hartimer

Reputation: 545

I found a way around this, though I'm not sure it is THE answer, especially given the comment from https://stackoverflow.com/users/4953079/matthias-j-sax. In any case, I'm sharing what I have so far...

I completely copied ProcessorTopologyTestDriver from the 0.10.1 branch (that's the version I'm using).

To address KAFKA-4408, I made the field private final MockConsumer<byte[], byte[]> restoreStateConsumer accessible and moved the block starting with task = new StreamTask(... into a separate method, e.g. bootstrap.

In the setup phase of my test I do the following:

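import org.apache.kafka.common.Node
import org.apache.kafka.common.PartitionInfo
import org.apache.kafka.common.TopicPartition

// Groovy. The driver is the local copy, with restoreStateConsumer exposed and bootstrap() added.
driver = new ProcessorTopologyTestDriver(config, builder)
// Tell the restore consumer about the KTable topic before the StreamTask is created.
List<PartitionInfo> partitionInfos = new ArrayList<>()
partitionInfos.add(new PartitionInfo('my_ktable', 1, (Node) null, (Node[]) null, (Node[]) null))
driver.restoreStateConsumer.updatePartitions('my_ktable', partitionInfos)
driver.restoreStateConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition('my_ktable', 1), Long.valueOf(0L)))
driver.bootstrap()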

And that's it...

Bonus

I also ran into KAFKA-4461. Fortunately, since I had copied the whole class, I was able to "cherry-pick" the accepted fix with minor tweaks.

As always, feedback is appreciated. Although apparently not an official test class, this driver has proven super useful!

Upvotes: 2
