Reputation: 838
I have a Spring Integration flow that is triggered once a day. It pulls all parties from the database and sends each party to an executor channel. The next flow pulls data for each party and processes it in parallel by sending it to a different executor channel. The challenge I'm facing is how to know when this entire process ends. Any ideas on how to achieve this? Here's my pseudo code for the executor channels and integration flows.
@Bean
public IntegrationFlow fileListener() {
return IntegrationFlows.from(Files.inboundAdapter(new
File("pathtofile"))).channel("mychannel").get();
}
@Bean
public IntegrationFlow flowOne() throws ParserConfigurationException {
return IntegrationFlows.from("mychannel").handle("serviceHandlerOne",
"handle").nullChannel();
}
@Bean
public IntegrationFlow parallelFlowOne() throws ParserConfigurationException {
return IntegrationFlows.from("executorChannelOne").handle("parallelServiceHandlerOne",
"handle").nullChannel();
}
@Bean
public IntegrationFlow parallelFlowTwo() throws ParserConfigurationException {
return IntegrationFlows.from("executorChannelTwo").handle("parallelServiceHandlerTwo",
"handle").nullChannel();
}
@Bean
public MessageChannel executorChannelOne() {
return new ExecutorChannel(
Executors.newFixedThreadPool(10));
}
@Bean
public MessageChannel executorChannelTwo() {
return new ExecutorChannel(
Executors.newFixedThreadPool(10));
}
@Component
@Scope("prototype")
public class ServiceHandlerOne{
@Autowired
MessageChannel executorChannelOne;
@ServiceActivator
public Message<?> handle(Message<?> message) {
List<?> rowDatas = repository.findAll("parties");
rowDatas.stream().forEach(data -> {
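// Renamed: a lambda local cannot reuse the name of the enclosing method parameter "message".
Message<?> partyMessage = MessageBuilder.withPayload(data).build();
executorChannelOne.send(partyMessage);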
});
return message;
}
}
@Component
@Scope("prototype")
public class ParallelServiceHandlerOne{
@Autowired
MessageChannel executorChannelTwo;
@ServiceActivator
public Message<?> handle(Message<?> message) {
List<?> rowDatas = repository.findAll("party");
rowDatas.stream().forEach(data -> {
// Renamed: a lambda local cannot reuse the name of the enclosing method parameter "message".
Message<?> rowMessage = MessageBuilder.withPayload(data).build();
executorChannelTwo.send(rowMessage);
});
return message;
}
}
Upvotes: 1
Views: 145
Reputation: 121552
First of all, there is no reason to declare your services as @Scope("prototype"): I don't see any state held in them, so they are stateless and can simply be singletons. Second, since your flows end with nullChannel(), there is no point in returning anything from your service methods. Just make them void and the flow will end there naturally.
Another observation: you call executorChannelOne.send(message) directly in the code of your service method. The same can be achieved by returning the new message from the service method and declaring executorChannelOne as the next .channel() in your flow definition, right after handle("serviceHandlerOne", "handle").
Since it looks like you do that in a loop, consider adding a .split() in between: the handler returns your List<?> rowDatas, and the splitter takes care of iterating over that data and sending each item to executorChannelOne.
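The suggestion above can be sketched roughly as follows. This is only an outline, assuming Spring Integration's Java DSL is on the classpath and that serviceHandlerOne.handle(...) is changed to return the List of parties instead of sending each item itself; the class name PartyFlowConfig is made up for the example, and this single flow would stand in for the separate flowOne and parallelFlowOne definitions.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
public class PartyFlowConfig { // hypothetical class name

    @Bean
    public IntegrationFlow flowOne() {
        return IntegrationFlows.from("mychannel")
                // Assumption: the handler now returns List<?> instead of calling send() in a loop.
                .handle("serviceHandlerOne", "handle")
                // The splitter emits one message per list item.
                .split()
                // Each item is handed off to the executor channel bean from the question.
                .channel("executorChannelOne")
                .handle("parallelServiceHandlerOne", "handle")
                .nullChannel();
    }
}
```

With this shape the service no longer needs the channel injected, which also makes it easier to test on its own.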
Now about your original question.
There is really no easy way to tell that your executors are no longer busy. They may be idle at the moment you check simply because the message for the next task has not reached an executor channel yet.
Typically we recommend using some async synchronizer for your data. The aggregator is a good way to correlate several in-flight messages: it collects a group and does not emit a reply until that group is complete.
The splitter I mentioned above adds sequence details headers by default, so a subsequent aggregator can track a message group easily.
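To make the idea concrete, here is a plain-JDK illustration of that release rule: items carry a correlation id and the expected group size, and the group is only handed back once the last item has arrived. This is not how Spring Integration implements its aggregator; SequenceGroupTracker and its add method are names invented for this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Illustration only: collects items per correlation id and releases a group when it is complete. */
final class SequenceGroupTracker {

    private final Map<String, List<String>> groups = new HashMap<>();

    /**
     * Adds one item to its group. Returns the full group once sequenceSize items
     * have arrived for that correlation id, otherwise an empty Optional.
     * Synchronized so worker threads from different executors can call it safely.
     */
    synchronized Optional<List<String>> add(String correlationId, int sequenceSize, String item) {
        List<String> group = groups.computeIfAbsent(correlationId, id -> new ArrayList<>());
        group.add(item);
        if (group.size() < sequenceSize) {
            return Optional.empty(); // still waiting for the rest of the group
        }
        groups.remove(correlationId); // group is complete, stop tracking it
        return Optional.of(group);
    }
}
```

The point is that completion is decided by the data itself (how many items were announced versus how many have arrived), not by asking a thread pool whether it happens to be idle.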
Since you have layers in your flow, it looks like you need several aggregators: two for your executor channels after splitting, and one top-level aggregator for the file. Those two reply to the top-level one for the final, per-file grouping.
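A rough sketch of the two split levels with their matching aggregators is below; the per-file aggregator would sit on top of this in the same way. It assumes Spring Integration's Java DSL, that the first two handlers return a List, and that parallelServiceHandlerTwo.handle(...) returns some result per item (a void method would produce nothing to aggregate). The class name LayeredFlowConfig and the final println handler are placeholders.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
public class LayeredFlowConfig { // hypothetical class name

    @Bean
    public IntegrationFlow layeredFlow() {
        return IntegrationFlows.from("mychannel")
                .handle("serviceHandlerOne", "handle")         // assumed to return all parties as a List
                .split()                                       // outer sequence: one message per party
                .channel("executorChannelOne")
                .handle("parallelServiceHandlerOne", "handle") // assumed to return a List per party
                .split()                                       // inner sequence: one message per row
                .channel("executorChannelTwo")
                .handle("parallelServiceHandlerTwo", "handle") // assumed to return a result per row
                .aggregate()                                   // completes when all rows of one party are done
                .aggregate()                                   // completes when all parties are done
                // Placeholder for whatever should run when everything has finished.
                .handle(m -> System.out.println("All parties processed: " + m.getPayload()))
                .get();
    }
}
```

One caveat to keep in mind: if a handler returns an empty list, the splitter has nothing to emit, so the matching aggregator never sees a group for it. You would need to handle that case separately, for example by filtering empty results before the split.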
You may also think about making those parties and party calls in parallel using a PublishSubscribeChannel, which can also be configured with applySequence=true. The top-level aggregator then uses this sequence info for the per-file grouping.
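A possible shape for that variant, again only as a sketch with the Java DSL: the bean names partiesHandler and partyHandler, the channel name perFileResults, and the class name are all made up here, and both handlers are assumed to return a result.

```java
import java.util.concurrent.Executors;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
public class ParallelCallsConfig { // hypothetical class name

    @Bean
    public IntegrationFlow fanOut() {
        return IntegrationFlows.from("mychannel")
                .publishSubscribeChannel(Executors.newFixedThreadPool(2), s -> s
                        // Adds correlation and sequence headers, one sequence entry per subscriber.
                        .applySequence(true)
                        .subscribe(f -> f
                                .handle("partiesHandler", "handle") // hypothetical bean
                                .channel("perFileResults"))
                        .subscribe(f -> f
                                .handle("partyHandler", "handle")   // hypothetical bean
                                .channel("perFileResults")))
                .get();
    }

    @Bean
    public IntegrationFlow perFileAggregation() {
        return IntegrationFlows.from("perFileResults")
                // Default correlation and release rely on the sequence headers set above.
                .aggregate()
                .handle(m -> System.out.println("File processed: " + m.getPayload()))
                .get();
    }
}
```

Each subscriber sub-flow can contain its own split and aggregate steps, as long as it ends up sending a single reply per file to the shared results channel.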
See more in docs:
https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#splitter
Upvotes: 1