Reputation: 21
Ignore this question, there was just an issue in my code.
I'd like to produce 2 different kinds of Kafka messages on 2 different topics using Spring Cloud Stream. Everything works fine when I do only one, but when I try to create 2 supplier bean functions I have an issue. Here is my application.yaml:
server.port: 8087
spring:
  application:
    name: metrics-producer
  cloud:
    schema:
      avro:
        dynamicSchemaGenerationEnabled: true
    stream:
      default:
        contentType: application/*+avro
        producer:
          useNativeEncoding: true
      function:
        definition: rowCountSupplier;loadedColumnsSupplier
      bindings:
        rowCountSupplier-out-0.destination: rowcount
        loadedColumnsSupplier-out-0.destination: loadedcolumns
      kafka:
        bootstrap-servers: localhost:9092
        default:
          ...
Then I get the following error when starting the app:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'metricsProducer': Injection of autowired dependencies failed; nested exception is java.lang.IllegalArgumentException: Could not resolve placeholder 'spring.cloud.stream.bindings.rowCountSupplier;loadedColumnsSupplier-out-0.destination' in value "${spring.cloud.stream.bindings.${spring.cloud.stream.function.definition}-out-0.destination}"
It looks like it does not support the ;-separated definition of the 2 suppliers, but the documentation seems to indicate this is possible.
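For reference, the nested placeholder in the stack trace suggests the failure comes from a @Value expression in my own code (the 'metricsProducer' bean) rather than from the binder: the whole function definition string gets interpolated into a single binding key. A hypothetical sketch of a bean that would fail this way (class and field names are made up for illustration):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class MetricsProducer {

    // Resolves fine with a single function, but with
    // "rowCountSupplier;loadedColumnsSupplier" the entire ;-separated string
    // is substituted into one property name, which cannot be resolved.
    @Value("${spring.cloud.stream.bindings.${spring.cloud.stream.function.definition}-out-0.destination}")
    private String destination;
}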
Upvotes: 2
Views: 783
Reputation: 5924
This works fine for me.
import java.util.Random;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class So72361424Application {

    public static void main(String[] args) {
        SpringApplication.run(So72361424Application.class, args);
    }

    // Each supplier gets its own output binding (<name>-out-0).
    @Bean
    public Supplier<String> rowCountSupplier() {
        return () -> "From rowCountSupplier: " + new Random().nextInt();
    }

    @Bean
    public Supplier<String> loadedColumnsSupplier() {
        return () -> "From loadedColumnsSupplier: " + new Random().nextInt();
    }

    // Consumers just print whatever arrives on each supplier's topic.
    @Bean
    public Consumer<String> rowCountConsumer() {
        return System.out::println;
    }

    @Bean
    public Consumer<String> loadedColumnsConsumer() {
        return System.out::println;
    }

}
Configuration:
spring.cloud.function.definition=rowCountSupplier;loadedColumnsSupplier;rowCountConsumer;loadedColumnsConsumer
spring.cloud.stream.bindings.rowCountConsumer-in-0.destination=rowCountSupplier-out-0
spring.cloud.stream.bindings.loadedColumnsConsumer-in-0.destination=loadedColumnsSupplier-out-0
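If you want the suppliers to publish to the named topics from your YAML instead of the default <binding>-out-0 topic names, the producer-side destinations are set the same way; a sketch using the rowcount and loadedcolumns topics from your question:

spring.cloud.stream.bindings.rowCountSupplier-out-0.destination=rowcount
spring.cloud.stream.bindings.loadedColumnsSupplier-out-0.destination=loadedcolumns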
I see output as below:
From loadedColumnsSupplier: -1442588112
From rowCountSupplier: -653426801
From loadedColumnsSupplier: -1397059810
From rowCountSupplier: 1267011618
From rowCountSupplier: 861125412
From loadedColumnsSupplier: 550252583
From rowCountSupplier: 1477875916
From loadedColumnsSupplier: 259212207
...
...
Something must be wrong with the configuration on your side. I noticed that you are using Avro and native serialization. Are you providing a schema registry? Debug and see if you see any errors. If you still face issues, please provide a minimal reproducible sample.
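For reference, with useNativeEncoding the binder delegates serialization to the Kafka client, so the producer needs an Avro serializer and a reachable schema registry configured. A rough sketch, assuming the Confluent serializer and a registry running on localhost:8081 (adjust to your actual setup):

# assumption: Confluent Avro serializer and a local schema registry
spring.cloud.stream.kafka.binder.producer-properties.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url=http://localhost:8081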
Upvotes: 1