Reputation: 4778
I have the following processor bean method signature:
@Bean
public BiFunction<KStream<String, MyClass>, KStream<String, String>, KStream<String, MyClass>[]> myStream() {
    return (inputStream1, inputStream2) -> {
        inputStream2
            .peek((k, v) -> {
                log.debug(...);
            });
        return inputStream1
            .mapValues(...)
            .branch((k, v) -> true, (k, v) -> true);
    };
}
The relevant properties:
spring.cloud.stream.function.definition: ...;myStream
spring.cloud.stream.bindings:
  myStream-in-0:
    destination: inp0
  myStream-in-1:
    destination: inp1
  myStream-out-0:
    destination: out0
  myStream-out-1:
    destination: out1
Spring Cloud Kafka Stream version Hoxton.SR4 (spring-cloud-stream-binder-kafka-streams:jar:3.0.4.RELEASE), embedded Kafka version 2.5.0.
I am testing my topology using embedded Kafka:
@RunWith(SpringRunner.class)
@SpringBootTest(
    properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
    topics = {
        "inp0", "inp1", "out0", "out1"
    },
    brokerPropertiesLocation = "kafka.properties"
)
@Slf4j
public class MyApplicationTests {
    @Test
    public void embeddedKafkaTest() throws IOException, InterruptedException {
        Consumer<String, MyClass> out0Consumer = createConsumer("out0ConsumerGroup");
        Consumer<String, MyClass> out1Consumer = createConsumer("out1ConsumerGroup");
        this.embeddedKafka.consumeFromAnEmbeddedTopic(out0Consumer, "out0");
        this.embeddedKafka.consumeFromAnEmbeddedTopic(out1Consumer, "out1");
        latch = new CountDownLatch(1);
        // ... publish ...
        latch.await(15, TimeUnit.SECONDS);
        ConsumerRecords<String, MyClass> out0 = KafkaTestUtils.getRecords(out0Consumer);
        assertThat(out0.count(), is(greaterThanOrEqualTo(1)));
        ConsumerRecords<String, MyClass> out1 = KafkaTestUtils.getRecords(out1Consumer);
        assertThat(out1.count(), is(greaterThanOrEqualTo(1)));
    }

    private <K, V> Consumer<K, V> createConsumer(String groupName) {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(groupName, "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<K, V>(consumerProps).createConsumer();
    }
}
My tests show that the messages from myStream reach and land in the topic "out0" as expected, but the "out1" topic remains empty and the unit test fails on the second assertion.
I've tried a couple of things, but it looks like the output to the second output topic is simply not being produced (the output to the first output topic is produced well).
Can you see any mistakes in my setup?
And one more thing: the return statement in the myStream bean method definition shows a compiler warning:
Unchecked generics array creation for varargs parameter
But it looks like that's how the Spring Cloud Kafka Stream 3.x API requires the return type to be defined?
Upvotes: 0
Views: 813
Reputation: 5904
You are passing two predicates to the branch method, and both of them always evaluate to true. The first predicate always wins, so all data goes to the first output binding: for each record, branch stops evaluating at the first predicate that returns true. See the javadoc for more details. You should use different predicates (possibly checking certain conditions on the key/value). If the first predicate fails and the second one succeeds, then you will see data produced to the second output topic.
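To make the first-match behavior concrete, here is a minimal plain-Java sketch. It is not the Kafka Streams implementation: `BranchSketch` and its `branch` helper are hypothetical stand-ins that only imitate the routing rule described above (each record goes to the first predicate that matches, and records matching none are dropped).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical stand-in for the routing rule of KStream#branch; no Kafka involved.
class BranchSketch {

    // Routes each value to the bucket of the FIRST matching predicate.
    // Values that match no predicate are dropped.
    // Assumption for brevity: the key is just the value's string form.
    @SafeVarargs
    static <V> List<List<V>> branch(List<V> values, BiPredicate<String, V>... predicates) {
        List<List<V>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            buckets.add(new ArrayList<>());
        }
        for (V value : values) {
            String key = String.valueOf(value);
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(key, value)) {
                    buckets.get(i).add(value);
                    break; // first match wins; later predicates are not consulted
                }
            }
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4);

        // Both predicates always true: branch 0 takes everything, branch 1 stays empty.
        System.out.println(branch(input, (k, v) -> true, (k, v) -> true));
        // prints [[1, 2, 3, 4], []]

        // Mutually exclusive predicates: both branches receive data.
        System.out.println(branch(input, (k, v) -> v % 2 == 0, (k, v) -> v % 2 != 0));
        // prints [[2, 4], [1, 3]]
    }
}
```

The first call mirrors the setup in the question, which is why "out1" stays empty there.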
With respect to the compiler warning, I think you can safely ignore it, as the API will ensure that the predicate objects passed into the branch invocation have the proper type. Since the method is declared with generic varargs, you get that warning. See this thread for details on that compiler warning.
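If the warning is distracting, one option is to suppress it at the call site. The sketch below uses a hypothetical `Splitter` interface, shaped like a generic varargs method such as branch, to show where the warning comes from and how `@SuppressWarnings("unchecked")` on the enclosing method silences it. The names here are illustrative only and are not part of Kafka Streams.

```java
import java.util.function.BiPredicate;

// Hypothetical miniature of an interface with a generic varargs method.
interface Splitter<K, V> {
    // Abstract interface methods cannot be annotated with @SafeVarargs,
    // so every caller passing parameterized predicates gets the unchecked warning.
    int branch(BiPredicate<? super K, ? super V>... predicates);
}

class VarargsWarningSketch {

    // Scoped suppression: covers the implicit generic array created for the varargs call.
    @SuppressWarnings("unchecked")
    static int countBranches(Splitter<String, Integer> splitter) {
        return splitter.branch((k, v) -> v > 0, (k, v) -> v <= 0);
    }

    public static void main(String[] args) {
        // Trivial implementation that just reports how many predicates it was given.
        Splitter<String, Integer> splitter = predicates -> predicates.length;
        System.out.println(countBranches(splitter)); // prints 2
    }
}
```

Applied to the question, the same annotation would presumably go on the myStream bean method, since the branch call sits inside the lambda that method returns.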
Upvotes: 1