srini
srini

Reputation: 564

Spring Boot Kafka Streams - Binding Issue

I am trying build out a simple streams app based on Kafka Streams using this example.

Word Count

However when I am starting the app, I get the below error: Can someone please point out on what I am missing out here? Here is the code, config & error

@SpringBootApplication
@Slf4j
@EnableScheduling
@EnableBinding(PersonBinding.class)
public class DemoApplication {

  public static void main(String[] args) {
    SpringApplication.run(DemoApplication.class, args);
  }

  @Component
  public static class PersonSource {

    private final MessageChannel personOut;

    @Autowired
    PersonSource(PersonBinding personBinding) {

      this.personOut = personBinding.personOut();
    }

    @Scheduled(fixedDelay = 5000L)
    public void run() {

      Message<Person> message = MessageBuilder
          .withPayload(new Person("John", "Doe", Instant.now()))
          .build();

      try {

        personOut.send(message);

        log.info("Published message: {}", message);
      } catch (Exception e) {

        e.printStackTrace();
        throw e;
      }
    }
  }

  @Component
  public static class PersonProcessor {

    @StreamListener
    public void process(@Input(PersonBinding.PERSON_IN) KStream<String, Person> events) {

      events.foreach(((key, value) -> System.out.println("Key: " + key + "; Value: " + value)));
    }
  }
}

@Data
@AllArgsConstructor
class Person {

  String firstName;

  String lastName;

  Instant createdOn;
}

interface PersonBinding {

  String PERSON_IN = "pin";

  String PERSON_OUT = "pout";

  @Output(PERSON_OUT)
  MessageChannel personOut();

  @Input(PERSON_IN)
  KStream<String, Person> personIn();
}

Dependency Management (Spring Boot 1.5.13.RELEASE)

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kstream</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.1.0</version>
</dependency>

Configuration

# Default Configuration
spring.cloud.stream.kstream.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kstream.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
# Out Bindings Configuration
spring.cloud.stream.bindings.pout.destination=pout
spring.cloud.stream.bindings.pout.producer.header-mode=raw
# In Bindings Configuration
spring.cloud.stream.bindings.pin.destination=pout
spring.cloud.stream.bindings.pin.consumer.header-mode=raw

Error

Field configurationProperties in org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration required a single bean, but 2 were found:
  - spring.cloud.stream.kafka.binder-org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties: a programmatically registered singleton - binderConfigurationProperties: defined by method 'binderConfigurationProperties' in class path resource [org/springframework/cloud/stream/binder/kstream/config/KStreamBinderSupportAutoConfiguration.class]


Action:

Consider marking one of the beans as @Primary, updating the consumer to accept multiple beans, or using @Qualifier to identify the bean that should be consumed

** EDIT 1 **

Uploaded code to Github https://github.com/tapitoe/demo-spring-cloud-streams/tree/master/src

Upvotes: 3

Views: 7575

Answers (3)

Jay Ehsaniara
Jay Ehsaniara

Reputation: 1529

I had a similar problem, It got fixed after I added:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

Upvotes: 3

Gary Russell
Gary Russell

Reputation: 174484

You need to add the kstream binder to the pom; the starter only adds the message channel binder.

EDIT

I just copied similar code into an app with no problems.

@SpringBootApplication
@EnableBinding(So50693858Application.PersonBinding.class)
public class So50693858Application {

    public static void main(String[] args) {
        SpringApplication.run(So50693858Application.class, args);
    }

    @StreamListener
    public void process(@Input(PersonBinding.PERSON_IN) KStream<String, String> events) {

        events.foreach(((key, value) -> System.out.println("Key: " + key + "; Value: " + value)));
    }

    interface PersonBinding {

        String PERSON_IN = "pin";

        String PERSON_OUT = "pout";

        @Output(PERSON_OUT)
        MessageChannel personOut();

        @Input(PERSON_IN)
        KStream<String, String> personIn();
    }

}

and sent a message from the console producer to pout and

Key: null; Value: foo

It's not clear, however, why you have input and output bindings to the same destination (not that that would cause the problem you see).

EDIT

This works too (with your properties):

@SpringBootApplication
@EnableBinding(So50693858Application.PersonBinding.class)
public class So50693858Application {

    public static void main(String[] args) {
        SpringApplication.run(So50693858Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(MessageChannel pout) {
        return args -> {
            pout.send(new GenericMessage<>("foo".getBytes(),
                    Collections.singletonMap(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())));
            pout.send(new GenericMessage<>("baz".getBytes(),
                    Collections.singletonMap(KafkaHeaders.MESSAGE_KEY, "qux".getBytes())));
        };
    }

    @StreamListener
    public void process(@Input(PersonBinding.PERSON_IN) KStream<String, String> events) {

        events.foreach(((key, value) -> System.out.println("Key: " + key + "; Value: " + value)));
    }

    interface PersonBinding {

        String PERSON_IN = "pin";

        String PERSON_OUT = "pout";

        @Output(PERSON_OUT)
        MessageChannel personOut();

        @Input(PERSON_IN)
        KStream<String, String> personIn();
    }

}

and

Key: bar; Value: foo
Key: qux; Value: baz

EDIT3

Pom for 2.0.x version:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>so50693858</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>so50693858</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-cloud.version>Finchley.RC2</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>


</project>

config:

# Default Configuration
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
# Out Bindings Configuration
spring.cloud.stream.bindings.pout.destination=pout
spring.cloud.stream.bindings.pout.producer.header-mode=raw
# In Bindings Configuration
spring.cloud.stream.bindings.pin.destination=pout
spring.cloud.stream.bindings.pin.consumer.header-mode=raw

Upvotes: 2

Nikem
Nikem

Reputation: 5776

Try moving EnableBinding annotation to DemoApplication class. I believe it should be put on @Configuration class, not on arbitrary @Component.

Upvotes: 0

Related Questions