Reputation: 83
I am trying to write a simple program that prints a KStream read from a Kafka topic. It fails at startup with an NPE every time, and I am completely out of ideas.
I am using the spring-cloud-stream-binder-kafka-streams dependency with the latest Spring Cloud version, "Finchley.M9".
The code I have written is:
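import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(KafkaStreamsProcessor.class)
public class EventListener {

    // Bound to the "input" binding; prints every record of the stream.
    @StreamListener("input")
    public void listen(KStream<String, String> kstream) {
        kstream.print();
    }
}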
The application.properties has:
spring.cloud.stream.bindings.input.destination=slot-events
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.binder.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.binder.valueSerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.applicationId=listener
spring.cloud.stream.kafka.streams.binder.configuration.zookeeper.connect=localhost:2181
When I start the service, I keep getting the following error on the console:
2018-03-31 22:57:52.641 INFO 26301 --- [ main] sStreamListenerSetupMethodOrchestrator$1 : values:
application.id = default
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 1000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
timestamp.extractor = null
value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
windowstore.changelog.additional.retention.ms = 86400000
zookeeper.connect = localhost:2181
2018-03-31 22:57:52.656 INFO 26301 --- [ main] o.apache.catalina.core.StandardService : Stopping service [Tomcat]
2018-03-31 22:57:52.685 INFO 26301 --- [ main] ConditionEvaluationReportLoggingListener :
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2018-03-31 22:57:52.693 ERROR 26301 --- [ main] o.s.boot.SpringApplication : Application run failed
java.lang.IllegalStateException: java.lang.NullPointerException
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.adaptAndRetrieveInboundArguments(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:273) ~[spring-cloud-stream-binder-kafka-streams-2.0.0.RC3.jar!/:2.0.0.RC3]
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:154) ~[spring-cloud-stream-binder-kafka-streams-2.0.0.RC3.jar!/:2.0.0.RC3]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:195) ~[spring-cloud-stream-2.0.0.RC3.jar!/:2.0.0.RC3]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.lambda$postProcessAfterInitialization$0(StreamListenerAnnotationBeanPostProcessor.java:167) ~[spring-cloud-stream-2.0.0.RC3.jar!/:2.0.0.RC3]
at java.lang.Iterable.forEach(Iterable.java:75) ~[na:1.8.0_161]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:285) ~[spring-cloud-stream-2.0.0.RC3.jar!/:2.0.0.RC3]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:105) ~[spring-cloud-stream-2.0.0.RC3.jar!/:2.0.0.RC3]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:777) ~[spring-beans-5.0.4.RELEASE.jar!/:5.0.4.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:868) ~[spring-context-5.0.4.RELEASE.jar!/:5.0.4.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.0.4.RELEASE.jar!/:5.0.4.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:752) [spring-boot-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:388) [spring-boot-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) [spring-boot-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1246) [spring-boot-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1234) [spring-boot-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
at dg.athena.sideprojects.kafkastreampoc.KafkastreampocApplication.main(KafkastreampocApplication.java:15) [classes!/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_161]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_161]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_161]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_161]
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48) [kafkastreampoc-0.0.1-SNAPSHOT.jar:na]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:87) [kafkastreampoc-0.0.1-SNAPSHOT.jar:na]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:50) [kafkastreampoc-0.0.1-SNAPSHOT.jar:na]
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:51) [kafkastreampoc-0.0.1-SNAPSHOT.jar:na]
Caused by: java.lang.NullPointerException: null
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.getkStream(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:294) ~[spring-cloud-stream-binder-kafka-streams-2.0.0.RC3.jar!/:2.0.0.RC3]
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.adaptAndRetrieveInboundArguments(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:235) ~[spring-cloud-stream-binder-kafka-streams-2.0.0.RC3.jar!/:2.0.0.RC3]
... 24 common frames omitted
Can somebody please suggest what might be causing this?
Upvotes: 1
Views: 1000
Reputation: 504
When a project is generated with Spring Starter or Spring Initializr and Spring Cloud Stream is selected, the build also gets this Gradle (or the equivalent Maven) test dependency:
testImplementation 'org.springframework.cloud:spring-cloud-stream-test-support'
Every time "bootRun" is executed to start the application, an NPE or an IllegalStateException is thrown.
When I comment out the Gradle dependency above, the NPE goes away and everything works fine. I have reproduced this on more than six different Spring Boot applications that use Spring Cloud Stream and Kafka Streams.
For now I am assuming everything is OK, since the code works well with the test dependency removed. However, I never like to proceed without understanding an issue, as it can come back to haunt me later.
Any thoughts or fixes are appreciated.
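One way to check whether the test-support binder really is visible at runtime is to probe the classpath from inside the running application. The sketch below is only an illustration: the helper class ClasspathCheck is hypothetical, and the class name org.springframework.cloud.stream.test.binder.TestSupportBinder is my assumption about what the test-support jar ships, so please verify it against the jar you actually have.

```java
public class ClasspathCheck {

    // Assumed name of a class shipped in spring-cloud-stream-test-support;
    // verify it against the jar on your machine before relying on it.
    static final String TEST_BINDER_CLASS =
            "org.springframework.cloud.stream.test.binder.TestSupportBinder";

    // Returns true if the named class can be located, without initializing it.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("test binder on classpath: "
                + isOnClasspath(TEST_BINDER_CLASS));
    }
}
```

If this reports true when the application is started with bootRun, the test dependency is leaking onto the runtime classpath, which would be consistent with the behaviour described above. It does not by itself explain why the NPE occurs.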
Upvotes: 0
Reputation: 5904
This is a bug that was fixed in the latest snapshot. Can you upgrade the binder to 1.0.0.BUILD-SNAPSHOT and try again?
Upvotes: 2