Reputation: 864
I have a Kafka publisher which publishes the data using DirectProcessor
class of Project Reactor. But as per the official document, DirectProcessor
is getting deprecated and will be removed in version 3.5.
Current implementation
@Bean
public DirectProcessor<RequestDTO> publisher(){
return DirectProcessor.create();
}
@Bean
public FluxSink<RequestDTO> sink(DirectProcessor<RequestDTO> publisher){
return publisher.sink();
}
And here is the Supplier which supplies the data to the Kafka binder using DirectProcessor
bean configured above.
@Autowired
private DirectProcessor<RequestDTO> source;
@Bean
public Supplier<Flux<RequestDTO>> supplier(){
return () -> Flux.from(source);
};
I tried to use the recommended way using Sinks to replace DirectProcessor.
@Bean
public Sinks.Many<RequestDTO> newSink() {
return Sinks.many().multicast().onBackpressureBuffer();
}
@Autowired
private Sinks.Many<RequestDTO> newSink;
@Bean
public Supplier<Flux<Sinks.Many<RequestDTO>>> supplier(){
return () -> Flux.from(newSink);
};
But the application start failed with this exception
Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'supplierInitializer' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.NoSuchMethodError: 'org.springframework.integration.dsl.IntegrationFlowBuilder org.springframework.integration.dsl.IntegrationFlows.from(java.util.function.Supplier)'
What is the correct way to achieve this?
Upvotes: 0
Views: 952
Reputation: 6126
Yeah, we have StreamBridge
component for that - https://docs.spring.io/spring-cloud-stream/docs/3.1.5/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources.
Meanwhile, feel free to raise an issue (https://github.com/spring-cloud/spring-cloud-stream/issues) and we'll take a look since what you're describing should work as well.
Upvotes: 1