Blink

Reputation: 1546

Spring Integration flow not invoked from handle

I have the following two components, which should delete a document first from Mongo and afterwards from Elastic.

Main flow:

@Component
public class DeleteDocumentFlow {

    @Autowired
    private StoreInMongoFlow storeInMongoFlow;

    @Bean
    public IntegrationFlow deleteDocument() {
        return IntegrationFlows.from(Channels.METADATA_DELETE_STATUS.name())
                .handle(storeInMongoFlow.deleteDocumentInMongo())
                .channel("deleteDocumentInES.input")
                .get();
    }
}

Service:

@Component
public class StoreInMongoFlow {
    @Bean
    public IntegrationFlow deleteDocumentInMongo() {
        return flow -> flow.
                <Metadata>handle((p, h) -> {
                    DBObject obj = BasicDBObjectBuilder.start("i", p.getId()).get();
                    DeleteResult documentEntry = this.mongoTemplate.remove(obj, "docs");
                    return documentEntry.getDeletedCount();
                })
                .log(LoggingHandler.Level.INFO, m -> "Number of documents deleted: " + m.getPayload());
    }
}

Unfortunately, deleteDocumentInMongo is never invoked. The bean is properly registered; I can see it in the logs.

Am I doing something fundamentally wrong, or would you need some more debugging info? If I wiretap the handle, then deleteDocumentInES.input is executed, but the Mongo flow is simply ignored.

Upvotes: 0

Views: 1009

Answers (1)

Artem Bilan

Reputation: 121272

You are definitely doing something fundamentally wrong: you are trying to treat an IntegrationFlow as a service to call from handle(). That is not what an IntegrationFlow is designed for. See the docs for more info: https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl

The DSL provides an IntegrationFlow component to define a composition of channels and endpoints between them, but now IntegrationFlow plays only the configuration role to populate real beans in the application context and is not used at runtime.

If you declare the logic as a separate IntegrationFlow, you don't need handle() at all: just use channel("deleteDocumentInMongo.input") to send the message from that point of the main flow to the first channel of the MongoDB sub-flow.
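A minimal sketch of that routing, assuming the sub-flow bean keeps the name deleteDocumentInMongo from the question, so that its implicit input channel is "deleteDocumentInMongo.input". Channels is the enum from the question and is not defined here:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
public class DeleteDocumentFlow {

    @Bean
    public IntegrationFlow deleteDocument() {
        // Channels.METADATA_DELETE_STATUS is the enum constant from the question.
        return IntegrationFlows.from(Channels.METADATA_DELETE_STATUS.name())
                // Hand the message over to the input channel of the Mongo sub-flow
                // instead of passing the IntegrationFlow bean to handle().
                .channel("deleteDocumentInMongo.input")
                .get();
    }
}
```

No reference to StoreInMongoFlow is injected here: the two flows are connected only through the channel name.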

If you want to do the same operation with Elastic, you should think about having a PublishSubscribeChannel to send a message and two flows starting from this channel.
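One possible shape for this, as a sketch only: the main flow publishes to a PublishSubscribeChannel and each subscriber forwards the same message to one of the two sub-flows. It assumes both sub-flows are lambda-based beans named deleteDocumentInMongo and deleteDocumentInES, and that Channels is the enum from the question:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
public class DeleteDocumentFlow {

    @Bean
    public IntegrationFlow deleteDocument() {
        return IntegrationFlows.from(Channels.METADATA_DELETE_STATUS.name())
                .publishSubscribeChannel(pubSub -> pubSub
                        // Each subscriber receives the same original message.
                        .subscribe(f -> f.channel("deleteDocumentInMongo.input"))
                        .subscribe(f -> f.channel("deleteDocumentInES.input")))
                .get();
    }
}
```

Without an executor configured on the channel, the subscribers should be called one after another on the sending thread, which fits the "Mongo first, then Elastic" requirement. Both sub-flows receive the original payload, so neither depends on a reply from the other.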

Since you end the deleteDocumentInMongo flow with log(), it produces no reply, so your .channel("deleteDocumentInES.input") is never reached.
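If you prefer to chain the two steps instead, a sketch of one alternative is to keep log() in the middle of the sub-flow, where it acts as a wire tap and the message continues downstream. The removeFromMongo helper below is hypothetical and only stands in for the mongoTemplate.remove(...) call from the question; Metadata is the payload type from the question:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.handler.LoggingHandler;

@Configuration
public class StoreInMongoFlow {

    @Bean
    public IntegrationFlow deleteDocumentInMongo() {
        return flow -> flow
                .<Metadata>handle((p, h) -> removeFromMongo(p))
                // log() is no longer the last operator, so the flow does not stop here.
                .log(LoggingHandler.Level.INFO, m -> "Number of documents deleted: " + m.getPayload())
                .channel("deleteDocumentInES.input");
    }

    // Hypothetical placeholder: replace the body with the Mongo removal from the question.
    private long removeFromMongo(Metadata metadata) {
        return 0L;
    }
}
```

Note that in this variant the Elastic flow receives the deleted count as its payload, not the original Metadata, which is one reason the publish-subscribe approach may fit better.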

Please read more of the docs to understand what pub-sub, request-reply, a service activator, and a flow itself are.

Upvotes: 3
