Reputation: 564
I am trying build out a simple streams app based on Kafka Streams using this example.
However when I am starting the app, I get the below error: Can someone please point out on what I am missing out here? Here is the code, config & error
@SpringBootApplication
@Slf4j
@EnableScheduling
@EnableBinding(PersonBinding.class)
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Component
public static class PersonSource {
private final MessageChannel personOut;
@Autowired
PersonSource(PersonBinding personBinding) {
this.personOut = personBinding.personOut();
}
@Scheduled(fixedDelay = 5000L)
public void run() {
Message<Person> message = MessageBuilder
.withPayload(new Person("John", "Doe", Instant.now()))
.build();
try {
personOut.send(message);
log.info("Published message: {}", message);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
}
@Component
public static class PersonProcessor {
@StreamListener
public void process(@Input(PersonBinding.PERSON_IN) KStream<String, Person> events) {
events.foreach(((key, value) -> System.out.println("Key: " + key + "; Value: " + value)));
}
}
}
@Data
@AllArgsConstructor
class Person {
String firstName;
String lastName;
Instant createdOn;
}
interface PersonBinding {
String PERSON_IN = "pin";
String PERSON_OUT = "pout";
@Output(PERSON_OUT)
MessageChannel personOut();
@Input(PERSON_IN)
KStream<String, Person> personIn();
}
Dependency Management (Spring Boot 1.5.13.RELEASE)
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kstream</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.1.0</version>
</dependency>
Configuration
# Default Configuration
spring.cloud.stream.kstream.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kstream.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
# Out Bindings Configuration
spring.cloud.stream.bindings.pout.destination=pout
spring.cloud.stream.bindings.pout.producer.header-mode=raw
# In Bindings Configuration
spring.cloud.stream.bindings.pin.destination=pout
spring.cloud.stream.bindings.pin.consumer.header-mode=raw
Error
Field configurationProperties in org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration required a single bean, but 2 were found:
- spring.cloud.stream.kafka.binder-org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties: a programmatically registered singleton - binderConfigurationProperties: defined by method 'binderConfigurationProperties' in class path resource [org/springframework/cloud/stream/binder/kstream/config/KStreamBinderSupportAutoConfiguration.class]
Action:
Consider marking one of the beans as @Primary, updating the consumer to accept multiple beans, or using @Qualifier to identify the bean that should be consumed
** EDIT 1 **
Uploaded code to Github https://github.com/tapitoe/demo-spring-cloud-streams/tree/master/src
Upvotes: 3
Views: 7575
Reputation: 1529
I had a similar problem, It got fixed after I added:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
Upvotes: 3
Reputation: 174484
You need to add the kstream binder to the pom; the starter only adds the message channel binder.
EDIT
I just copied similar code into an app with no problems.
@SpringBootApplication
@EnableBinding(So50693858Application.PersonBinding.class)
public class So50693858Application {
public static void main(String[] args) {
SpringApplication.run(So50693858Application.class, args);
}
@StreamListener
public void process(@Input(PersonBinding.PERSON_IN) KStream<String, String> events) {
events.foreach(((key, value) -> System.out.println("Key: " + key + "; Value: " + value)));
}
interface PersonBinding {
String PERSON_IN = "pin";
String PERSON_OUT = "pout";
@Output(PERSON_OUT)
MessageChannel personOut();
@Input(PERSON_IN)
KStream<String, String> personIn();
}
}
and sent a message from the console producer to pout
and
Key: null; Value: foo
It's not clear, however, why you have input and output bindings to the same destination (not that that would cause the problem you see).
EDIT
This works too (with your properties):
@SpringBootApplication
@EnableBinding(So50693858Application.PersonBinding.class)
public class So50693858Application {
public static void main(String[] args) {
SpringApplication.run(So50693858Application.class, args);
}
@Bean
public ApplicationRunner runner(MessageChannel pout) {
return args -> {
pout.send(new GenericMessage<>("foo".getBytes(),
Collections.singletonMap(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())));
pout.send(new GenericMessage<>("baz".getBytes(),
Collections.singletonMap(KafkaHeaders.MESSAGE_KEY, "qux".getBytes())));
};
}
@StreamListener
public void process(@Input(PersonBinding.PERSON_IN) KStream<String, String> events) {
events.foreach(((key, value) -> System.out.println("Key: " + key + "; Value: " + value)));
}
interface PersonBinding {
String PERSON_IN = "pin";
String PERSON_OUT = "pout";
@Output(PERSON_OUT)
MessageChannel personOut();
@Input(PERSON_IN)
KStream<String, String> personIn();
}
}
and
Key: bar; Value: foo
Key: qux; Value: baz
EDIT3
Pom for 2.0.x version:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>so50693858</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>so50693858</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Finchley.RC2</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>
config:
# Default Configuration
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
# Out Bindings Configuration
spring.cloud.stream.bindings.pout.destination=pout
spring.cloud.stream.bindings.pout.producer.header-mode=raw
# In Bindings Configuration
spring.cloud.stream.bindings.pin.destination=pout
spring.cloud.stream.bindings.pin.consumer.header-mode=raw
Upvotes: 2
Reputation: 5776
Try moving EnableBinding
annotation to DemoApplication
class. I believe it should be put on @Configuration
class, not on arbitrary @Component
.
Upvotes: 0