Reputation: 1035
I am new to Kafka Streams and Spring Cloud Stream, but I have read good things about them, in particular that integration-related code can move into a properties file so developers can focus mostly on the business logic.
Here is my simple application class.
package com.some.events.consumer

import com.some.events.SomeEvent
import org.apache.kafka.streams.kstream.KStream
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import java.util.function.Consumer

@SpringBootApplication
class ConsumerApplication {
    @Bean
    fun consume(): Consumer<KStream<String, SomeEvent>> {
        return Consumer { input -> input.foreach { key, value -> println("Key: $key, value: $value") } }
    }
}

fun main(args: Array<String>) {
    runApplication<ConsumerApplication>(*args)
}
My application.yml file is as follows.
spring:
  cloud:
    function:
      definition: consume
    stream:
      bindings:
        consume-in-0:
          destination: "some-event"
          group: "some-event"
My dependencies in build.gradle.kts are defined as follows (only the relevant ones are included here).
extra["springCloudVersion"] = "2020.0.2"

dependencies {
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.springframework.cloud:spring-cloud-stream")
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
}

dependencyManagement {
    imports {
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
    }
}
When I run the application, I get the following exception.
org.springframework.context.ApplicationContextException: Failed to start bean 'streamsBuilderFactoryManager'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.5.jar:5.3.5]
at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:769) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:761) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:426) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:326) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1313) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1302) ~[spring-boot-2.4.4.jar:2.4.4]
at com.some.events.consumer.ConsumerApplicationKt.main(ConsumerApplication.kt:22) ~[main/:na]
Caused by: org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null
at org.springframework.cloud.stream.binder.kafka.streams.StreamsBuilderFactoryManager.start(StreamsBuilderFactoryManager.java:94) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.5.jar:5.3.5]
... 14 common frames omitted
Caused by: java.lang.IllegalArgumentException: 'listener' cannot be null
at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.3.5.jar:5.3.5]
at org.springframework.kafka.config.StreamsBuilderFactoryBean.addListener(StreamsBuilderFactoryBean.java:268) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.cloud.stream.binder.kafka.streams.StreamsBuilderFactoryManager.start(StreamsBuilderFactoryManager.java:84) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
... 15 common frames omitted
Process finished with exit code 1
Note that I am aware that I need to configure the Serde and Avro related things (I am using Avro for event schema), but the thing is, the stream won't even run.
Can someone point me in the right direction? I tried googling this but no one has posted an issue where it's caused by 'listener' cannot be null. Thanks!
Upvotes: 3
Views: 2955
Reputation: 255
I had the same problem. First, I added the io.micrometer dependency (the latest version from Maven).
Second, I created a SimpleMeterRegistry bean, and that solved the problem.
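import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.springframework.context.annotation.Bean;

@Bean
SimpleMeterRegistry simpleMeterRegistry() {
    return new SimpleMeterRegistry();
}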
Upvotes: 0
Reputation: 7633
The destination: "some-event" should point to a Kafka topic, for example destination: "some-event-topic".
Then you have to create an interface for the listener consume-in-0. Using the Spring annotations makes the project load this listener, so it will no longer be null.
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;

public interface KafkaListenerBinding {
    @Input("consume-in-0")
    KStream<String, String> inputStream();
}
Then you create a @Service to process messages from the listener, using @StreamListener("consume-in-0").
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Log4j2
@Service
@EnableBinding(KafkaListenerBinding.class)
public class KafkaListenerService {
    @StreamListener("consume-in-0")
    public void process(KStream<String, String> input) {
        input.foreach((k, v) -> log.info(String.format("Key: %s, Value: %s", k, v)));
    }
}
NOTE: Regardless of the bug mentioned by @Gary Russel, I am completing my answer with the functional way of implementing the Spring service.
The functional style is enabled by declaring the function in the application.yml file. By convention, binding names are built from the function name plus the suffix in-0 for input and out-0 for output, and you have to use these names when defining the bindings. More details here.
spring:
  cloud:
    stream:
      function:
        definition: transformToUpperCase
      bindings:
        transformToUpperCase-in-0:
          destination: input-func-topic
        transformToUpperCase-out-0:
          destination: output-func-topic
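To make the naming convention concrete, here is a minimal plain-Java sketch (no Spring involved) that derives binding names from a function name. The BindingNames class and its methods are hypothetical helpers written only for illustration; the one thing taken from the convention itself is the <function>-in-<index> / <function>-out-<index> pattern.

```java
class BindingNames {
    // Hypothetical helper: name of the input binding for a function bean.
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Hypothetical helper: name of the output binding for a function bean.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        // Binding names for the function used in the YAML above.
        System.out.println(input("transformToUpperCase", 0));
        System.out.println(output("transformToUpperCase", 0));
        // A Consumer bean such as consume() only has an input binding.
        System.out.println(input("consume", 0));
    }
}
```

If the keys under bindings do not match the names derived this way, the destination settings will most likely not be applied to the function's bindings.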
Then annotate your class with @Configuration and @EnableAutoConfiguration, and make sure the name of the bean method is the same as the one you defined in the application.yml file under function.definition.
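import java.util.function.Function;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Log4j2 // provides the log field used below
@Configuration
@EnableAutoConfiguration
public class KafkaListenerFunctionalService {
    // The method name must match function.definition in application.yml.
    @Bean
    public Function<KStream<String, String>, KStream<String, String>> transformToUpperCase() {
        return input -> input
                .peek((k, v) -> log.info("Functional received Input: {}", v))
                .mapValues(i -> i.toUpperCase());
    }
}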
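Because the mapping step is a pure value transformation, it can be sanity-checked without a broker. The sketch below is plain Java and does not use Kafka Streams at all: the hypothetical UpperCaseLogic.TO_UPPER function stands in for the lambda passed to mapValues, and a List stands in for the stream of record values.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class UpperCaseLogic {
    // Same value transformation as the lambda passed to mapValues above.
    static final Function<String, String> TO_UPPER = value -> value.toUpperCase();

    // Applies the transformation to every value, the way mapValues would per record.
    static List<String> transformAll(List<String> values) {
        return values.stream().map(TO_UPPER).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(transformAll(List.of("some-event", "Another")));
    }
}
```

Keeping the transformation in a named function like this is one possible design choice; it lets the business logic be tested on its own while the KStream wiring stays in the bean.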
Upvotes: 1
Reputation: 174789
This is a bug; it is fixed in 3.1.3-SNAPSHOT:
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1030#issuecomment-804039087
I am not sure about the comment there; adding micrometer to the class path should resolve it.
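For readers wondering why a metrics library would affect startup, the failure in the stack trace can be mimicked in plain Java. This is only a sketch under an assumption: that the binder creates its streams listener only when a meter registry is available and otherwise hands null to addListener. FakeFactoryBean and ListenerDemo are hypothetical stand-ins, not the real Spring classes.

```java
import java.util.ArrayList;
import java.util.List;

class ListenerDemo {
    // Hypothetical stand-in for a factory bean that rejects null listeners.
    static class FakeFactoryBean {
        private final List<Object> listeners = new ArrayList<>();

        void addListener(Object listener) {
            if (listener == null) {
                throw new IllegalArgumentException("'listener' cannot be null");
            }
            listeners.add(listener);
        }
    }

    // Assumption: a listener only exists when a meter registry is present.
    static Object createListener(boolean meterRegistryPresent) {
        return meterRegistryPresent ? new Object() : null;
    }

    // Registers the listener without a null check, as the buggy start path appears to do.
    static String start(boolean meterRegistryPresent) {
        FakeFactoryBean factory = new FakeFactoryBean();
        try {
            factory.addListener(createListener(meterRegistryPresent));
            return "started";
        } catch (IllegalArgumentException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(start(true));
        System.out.println(start(false));
    }
}
```

Under that assumption, either putting micrometer on the class path or upgrading to a version that skips the registration when no listener exists would avoid the exception.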
Upvotes: 6