Reputation: 1546
I have the following two components, which should delete a document first from Mongo and then from Elastic.
Main flow:
@Component
public class DeleteDocumentFlow {

    @Autowired
    private StoreInMongoFlow storeInMongoFlow;

    @Bean
    public IntegrationFlow deleteDocument() {
        return IntegrationFlows.from(Channels.METADATA_DELETE_STATUS.name())
                .handle(storeInMongoFlow.deleteDocumentInMongo())
                .channel("deleteDocumentInES.input")
                .get();
    }
}
Service:
@Component
public class StoreInMongoFlow {

    @Bean
    public IntegrationFlow deleteDocumentInMongo() {
        return flow -> flow
                .<Metadata>handle((p, h) -> {
                    DBObject obj = BasicDBObjectBuilder.start("i", p.getId()).get();
                    DeleteResult documentEntry = this.mongoTemplate.remove(obj, "docs");
                    return documentEntry.getDeletedCount();
                })
                .log(LoggingHandler.Level.INFO, m -> "Number of documents deleted: " + m.getPayload());
    }
}
Unfortunately the deleteDocumentInMongo is never invoked. The bean is properly registered as I can see it in the logs.
Am I doing something fundamentally wrong or would you need some more debugging info? If I wiretap the handle, then the deleteDocumentInES.input is executed but the mongo flow is simply ignored.
Upvotes: 0
Views: 1009
Reputation: 121272
You are definitely doing something fundamentally wrong. You are trying to treat an IntegrationFlow as a service to call from handle(). That is not what an IntegrationFlow has been designed for. See the docs for more info: https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl
The DSL provides an IntegrationFlow component to define a composition of channels and endpoints between them, but now IntegrationFlow plays only the configuration role to populate real beans in the application context and is not used at runtime.
If you declare some logic as a separate IntegrationFlow, you don't need handle() at all: just use channel("deleteDocumentInMongo.input") to send the message from that point of the main flow to the first channel of the MongoDB sub-flow.
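A minimal sketch of that wiring, assuming the Spring Integration 5.x DSL used in the question. The class name DeleteDocumentConfig is hypothetical, the channel name is written as a string literal instead of Channels.METADATA_DELETE_STATUS.name(), and the handler body is only a placeholder for the real mongoTemplate.remove(...) call:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
public class DeleteDocumentConfig { // hypothetical class name

    // Main flow: send the message to the sub-flow's input channel
    // instead of passing the IntegrationFlow object to handle().
    @Bean
    public IntegrationFlow deleteDocument() {
        return IntegrationFlows.from("METADATA_DELETE_STATUS")
                .channel("deleteDocumentInMongo.input")
                .get();
    }

    // Sub-flow: a lambda flow bean named "deleteDocumentInMongo" gets an
    // implicit input channel named "deleteDocumentInMongo.input".
    @Bean
    public IntegrationFlow deleteDocumentInMongo() {
        return flow -> flow
                .handle((payload, headers) -> {
                    // Placeholder (assumption): the real code would call
                    // mongoTemplate.remove(...) here.
                    System.out.println("Deleting from MongoDB: " + payload);
                    return null; // no reply, so the flow ends here
                });
    }
}
```

The point of the sketch is that the two flows are connected only by the channel name; neither bean calls the other at runtime.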
If you want to do the same operation with Elastic, you should think about having a PublishSubscribeChannel: send the message to it and have two flows start from that channel.
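One way this could look, again as a sketch against the Spring Integration 5.x DSL rather than a drop-in solution. The class name, the channel bean name deleteDocumentPubSub, and the two subscriber flow names are hypothetical, and both handler bodies are placeholders for the real MongoDB and Elasticsearch calls:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;

@Configuration
public class DeleteDocumentPubSubConfig { // hypothetical class name

    // Hypothetical channel bean: every subscriber receives each message.
    @Bean
    public MessageChannel deleteDocumentPubSub() {
        return new PublishSubscribeChannel();
    }

    // Main flow: publish the delete request to the pub-sub channel.
    @Bean
    public IntegrationFlow deleteDocument() {
        return IntegrationFlows.from("METADATA_DELETE_STATUS")
                .channel("deleteDocumentPubSub")
                .get();
    }

    // First subscriber: placeholder for the MongoDB delete.
    @Bean
    public IntegrationFlow deleteFromMongo() {
        return IntegrationFlows.from("deleteDocumentPubSub")
                .handle((payload, headers) -> {
                    System.out.println("Deleting from MongoDB: " + payload);
                    return null; // one-way: no reply is produced
                })
                .get();
    }

    // Second subscriber: placeholder for the Elasticsearch delete.
    @Bean
    public IntegrationFlow deleteFromElastic() {
        return IntegrationFlows.from("deleteDocumentPubSub")
                .handle((payload, headers) -> {
                    System.out.println("Deleting from Elasticsearch: " + payload);
                    return null; // one-way: no reply is produced
                })
                .get();
    }
}
```

With this layout neither delete depends on a reply from the other; each subscriber gets its own copy of the original message.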
Since you end the deleteDocumentInMongo flow with log(), you can't get any reply back, so your .channel("deleteDocumentInES.input") is not going to be reachable.
Please read more of the docs to understand what pub-sub, request-reply, a service activator, and a flow per se are.
Upvotes: 3