Mike Summers
Mike Summers

Reputation: 2239

How to use existing Spring Cloud Data Flow Starter/Sample Applications with Source bean?

Working on a new CDF Source application I noticed that @EnableBinding is deprecated so I implemented a MessageSource with a Supplier bean

    @Bean
    public Supplier<Flux<Message<LogRecord>>> getLogs() {

For testing I created a stream between my Source and the File Sink. Both services came-up just fine, but they refused to communicate. I could see the Source write to the stream, and the Sink listening but they acted like they were not on the same channel.

Finally I modified the code to use @EnableBinding and @PollableSource and everything worked as it should.

I can't find anything in the documentation about this, what have I missed?

Thanks in advance.

--- Update ---

Source:

@Component
@Slf4j
public class KinesisSource {
    @Autowired
    KinesisMessageSource kinesisMessageSource;
    public KinesisSource(){
    }

    @Bean
    public Supplier<Flux<Message<LogRecord>>> getLogs() {
        return () -> IntegrationReactiveUtils.messageSourceToFlux(kinesisMessageSource);
    }
}

Source config:

spring.cloud.function.definition=getLogs
spring.cloud.stream.function.bindings.getLogs-out-0=output

Sink is the provided File Sink Starter.

Upvotes: 0

Views: 389

Answers (1)

sobychacko
sobychacko

Reputation: 5924

You can find various example of writing suppliers using the functional model here: https://github.com/spring-cloud/stream-applications/tree/main/functions/supplier

When using the functional model, going by your example, your default binding becomes getLogs-out-0. Is that the binding you used for your producer?

If you want a reactive style supplier to poll on a schedule, see this specific example: https://github.com/spring-cloud/stream-applications/blob/main/functions/supplier/jdbc-supplier/src/main/java/org/springframework/cloud/fn/supplier/jdbc/JdbcSupplierConfiguration.java#L66

Upvotes: 1

Related Questions