Reputation: 365
I have a flow that:
1. Starts with a config map: MainGateway.start(configMap) -> void
2. Splits the map into one message per entry
3. For every config entry, does the following using an orchestrator Java class:
   BEGIN LOOP (offset and limit)
       Data d = HTTPGateway.getData();
       PublishGateway.sendMessage(d); // sends to 2 SQS queues
   END LOOP
Requirement: I have to schedule this flow via cron. One option is to expose an HTTP endpoint that starts the flow. But then a second HTTP request should wait, time out, or fail until the first run is complete.
Question: I was looking at a barrier to block the flow thread until the flow completes, combined with a single-threaded HTTP processor, so that only one request is processed at a time and I can tell when the flow is complete (that is, the loop has ended for all config entries and all messages to SQS are acked). How can I achieve this? I have a loop, and I am using a pub-sub channel with executors for parallel configs and parallel SQS dispatch.
I have trimmed down the XML config below for clarity.
<!-- Bring in list of Configs to process -->
<int:gateway service-interface="Gateway"
             default-request-channel="configListChannel" />

<int:chain input-channel="configListChannel" output-channel="configChannel">
    <!-- Split the list to one instance of config per message -->
    <int:splitter/>
    <int:filter expression="payload.enablePolling" />
</int:chain>

<!-- Manually orchestrate a loop to query a system as per config and publish messages to SQS -->
<bean class="Orchestrator" id="orchestrator" />
<int:service-activator ref="orchestrator" method="getData" input-channel="configChannel" />

<!-- The flow from this point onwards is triggered inside a loop controlled by the Orchestrator
     The following Gateway calls are inside Orchestrators loop -->

<!-- Create a Http request from the Orchestrator using a Gateway -->
<int:gateway service-interface="HttpGateway">
    <int:method name="getData"
                request-channel="requestChannel"
                payload-expression="#args[0]">
    </int:method>
</int:gateway>

<!-- Transform request object to json and invoke Http endpoint -->
<int:chain input-channel="requestChannel" id="httpRequestChain">
    <int:object-to-json-transformer />
    <int-http:outbound-gateway url-expression="headers['config'].url"
                               http-method="POST"
                               expected-response-type="java.lang.String"
    />
</int:chain>

<!-- Publish Messages to Outbound Gateway -->
<task:executor id="executor" pool-size="5" />
<int:publish-subscribe-channel id="publishChannel" task-executor="executor" />
<int:gateway service-interface="PublishGateway" >
    <int:method name="publishToOutbound" payload-expression="#args[0]" request-channel="publishChannel" />
</int:gateway>

<!-- Route to System A SQS with transformations (omitted here)-->
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS" channel="publishChannel" queue="system-a-queue" success-channel="successChannel" failure-channel="errorChannel"/>

<!-- Route to System B SQS with transformations (omitted here)-->
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS" channel="publishChannel" queue="system-b-queue" success-channel="successChannel" failure-channel="errorChannel"/>

<int:logging-channel-adapter logger-name="sqsCallbackLogger" log-full-message="true" channel="successChannel" />
In the meantime, I am trying to adapt the A B C barrier example from spring-integration-samples to my use case.
Upvotes: 1
Views: 397
Reputation: 121427
As you pointed out in your comment, an aggregator approach could be used in your solution.
This way you aggregate the results of those parallel SQS requests and wait for the aggregation reply in the original requestor. The requestor is then really blocked, even if the internals of your flow are still concurrent: you call a gateway, and the reply for it comes from the aggregator.
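A minimal sketch of that idea, reusing the channel and bean names from the question. It rests on several assumptions that the question does not show: the `Gateway` method is changed to return a value instead of `void`; `Orchestrator.getData` returns a non-null result only once its loop has finished and the SQS sends for that config have been confirmed; and `resultChannel` and the timeout value are hypothetical, chosen only for illustration.

```xml
<!-- Assumption: the Gateway method now returns a value (e.g. a List of results),
     so the calling thread blocks until the reply arrives or the timeout expires. -->
<int:gateway service-interface="Gateway"
             default-request-channel="configListChannel"
             default-reply-timeout="600000" />

<int:chain input-channel="configListChannel" output-channel="configChannel">
    <!-- The splitter adds correlationId, sequenceNumber and sequenceSize headers -->
    <int:splitter />
    <!-- Discarded entries are sent to the aggregator too, so the group can still complete -->
    <int:filter expression="payload.enablePolling" discard-channel="resultChannel" />
</int:chain>

<!-- Assumption: getData returns a non-null result when the loop for this config is done -->
<int:service-activator ref="orchestrator" method="getData"
                       input-channel="configChannel" output-channel="resultChannel" />

<!-- Hypothetical channel that collects one result per config entry -->
<int:channel id="resultChannel" />

<!-- No output-channel: the aggregated list goes to the gateway's replyChannel header -->
<int:aggregator input-channel="resultChannel" />
```

With the default correlation and release strategies, the aggregator releases a group once it has received `sequenceSize` messages for the same `correlationId`. That is why the sketch routes filtered-out entries to the aggregator instead of dropping them; otherwise the group would stay incomplete and the caller would only return on timeout.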
Upvotes: 1