I need to implement an integration flow made of multiple steps, where each step can be performed by a variable number of processors (plugins).
What I have so far:
<!-- gateway -->
<int:gateway default-request-channel="step/1" service-interface="ServiceGateway">
    <int:method name="send" />
</int:gateway>
<!-- plugin 1 -->
<int:publish-subscribe-channel id="step/1" apply-sequence="true" />
<int:service-activator input-channel="step/1" output-channel="step/2">
    <bean class="Transformer" />
</int:service-activator>
<int:service-activator input-channel="step/1" output-channel="step/2">
    <bean class="Transformer2" />
</int:service-activator>
<!-- plugin 2 -->
<int:publish-subscribe-channel id="step/2" apply-sequence="true" />
<int:service-activator input-channel="step/2" output-channel="end">
    <bean class="Transformer3" />
</int:service-activator>
<int:service-activator input-channel="step/2" output-channel="end">
    <bean class="HttpTransformer4" />
</int:service-activator>
<!-- aggregation -->
<int:channel id="end" />
<int:aggregator input-channel="end" />
Everything runs without errors, but the result is not what I expect: the aggregator should return 4 items, and I receive only 2 (random) ones.
I suppose the problem is that the aggregator releases the group after only two items because "apply-sequence" on the "step/2" channel overwrites the sequence headers set by "apply-sequence" on "step/1". So the question is: how can I make the aggregator wait for all four messages?
Thank you in advance.
Custom Release Strategy:
@SuppressWarnings("unchecked")
@Override
public boolean canRelease ( MessageGroup group ) {
    MessageHeaders headers = group.getOne ().getHeaders ();
    List<List<Object>> sequenceDetails = (List<List<Object>>) headers.get ( "sequenceDetails" );
    System.out.println ( sequenceDetails );
    int expectedSize = 1;
    // each item is [correlationId, sequenceNumber, sequenceSize];
    // only continue when sequenceNumber == sequenceSize for every item
    for ( List<Object> sequenceItem : sequenceDetails ) {
        // equals() rather than !=, which would compare boxed Integer references
        if ( !sequenceItem.get ( 1 ).equals ( sequenceItem.get ( 2 ) ) ) {
            System.err.println ( "--> AGG: no release check, group max not reached" );
            return false;
        }
        expectedSize *= (int) sequenceItem.get ( 2 );//multiplies the group sizes
    }
    int expectedSize2 = expectedSize * (int) headers.get ( "sequenceSize" );
    int currentSize = group.getMessages ().size () * expectedSize;
    System.err.println ( "--> AGG: " + expectedSize2 + " : " + currentSize );
    boolean canRelease = expectedSize2 == currentSize;
    if ( canRelease ) {
        System.out.println ( "ok" );
    }
    return canRelease;
}
Prints out:
[[7099b583-55d4-87d3-4502-993f05bfb388, 1, 2]]
--> AGG: no release check, group max not reached
[[7099b583-55d4-87d3-4502-993f05bfb388, 1, 2]]
--> AGG: no release check, group max not reached
[[7099b583-55d4-87d3-4502-993f05bfb388, 2, 2]]
--> AGG: 4 : 2
[[7099b583-55d4-87d3-4502-993f05bfb388, 2, 2]]
--> AGG: 4 : 4
Aggregation code:
@Aggregator
public Object aggregate ( List<Message<?>> objects ) {
    List<Object> res = new ArrayList<> ();
    for ( Message<?> m : objects ) {
        res.add ( m.getPayload () );
        MessageHeaders headers2 = m.getHeaders ();
        System.out.println ( headers2.get ( "history" ) );
    }
    return res;
}
Prints out:
gateway2,core-channel,(inner bean)#57018165,async/step/1,core-channel,(inner bean)#57018165,async/step/2,core-channel,(inner bean)#57018165,end2
gateway2,core-channel,(inner bean)#57018165,async/step/1,core-channel,(inner bean)#57018165,async/step/2,core-channel,(inner bean)#57018165,end2
[102, 202] --> final result list, expected to be made of 4 items
Reputation: 174779
Use a custom release strategy. The correlation data from the first pubsub is pushed onto a stack in the sequenceDetails header by the second pubsub.
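To make the header stacking concrete, here is a minimal sketch in plain Java that models what two chained apply-sequence channels do to the headers. It is a simplified model written for illustration, not Spring Integration code: the class `SequenceDetailsModel`, its methods, and the message ids `m0`..`m6` are all hypothetical, and messages are represented as plain maps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical model of apply-sequence header handling; NOT Spring code. */
public class SequenceDetailsModel {

    /** Copies the message once per subscriber, stacking any existing sequence headers. */
    public static List<Map<String, Object>> publish(Map<String, Object> msg, int subscribers) {
        List<Map<String, Object>> copies = new ArrayList<>();
        for (int n = 1; n <= subscribers; n++) {
            Map<String, Object> copy = new HashMap<>(msg);
            if (msg.get("correlationId") != null) {
                // Push the previous [correlationId, sequenceNumber, sequenceSize] onto the stack.
                @SuppressWarnings("unchecked")
                List<List<Object>> old = (List<List<Object>>) msg.get("sequenceDetails");
                List<List<Object>> details = old == null ? new ArrayList<>() : new ArrayList<>(old);
                details.add(List.of(msg.get("correlationId"),
                        msg.get("sequenceNumber"), msg.get("sequenceSize")));
                copy.put("sequenceDetails", details);
            }
            // The new correlation id is the id of the message being published.
            copy.put("correlationId", msg.get("id"));
            copy.put("sequenceNumber", n);
            copy.put("sequenceSize", subscribers);
            copies.add(copy);
        }
        return copies;
    }

    /** Models a service activator: the reply keeps the headers but gets a fresh id. */
    public static Map<String, Object> transform(Map<String, Object> msg, String newId) {
        Map<String, Object> out = new HashMap<>(msg);
        out.put("id", newId);
        return out;
    }

    /** Runs a two-step, two-plugin flow and returns the four messages reaching the aggregator. */
    public static List<Map<String, Object>> runFlow() {
        Map<String, Object> original = new HashMap<>();
        original.put("id", "m0");
        List<Map<String, Object>> atEnd = new ArrayList<>();
        int counter = 0;
        for (Map<String, Object> s1 : publish(original, 2)) {
            Map<String, Object> afterStep1 = transform(s1, "m" + (++counter));
            for (Map<String, Object> s2 : publish(afterStep1, 2)) {
                atEnd.add(transform(s2, "m" + (++counter)));
            }
        }
        return atEnd;
    }

    /** Counts messages per correlationId value, the header the default correlation uses. */
    public static Map<Object, Integer> groupSizesByCorrelationId(List<Map<String, Object>> messages) {
        Map<Object, Integer> sizes = new LinkedHashMap<>();
        for (Map<String, Object> m : messages) {
            sizes.merge(m.get("correlationId"), 1, Integer::sum);
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> atEnd = runFlow();
        for (Map<String, Object> m : atEnd) {
            System.out.println(m.get("correlationId") + " " + m.get("sequenceDetails"));
        }
        System.out.println(groupSizesByCorrelationId(atEnd));
    }
}
```

In this model the four messages fall into two groups of two when grouped by `correlationId`, while the original id survives only as the first element of the `sequenceDetails` entry. That is the same shape as the `[[7099b583-..., 1, 2]]` and `[[7099b583-..., 2, 2]]` output in the question.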
EDIT
The problem is there are two groups; you need to correlate on the initial correlationId. Here's a pure SpEL solution; it might be safer to use custom correlation/release strategies to ensure the data is as expected (and use getOne() instead of the iterator)...
<int:aggregator input-channel="end2"
    correlation-strategy-expression=
        "headers['sequenceDetails'][0][0]"
    release-strategy-expression=
        "size() == iterator().next().headers['sequenceSize'] * iterator().next().headers['sequenceDetails'][0][2]" />
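As a rough sketch of the "custom strategies" variant, the same two expressions can be written as plain Java helpers that check the headers before using them. The class `NestedSequenceStrategies` and its method names are hypothetical, and the code covers only the two-level case from the question (one entry in sequenceDetails). It takes the headers as a `Map<String, Object>`, which is an interface `MessageHeaders` implements.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helpers mirroring the SpEL expressions, with explicit header checks. */
public class NestedSequenceStrategies {

    /** Returns the outermost correlation id, i.e. sequenceDetails[0][0]. */
    public static Object correlationKey(Map<String, Object> headers) {
        return firstSequenceEntry(headers).get(0);
    }

    /** Returns sequenceSize * sequenceDetails[0][2], the group size to wait for. */
    public static int expectedGroupSize(Map<String, Object> headers) {
        Object innerSize = headers.get("sequenceSize");
        Object outerSize = firstSequenceEntry(headers).get(2);
        if (!(innerSize instanceof Integer) || !(outerSize instanceof Integer)) {
            throw new IllegalStateException("sequence sizes are missing or not integers");
        }
        return (Integer) innerSize * (Integer) outerSize;
    }

    /** True once the group holds as many messages as both sequences together imply. */
    public static boolean canRelease(int groupSize, Map<String, Object> anyMessageHeaders) {
        return groupSize == expectedGroupSize(anyMessageHeaders);
    }

    /** Validates the header shape and returns the first [correlationId, number, size] entry. */
    private static List<?> firstSequenceEntry(Map<String, Object> headers) {
        Object details = headers.get("sequenceDetails");
        if (!(details instanceof List) || ((List<?>) details).isEmpty()) {
            throw new IllegalStateException("sequenceDetails header is missing or empty");
        }
        Object first = ((List<?>) details).get(0);
        if (!(first instanceof List) || ((List<?>) first).size() < 3) {
            throw new IllegalStateException("unexpected sequenceDetails entry: " + first);
        }
        return (List<?>) first;
    }
}
```

In a real flow these helpers would presumably be called from a `CorrelationStrategy` (passing `message.getHeaders()`) and from a `ReleaseStrategy` (passing `group.size()` and `group.getOne().getHeaders()`), with both beans referenced from the aggregator in place of the two expressions.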