Reputation: 192
Yes, I've read all the docs I could find and tried every configuration alternative, but this simple example, which should just log a line, does not work.
(This is a Spring Boot 2 app with spring-cloud-stream-binder-kafka-streams.)
Kafka is storing a String value (null key).
My application.yaml:
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: 'myStreamTopic'
        output:
          producer.keySerde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
      kafka:
        streams:
          binder:
            configuration:
              default.key.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
              default.value.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
            brokers:
              - 'ommited:9092'
              - 'ommited:9092'
              - 'ommited:9092'
            application-id: hack1
Just this simple code as a POC:
@SpringBootApplication
@Slf4j
public class HackatonApplication {

    public static void main(String[] args) {
        SpringApplication.run(HackatonApplication.class, args);
    }

    @EnableBinding(KafkaStreamsProcessor.class)
    public static class LineProcessor {

        @StreamListener(Sink.INPUT)
        public void process(KStream<?, String> line) {
            log.info("Received: {}", line);
        }
    }
}
And I am unable to get it running:
org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalArgumentException: Trying to invoke public abstract org.apache.kafka.streams.kstream.KStream org.apache.kafka.streams.kstream.KStream.map(org.apache.kafka.streams.kstream.KeyValueMapper) but no delegate has been set.
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:184) ~[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:52) ~[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at
Sorry if this is trivial, but I have spent hours searching, googling, and trying to find a documented solution.
Upvotes: 2
Views: 2575
Reputation: 5924
You are using the out-of-the-box KafkaStreamsProcessor for binding, which expects a single KStream as input and another KStream as output. If you use this standard interface, you must provide proper configuration for the output binding (such as its destination). Your method must then return a KStream, and you need the SendTo annotation on the Spring side to bind it. Something like this:
@StreamListener(Sink.INPUT)
@SendTo("output")
public KStream<?, String> process(KStream<?, String> line) {
    log.info("Received: {}", line);
    return line;
}
However, in your case, you can define a custom processor interface that declares only an input binding and use that for EnableBinding:
interface CustomKafkaStreamsProcessor {
    @Input("input")
    KStream<?, ?> input();
}
Then use this with your binding: @EnableBinding(CustomKafkaStreamsProcessor.class). That way, you don't have to change the method to return something.
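For background on the "no delegate has been set" message in the question: it reads as if the output KStream is handed out as a proxy that only works once a real stream has been attached to it, which a void listener method never does. The stand-alone sketch below illustrates that pattern with plain JDK dynamic proxies. It is not the binder's actual code; the Stream interface, DelegatingHandler, and newProxy are hypothetical names made up for illustration, and the mechanism is my assumption based on the error text.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class DelegateProxySketch {

    /** Hypothetical stand-in for a stream type; NOT the real KStream interface. */
    public interface Stream {
        String describe();
    }

    /** Forwards every call to a delegate and fails loudly if none was attached. */
    public static class DelegatingHandler implements InvocationHandler {
        private Stream delegate;

        public void setDelegate(Stream delegate) {
            this.delegate = delegate;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (delegate == null) {
                // Same wording as the error in the question (assumed mechanism)
                throw new IllegalArgumentException(
                        "Trying to invoke " + method + " but no delegate has been set.");
            }
            try {
                return method.invoke(delegate, args);
            } catch (InvocationTargetException e) {
                // Unwrap so callers see the delegate's own exception
                throw e.getCause();
            }
        }
    }

    /** Creates a proxy whose calls only succeed after handler.setDelegate(...) is called. */
    public static Stream newProxy(DelegatingHandler handler) {
        return (Stream) Proxy.newProxyInstance(
                Stream.class.getClassLoader(), new Class<?>[] {Stream.class}, handler);
    }

    public static void main(String[] args) {
        DelegatingHandler handler = new DelegatingHandler();
        Stream output = newProxy(handler);

        try {
            output.describe(); // nothing attached yet, like a void listener with an output binding
        } catch (IllegalArgumentException e) {
            System.out.println("Unbound output: " + e.getMessage());
        }

        handler.setDelegate(() -> "real stream"); // like returning a KStream with @SendTo
        System.out.println("Bound output: " + output.describe());
    }
}
```

Under that reading, both fixes above make sense: either return a KStream so the output gets something to delegate to, or drop the output binding entirely with the custom interface.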
Upvotes: 5