earroyoron
earroyoron

Reputation: 192

Unable to run just this simple stream,... serde configuration needed?

Yes, I've read all docs I've found and try all alternatives on configuration, but just this simple example that should log a line does not work

(this is an Spring-Boot-2 app with spring-cloud-stream-binder-kafka-streams)

Kafka is storing an String value (null key)

my apllication.yaml

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: 'myStreamTopic'
    output:
          producer.keySerde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
      kafka:
        streams:
          binder:
            configuration:
              default.key.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
              default.value.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
            brokers:
              - 'ommited:9092'
              - 'ommited:9092'
              - 'ommited:9092'
            application-id: hack1

Just this simple code as a POC:

@SpringBootApplication
@Slf4j
public class HackatonApplication {

    public static void main(String[] args) {
SpringApplication.run(HackatonApplication.class, args);
}

  @EnableBinding(KafkaStreamsProcessor.class)
  public static class LineProcessor {

    @StreamListener(Sink.INPUT)
    public void process(KStream<?, String> line) {
      log.info("Received: {}", line);
    }

  }

and I am unable to get it running!

org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalArgumentException: Trying to invoke public abstract org.apache.kafka.streams.kstream.KStream org.apache.kafka.streams.kstream.KStream.map(org.apache.kafka.streams.kstream.KeyValueMapper) but no delegate has been set. at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:184) ~[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE] at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:52) ~[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE] at

sorry if this is trivial but I have been hours searching, googling and trying to find a documented solution.

Upvotes: 2

Views: 2575

Answers (1)

sobychacko
sobychacko

Reputation: 5924

You are using the out of the box KafkaStreamsProcessor for binding which expects a single KStream as input and another KStream as output. If you are using this standard one, you must provide proper configuration for output binding (such as destination etc.). Then your method must return a KStream and you need to use the SendTo annotation on the spring side to bind. Something like below:

@StreamListener(Sink.INPUT)
@SendTo("output")
public KStream<?,String> process(KStream<?, String> line) {
    log.info("Received: {}", line);
    return line;
}

However, in your case, you can use a custom processor and use that for EnableBinding.

interface CustomKafkaStreamsProcessor {
        @Input("input")
        KStream<?, ?> input();
    } 

and then use this with your binding. @EnableBinding(CustomKafkaStreamsProcessor.class).

That way, you don't have to change the method to return something.

Upvotes: 5

Related Questions