max

Reputation: 793

Spring Integration: multiple steps, multiple channel subscribers

I need to implement an integration flow made of multiple steps, where each step can be performed by a variable number of processors (plugins).

What I have so far:

<!-- gateway -->
<int:gateway default-request-channel="step/1" service-interface="ServiceGateway">
    <int:method name="send" />
</int:gateway>


<!-- plugin 1 -->
<int:publish-subscribe-channel id="step/1" apply-sequence="true" />

<int:service-activator input-channel="step/1" output-channel="step/2">
    <bean class="Transformer" />
</int:service-activator>

<int:service-activator input-channel="step/1" output-channel="step/2">
    <bean class="Transformer2" />
</int:service-activator>


<!-- plugin 2 -->
<int:publish-subscribe-channel id="step/2" apply-sequence="true" />

<int:service-activator input-channel="step/2" output-channel="end">
    <bean class="Transformer3" />
</int:service-activator>

<int:service-activator input-channel="step/2" output-channel="end">
    <bean class="HttpTransformer4" />
</int:service-activator>

<!-- aggregation -->
<int:channel id="end" />
<int:aggregator input-channel="end" />

The expected behaviour is the following:

  1. send the first request through the gateway
  2. the input is processed by 2 "step/1" plugins
  3. each output of the "step/1" plugins is processed by the "step/2" plugins
  4. the aggregator should aggregate 4 items (1 -> 2 -> 4)

Everything runs without errors, but the result is not what I expect: I receive only 2 (seemingly random) items instead of 4.

I suppose the problem is that the aggregator triggers the release after only two items, because the "apply-sequence" on the "step/2" channel overwrites the sequence headers set by "apply-sequence" on "step/1". So the question is: how can I make the aggregator wait for all the messages?

Thank you in advance.

Custom Release Strategy:

@SuppressWarnings("unchecked")
@Override
public boolean canRelease ( MessageGroup group ) {

    MessageHeaders headers = group.getOne ().getHeaders ();
    List<List<Object>> sequenceDetails = (List<List<Object>>) headers.get ( "sequenceDetails" );
    System.out.println ( sequenceDetails );

    int expectedSize = 1;
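    // each entry is [correlationId, sequenceNumber, sequenceSize] of an outer sequence;
    // only continue when that sequence is at its last element (sequenceNumber == sequenceSize)
    for ( List<Object> sequenceItem : sequenceDetails ) {
        // compare the boxed values with equals(); != would compare object references
        if ( !sequenceItem.get ( 1 ).equals ( sequenceItem.get ( 2 ) ) ) {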
            System.err.println ( "--> AGG: no release check, group max not reached" );
            return false;
        }
        expectedSize *= (int) sequenceItem.get ( 2 );//multiplies the group sizes
    }

    int expectedSize2 = expectedSize * (int) headers.get ( "sequenceSize" );

    int currentSize = group.getMessages ().size () * expectedSize;
    System.err.println ( "--> AGG: " + expectedSize2 + " : " + currentSize );
    boolean canRelease = expectedSize2 == currentSize;
    if ( canRelease ) {
        System.out.println ( "ok" );
    }
    return canRelease;
}

Prints out:

[[7099b583-55d4-87d3-4502-993f05bfb388, 1, 2]]

--> AGG: no release check, group max not reached

[[7099b583-55d4-87d3-4502-993f05bfb388, 1, 2]]

--> AGG: no release check, group max not reached

[[7099b583-55d4-87d3-4502-993f05bfb388, 2, 2]]

--> AGG: 4 : 2

[[7099b583-55d4-87d3-4502-993f05bfb388, 2, 2]]

--> AGG: 4 : 4

Aggregation code:

@Aggregator
public Object aggregate ( List<Message<?>> objects ) {

    List<Object> res = new ArrayList<> ();
    for ( Message<?> m : objects ) {
        res.add ( m.getPayload () );
        MessageHeaders headers2 = m.getHeaders ();
        System.out.println ( headers2.get ( "history" ) );
    }

    return res;
}

Prints out:

gateway2,core-channel,(inner bean)#57018165,async/step/1,core-channel,(inner bean)#57018165,async/step/2,core-channel,(inner bean)#57018165,end2

gateway2,core-channel,(inner bean)#57018165,async/step/1,core-channel,(inner bean)#57018165,async/step/2,core-channel,(inner bean)#57018165,end2

[102, 202] --> final result list, expected to be made of 4 items

Upvotes: 1

Views: 962

Answers (1)

Gary Russell

Reputation: 174779

Use a custom release strategy. The correlation data from the first pub-sub channel is pushed onto a stack in the sequenceDetails header by the second pub-sub channel.
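To see what that stacking looks like, here is a minimal plain-Java model of the headers only. It is not the Spring Integration API: the class SequenceHeaderSketch, the method applySequence, and the ids "gateway-msg" and "step1-out-1" are made up for illustration. It assumes that each apply-sequence channel uses the id of the message it publishes as the new correlationId and appends the previous [correlationId, sequenceNumber, sequenceSize] triple to sequenceDetails, which is consistent with the output printed in the question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the sequence headers; NOT the Spring Integration API.
class SequenceHeaderSketch {

    // Models one apply-sequence channel stamping the copy sent to subscriber
    // `number` of `size`. Any sequence already present is pushed onto sequenceDetails.
    static Map<String, Object> applySequence(Map<String, Object> in, String messageId,
                                             int number, int size) {
        Map<String, Object> out = new HashMap<>(in);
        if (in.containsKey("correlationId")) {
            List<List<Object>> details = new ArrayList<>();
            Object previous = in.get("sequenceDetails");
            if (previous != null) {
                @SuppressWarnings("unchecked")
                List<List<Object>> previousDetails = (List<List<Object>>) previous;
                details.addAll(previousDetails);
            }
            // Remember the outer sequence before it is overwritten below.
            details.add(List.of(in.get("correlationId"),
                                in.get("sequenceNumber"),
                                in.get("sequenceSize")));
            out.put("sequenceDetails", details);
        }
        out.put("correlationId", messageId); // assumed: id of the published message
        out.put("sequenceNumber", number);
        out.put("sequenceSize", size);
        return out;
    }

    public static void main(String[] args) {
        // step/1: the gateway message goes to the first of two plugins
        Map<String, Object> afterStep1 = applySequence(new HashMap<>(), "gateway-msg", 1, 2);
        // step/2: that plugin's output goes to the second of two plugins
        Map<String, Object> afterStep2 = applySequence(afterStep1, "step1-out-1", 2, 2);

        System.out.println(afterStep2.get("sequenceDetails")); // [[gateway-msg, 1, 2]]
        System.out.println(afterStep2.get("correlationId"));   // step1-out-1
    }
}
```

Under this model, the two outputs of step/1 arrive at the aggregator with different correlationId values, while the id of the original gateway message survives only in sequenceDetails.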

EDIT

The problem is there are two groups; you need to correlate on the initial correlationId. Here's a pure SpEL solution; it might be safer to use custom correlation/release strategies to ensure the data is as expected (and use getOne() instead of the iterator)...

<int:aggregator input-channel="end2"
        correlation-strategy-expression=
           "headers['sequenceDetails'][0][0]"
        release-strategy-expression=
           "size() == iterator().next().headers['sequenceSize'] * iterator().next().headers['sequenceDetails'][0][2]" />
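As a rough illustration of what those two expressions compute, the sketch below restates them in plain Java over header maps. It is not a Spring Integration CorrelationStrategy or ReleaseStrategy implementation: the class NestedSequenceSketch, its methods, and the sample ids are hypothetical, and it assumes exactly two levels of apply-sequence with two subscribers each, as in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java restatement of the two SpEL expressions; NOT the Spring Integration API.
class NestedSequenceSketch {

    // First entry of sequenceDetails: [correlationId, sequenceNumber, sequenceSize]
    // of the outer (step/1) sequence.
    @SuppressWarnings("unchecked")
    static List<Object> outerSequence(Map<String, Object> headers) {
        List<List<Object>> details = (List<List<Object>>) headers.get("sequenceDetails");
        return details.get(0);
    }

    // Mirrors headers['sequenceDetails'][0][0]: group by the original correlationId.
    static Object correlationKey(Map<String, Object> headers) {
        return outerSequence(headers).get(0);
    }

    // Mirrors size() == sequenceSize * sequenceDetails[0][2].
    static boolean canRelease(List<Map<String, Object>> group) {
        Map<String, Object> first = group.get(0);
        int innerSize = (Integer) first.get("sequenceSize");
        int outerSize = (Integer) outerSequence(first).get(2);
        return group.size() == innerSize * outerSize;
    }

    // Hypothetical helper: headers of a message that has passed both channels.
    static Map<String, Object> headers(String correlationId, int number, int outerNumber) {
        return Map.of(
                "correlationId", correlationId,
                "sequenceNumber", number,
                "sequenceSize", 2,
                "sequenceDetails", List.of(List.of("gateway-msg", outerNumber, 2)));
    }

    public static void main(String[] args) {
        List<Map<String, Object>> group = new ArrayList<>();
        group.add(headers("step1-out-1", 1, 1));
        group.add(headers("step1-out-1", 2, 1));
        System.out.println(canRelease(group)); // false: 2 of 4 messages
        group.add(headers("step1-out-2", 1, 2));
        group.add(headers("step1-out-2", 2, 2));
        System.out.println(canRelease(group)); // true: 4 of 4 messages
        System.out.println(correlationKey(group.get(3))); // gateway-msg
    }
}
```

In a real flow, the same checks would live in custom correlation and release strategy beans, where the header contents can be validated before they are cast.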

Upvotes: 1
