Reputation: 545
While exploring how to unit test a Kafka Streams application, I came across ProcessorTopologyTestDriver. Unfortunately, this class seems to have been broken since version 0.10.1.0 (KAFKA-4408).
Is there a workaround available for the KTable issue?
I saw the "Mocked Streams" project, but first, it uses version 0.10.2.0 while I'm on 0.10.1.1, and second, it is Scala, while my tests are Java/Groovy.
Any help on how to unit test a stream without having to bootstrap ZooKeeper/Kafka would be great.
Note: I do have integration tests that use embedded servers. This question is about unit tests, i.e. fast, simple tests.
EDIT
Thank you to Ramon Garcia.
For people arriving here from Google searches, please note that the test driver class is now org.apache.kafka.streams.TopologyTestDriver.
This class is in the Maven artifact with groupId org.apache.kafka and artifactId kafka-streams-test-utils.
Upvotes: 4
Views: 3747
Reputation: 56
For people arriving here from Google searches, please note that the test driver class is now org.apache.kafka.streams.TopologyTestDriver.
This class is in the Maven artifact with groupId org.apache.kafka and artifactId kafka-streams-test-utils.
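For illustration, here is a minimal sketch of a broker-free test with that class. It assumes Kafka Streams 2.4 or later (where, as far as I know, TestInputTopic and TestOutputTopic were introduced) and kafka-streams-test-utils on the test classpath. The topic names, the uppercase topology and the class name UppercaseTopologyExample are made up for the example and are not from the question.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class UppercaseTopologyExample {

    // Hypothetical topology: copy "input-topic" to "output-topic", uppercasing values.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(value -> value.toUpperCase())
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    // Pipes one record through the topology in-process and returns the output value.
    static String runOnce(String key, String value) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "unit-test");
        // Required by the config, but the test driver never connects to it.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "input-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> output = driver.createOutputTopic(
                    "output-topic", new StringDeserializer(), new StringDeserializer());

            input.pipeInput(key, value);
            return output.readValue();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce("k", "hello")); // prints HELLO
    }
}
```

In a real test, the body of runOnce would normally live in a JUnit or Spock test method, with the driver created in setup and closed in teardown.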
Upvotes: 2
Reputation: 545
I found a way around this. I'm not sure it is THE answer, especially after the comment from https://stackoverflow.com/users/4953079/matthias-j-sax. In any case, I'm sharing what I have so far...
I copied ProcessorTopologyTestDriver in its entirety from the 0.10.1 branch (that's the version I'm using).
To address KAFKA-4408, I made the private final MockConsumer<byte[], byte[]> restoreStateConsumer field accessible and moved the chunk starting with task = new StreamTask(... into a separate method, e.g. bootstrap.
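During the setup phase of my test (Groovy), I do the following:
import org.apache.kafka.common.Node
import org.apache.kafka.common.PartitionInfo
import org.apache.kafka.common.TopicPartition

driver = new ProcessorTopologyTestDriver(config, builder)
// Describe the KTable's source topic to the now-accessible mock restore consumer
ArrayList partitionInfos = new ArrayList()
partitionInfos.add(new PartitionInfo('my_ktable', 1, (Node) null, (Node[]) null, (Node[]) null))
driver.restoreStateConsumer.updatePartitions('my_ktable', partitionInfos)
driver.restoreStateConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition('my_ktable', 1), Long.valueOf(0L)))
// Create the StreamTask (the chunk that was moved out of the constructor)
driver.bootstrap()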
And that's it...
Bonus
I also ran into KAFKA-4461. Fortunately, since I had copied the whole class, I was able to "cherry-pick" the accepted fix with minor tweaks.
As always, feedback is appreciated. Although apparently not an official test class, this driver has proven super useful!
Upvotes: 2