Reputation: 2239
Working on a new CDF Source application, I noticed that @EnableBinding is deprecated, so I implemented a MessageSource with a Supplier bean:
@Bean
public Supplier<Flux<Message<LogRecord>>> getLogs() {
For testing, I created a stream between my Source and the File Sink. Both services came up just fine, but they refused to communicate. I could see the Source writing to the stream and the Sink listening, but they behaved as if they were not on the same channel.
Finally, I modified the code to use @EnableBinding and @PollableSource, and everything worked as it should.
I can't find anything in the documentation about this. What have I missed?
Thanks in advance.
--- Update ---
Source:
@Component
@Slf4j
public class KinesisSource {

    @Autowired
    KinesisMessageSource kinesisMessageSource;

    public KinesisSource() {
    }

    @Bean
    public Supplier<Flux<Message<LogRecord>>> getLogs() {
        return () -> IntegrationReactiveUtils.messageSourceToFlux(kinesisMessageSource);
    }
}
Source config:
spring.cloud.function.definition=getLogs
spring.cloud.stream.function.bindings.getLogs-out-0=output
Sink is the provided File Sink Starter.
Reputation: 5924
You can find various examples of writing suppliers using the functional model here: https://github.com/spring-cloud/stream-applications/tree/main/functions/supplier
When using the functional model, going by your example, your default binding becomes getLogs-out-0. Is that the binding you used for your producer?
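As a rough sketch of what that means in configuration: the functional model derives the output binding name as <functionName>-out-<index>, so any producer settings have to target that name, or a name it has been explicitly remapped to. The destination name my-logs below is hypothetical and only there for illustration. Use one of the two options, not both.

```properties
# The Supplier bean to bind; its default output binding is getLogs-out-0
spring.cloud.function.definition=getLogs

# Option A: configure the default binding name directly
# ("my-logs" is a hypothetical destination, not taken from the question)
spring.cloud.stream.bindings.getLogs-out-0.destination=my-logs

# Option B: rename the binding to "output", then configure it under that name
# spring.cloud.stream.function.bindings.getLogs-out-0=output
# spring.cloud.stream.bindings.output.destination=my-logs
```

If the Source publishes to one destination while the Sink listens on another, both applications start cleanly but never exchange messages, which would match the symptom described.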
If you want a reactive style supplier to poll on a schedule, see this specific example: https://github.com/spring-cloud/stream-applications/blob/main/functions/supplier/jdbc-supplier/src/main/java/org/springframework/cloud/fn/supplier/jdbc/JdbcSupplierConfiguration.java#L66