Swapnil

Reputation: 864

What is the replacement for DirectProcessor?

I have a Kafka publisher that publishes data using the DirectProcessor class of Project Reactor. According to the official documentation, DirectProcessor is deprecated and will be removed in version 3.5.

Current implementation

@Bean
public DirectProcessor<RequestDTO> publisher(){
   return DirectProcessor.create();
}

@Bean
public FluxSink<RequestDTO> sink(DirectProcessor<RequestDTO> publisher){
   return publisher.sink();
}

Here is the Supplier that supplies the data to the Kafka binder, using the DirectProcessor bean configured above.

@Autowired
private DirectProcessor<RequestDTO> source;

@Bean
public Supplier<Flux<RequestDTO>> supplier(){
   return () -> Flux.from(source);
}

I tried the recommended approach of replacing DirectProcessor with Sinks:

@Bean
public Sinks.Many<RequestDTO> newSink() {
   return Sinks.many().multicast().onBackpressureBuffer();
}

@Autowired
private Sinks.Many<RequestDTO> newSink;

@Bean
public Supplier<Flux<Sinks.Many<RequestDTO>>> supplier(){
    return () -> Flux.from(newSink);
};

But the application failed to start with this exception:

Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'supplierInitializer' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.NoSuchMethodError: 'org.springframework.integration.dsl.IntegrationFlowBuilder org.springframework.integration.dsl.IntegrationFlows.from(java.util.function.Supplier)'

What is the correct way to achieve this?

Upvotes: 0

Views: 952

Answers (1)

Oleg Zhurakousky

Reputation: 6126

Yeah, we have the StreamBridge component for that - https://docs.spring.io/spring-cloud-stream/docs/3.1.5/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources. Meanwhile, feel free to raise an issue (https://github.com/spring-cloud/spring-cloud-stream/issues) and we'll take a look, since what you're describing should work as well.
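
For the Sinks route, here is a minimal sketch of how the sink and supplier might be shaped. It assumes one problem in the attempt above is that Sinks.Many is not itself a Publisher, so it has to be exposed through asFlux() and the supplier's element type stays RequestDTO. The class name SinkSupplierSketch and the one-field RequestDTO are hypothetical stand-ins, and the Spring @Bean wiring is replaced by plain static methods so the snippet needs only reactor-core on the classpath. It does not address the NoSuchMethodError itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

final class SinkSupplierSketch {

    // Hypothetical stand-in for the RequestDTO class from the question.
    static final class RequestDTO {
        final String payload;

        RequestDTO(String payload) {
            this.payload = payload;
        }
    }

    // Takes the place of the DirectProcessor bean: a multicast sink that
    // buffers emitted items until the first subscriber arrives.
    static Sinks.Many<RequestDTO> newSink() {
        return Sinks.many().multicast().onBackpressureBuffer();
    }

    // The supplier hands out the sink's Flux view via asFlux().
    // Note the element type is RequestDTO, not Sinks.Many<RequestDTO>.
    static Supplier<Flux<RequestDTO>> supplier(Sinks.Many<RequestDTO> sink) {
        return sink::asFlux;
    }

    public static void main(String[] args) {
        Sinks.Many<RequestDTO> sink = newSink();

        // Stand-in for the binder subscribing to the supplied Flux.
        List<RequestDTO> received = new ArrayList<>();
        supplier(sink).get().subscribe(received::add);

        // Publishing side: tryEmitNext replaces FluxSink.next and
        // reports whether the emission was accepted.
        Sinks.EmitResult result = sink.tryEmitNext(new RequestDTO("hello"));
        System.out.println(result + ", received " + received.size() + " item(s)");
    }
}
```

In a Spring configuration the two static methods would become @Bean methods, and callers would publish with tryEmitNext (or emitNext with a failure handler) on the injected Sinks.Many. If buffering before the first subscriber is not wanted, Sinks.many().multicast().directBestEffort() is, as far as I recall from Reactor's deprecation note, the variant closest to DirectProcessor.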

Upvotes: 1
