SeB

Reputation: 21

Spring Cloud Stream multiple function supplier issue

Ignore this question; there was just an issue in my code.

I'd like to produce 2 different kinds of Kafka messages to 2 different topics using Spring Cloud Stream. Everything works fine when I define only one supplier, but when I try to create 2 supplier bean functions I run into an issue. Here is my application.yaml:

server.port: 8087
spring:
  application:
    name: metrics-producer
  cloud:
    schema:
      avro:
        dynamicSchemaGenerationEnabled: true
    stream:
      default:
        contentType: application/*+avro
        producer:
          useNativeEncoding: true
      function:
        definition: rowCountSupplier;loadedColumnsSupplier
      bindings:
        rowCountSupplier-out-0.destination: rowcount
        loadedColumnsSupplier-out-0.destination: loadedcolumns
      kafka:
        bootstrap-servers: localhost:9092
        default:
...

Then I get the following error when starting the app:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'metricsProducer': Injection of autowired dependencies failed; nested exception is java.lang.IllegalArgumentException: Could not resolve placeholder 'spring.cloud.stream.bindings.rowCountSupplier;loadedColumnsSupplier-out-0.destination' in value "${spring.cloud.stream.bindings.${spring.cloud.stream.function.definition}-out-0.destination}"

It looks like the `;`-separated definition of the 2 suppliers is not supported, but the documentation seems to indicate that it is.
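The placeholder quoted in the error message hints at where the problem lies: the exception is raised while injecting a `${...}` value into the `metricsProducer` bean, and that expression nests the whole function definition inside a binding key. Below is a minimal sketch in plain Java (no Spring involved; `BindingKeyDemo`, `naiveKey`, and `perFunctionKeys` are hypothetical names used only for illustration) of how that nesting produces a property name that is not in the YAML, whereas splitting the definition on `;` yields the two keys that are.

```java
import java.util.ArrayList;
import java.util.List;

public class BindingKeyDemo {

    // Mimics the nested placeholder from the error message: the entire
    // definition string, ';' included, is pasted into the binding key.
    static String naiveKey(String definition) {
        return "spring.cloud.stream.bindings." + definition + "-out-0.destination";
    }

    // Hypothetical helper: builds one binding key per function name
    // by splitting the definition on the ';' separator.
    static List<String> perFunctionKeys(String definition) {
        List<String> keys = new ArrayList<>();
        for (String name : definition.split(";")) {
            keys.add("spring.cloud.stream.bindings." + name.trim() + "-out-0.destination");
        }
        return keys;
    }

    public static void main(String[] args) {
        String definition = "rowCountSupplier;loadedColumnsSupplier";
        // Single key containing ';' (no such property is configured).
        System.out.println(naiveKey(definition));
        // Two keys, matching the entries under 'bindings' in application.yaml.
        perFunctionKeys(definition).forEach(System.out::println);
    }
}
```

With a single function the nested key happens to match a configured property, which would explain why the one-supplier setup worked.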

Upvotes: 2

Views: 783

Answers (1)

sobychacko

Reputation: 5924

This works fine for me.

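import java.util.Random;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication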
public class So72361424Application {

    public static void main(String[] args) {
        SpringApplication.run(So72361424Application.class, args);
    }

    @Bean
    public Supplier<String> rowCountSupplier() {
        return () -> "From rowCountSupplier: " + new Random().nextInt();
    }

    @Bean
    public Supplier<String> loadedColumnsSupplier() {
        return () -> "From loadedColumnsSupplier: " + new Random().nextInt();
    }

    @Bean
    public Consumer<String> rowCountConsumer() {
        return System.out::println;
    }

    @Bean
    public Consumer<String> loadedColumnsConsumer() {
        return System.out::println;
    }
}

Configuration:

spring.cloud.function.definition=rowCountSupplier;loadedColumnsSupplier;rowCountConsumer;loadedColumnsConsumer
spring.cloud.stream.bindings.rowCountConsumer-in-0.destination=rowCountSupplier-out-0
spring.cloud.stream.bindings.loadedColumnsConsumer-in-0.destination=loadedColumnsSupplier-out-0

I see the following output:

From loadedColumnsSupplier: -1442588112
From rowCountSupplier: -653426801
From loadedColumnsSupplier: -1397059810
From rowCountSupplier: 1267011618
From rowCountSupplier: 861125412
From loadedColumnsSupplier: 550252583
From rowCountSupplier: 1477875916
From loadedColumnsSupplier: 259212207
...
...

Something must be wrong in the configuration on your side. I noticed that you are using Avro and native encoding. Are you providing a schema registry? Debug the application and check whether any other errors show up. If you still face issues, please provide a minimal reproducible sample.

Upvotes: 1
