blurfus

Reputation: 14041

Unable to Aggregate Messages, using wrong approach?

I am fairly new to Spring Integration and cannot figure out how to solve the following scenario.

I have two channels, each producing a key piece of data for the final message, which is to be sent to a REST endpoint.

  1. A polling channel produces an OAuth2 token at regular intervals and publishes it (for this and other channels to use)
  2. A file monitoring channel produces the payload (loading its contents from a file)

I need to add the token from channel 1 to the headers of the message from channel 2, as in this diagram:

                     other channels            __ Ch3 (uses token)
                     subscribed to the -->    /
                     publishing channel    --+ -- Ch4 (uses token)
                                          /   \__ Ch5 (uses token)
Ch1 ==> getToken ==> enrich header & publ.
                                          \    Merge
                                           ==> Token &  ==> Send to endpoint
                                          /    Payload
Ch2 ==>  readFile ==>  create new Payload

I tried using an aggregator, but I am not sure what to use as the correlation strategy or the release strategy.

My questions are:

  1. Is an aggregator the way to go? (or maybe there is another approach?)
  2. Any suggestions on what the correlation/release strategy could be?

Any directions would be greatly appreciated.

Here's what I have so far:

<!-- channels -->
<int:channel id="aggregatedMsgOutChannel"/>

<!-- Token messages are published to here -->
<int:publish-subscribe-channel id="tokenInChannel" />

<!-- Message Aggregator -->
<int:aggregator
        id="messageAndOauthTokenAggregator"
        input-channel="tokenInChannel"
        message-store="simpleMessageStore"
        ref="oauthTokenAggregator"
        method="aggregate"
        output-channel="aggregatedMsgOutChannel">
</int:aggregator>

<!-- Define a store for our messages -->
<bean id="simpleMessageStore" class="org.springframework.integration.store.SimpleMessageStore" />

<bean id="oauthTokenAggregator" class="c.s.i.OauthTokenAggregator" />

OauthTokenAggregator.java

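// Imports assume SLF4J and Spring Integration 4.0+ (Message in org.springframework.messaging).
import java.util.Collection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.Aggregator;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;

@Component
public class OauthTokenAggregator {
    private static final Logger log = LoggerFactory.getLogger(OauthTokenAggregator.class);

    @Aggregator
    public Message<?> aggregate(Collection<Message<?>> messages) {
        log.debug("aggregating...");

        //.... Unsure as to how to fill this section

        return new GenericMessage<>("test");
    }
}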

Upvotes: 1

Views: 173

Answers (2)

blurfus

Reputation: 14041

I ended up using an approach suggested by @Artem: generating the token within the file process.

Credit for the main implementation goes to this article on DZone. I am replicating the code here for completeness (full credit to the article's author).

So, basically:

  1. I started the file process in the first chain, checked the payload in with claim-check-in, and put its UUID into a header.

  2. Instead of using this in the middle chain

<int:service-activator expression="new String('different string')"/>

     I fetched a token and added it to my headers.

  3. I continued to the last chain, got my payload back from the claim check, and posted it to the REST endpoint.

    <int:chain input-channel="claim-check-in-channel"
               output-channel="processing-channel">
        <int:claim-check-in message-store="simpleMessageStore"/>
        <int:header-enricher>
            <int:header
                name="#{T(com.l8mdv.sample.ClaimCheckGateway).CLAIM_CHECK_ID}"
                expression="payload"/>
        </int:header-enricher>
    </int:chain>
    
    <int:chain input-channel="processing-channel"
               output-channel="claim-check-out-channel">
        <int:service-activator expression="new String('different string')"/>
    </int:chain>
    
    <int:chain input-channel="claim-check-out-channel">
        <int:transformer
                expression="headers.get('#{T(com.l8mdv.sample.ClaimCheckGateway).CLAIM_CHECK_ID}')"/>
        <int:claim-check-out message-store="simpleMessageStore"
                             remove-message="true"/>
    </int:chain>
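
To make the flow of the three chains easier to follow, here is a rough plain-Java sketch of the claim check idea. It does not use Spring Integration: the class name, the header names, and the in-memory map are assumptions made up for this illustration, standing in for the message store and message headers used in the XML.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java model of the claim check flow; all names here are hypothetical.
final class ClaimCheckSketch {

    // Hypothetical header names, chosen only for this illustration.
    static final String CLAIM_CHECK_ID = "claimCheckId";
    static final String TOKEN_HEADER = "oauthToken";

    // Stand-in for the message store: payloads parked under a generated id.
    private final Map<UUID, Object> store = new ConcurrentHashMap<>();

    // Step 1: park the file payload and carry only its id in the headers.
    Map<String, Object> checkIn(Object payload) {
        UUID id = UUID.randomUUID();
        store.put(id, payload);
        Map<String, Object> headers = new HashMap<>();
        headers.put(CLAIM_CHECK_ID, id);
        return headers;
    }

    // Step 2: add the fetched token to the headers while the payload is parked.
    void addToken(Map<String, Object> headers, String token) {
        headers.put(TOKEN_HEADER, token);
    }

    // Step 3: swap the id back for the payload and drop it from the store,
    // comparable to remove-message="true" in the XML.
    Object checkOut(Map<String, Object> headers) {
        Object id = headers.get(CLAIM_CHECK_ID);
        Object payload = (id == null) ? null : store.remove(id);
        if (payload == null) {
            throw new IllegalStateException("No payload stored for id " + id);
        }
        return payload;
    }
}
```

Calling `checkIn`, then `addToken`, then `checkOut` mirrors the first, middle, and last chain: the headers end up holding the token, and the original payload is available again for the REST call.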
    

Upvotes: 0

Artem Bilan

Reputation: 121560

I think you are on the right track with the aggregator.

  1. You should select some common property of the generated token and the file content to correlate on. I'm pretty sure one must exist; otherwise, how would you connect them without an aggregator, or without Spring Integration at all?

  2. The release strategy looks pretty simple: a group size of 2, the token and the payload.

  3. The aggregating function can produce a single message with the file content as its payload (taken from one message) and the token placed into its headers (taken from the other message). You can distinguish the two messages by payload type or by some header.
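
A minimal sketch of the three points above, written in plain Java rather than against the Spring Integration API so that it stands on its own. The `Msg` and `Token` classes, the header names, and the idea that both messages carry a shared `correlationKey` header are assumptions for illustration only; in a real flow these roles would be played by the aggregator's correlation strategy, release strategy, and aggregating method.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of correlation, release, and aggregation; not thread-safe.
final class TokenPayloadAggregatorSketch {

    // Hypothetical header names, chosen only for this illustration.
    static final String CORRELATION_KEY = "correlationKey";
    static final String TOKEN_HEADER = "oauthToken";

    // Minimal stand-in for a message: a payload plus headers.
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = new HashMap<>(headers);
        }
    }

    // Hypothetical wrapper so a token message can be recognized by payload type.
    static final class Token {
        final String value;

        Token(String value) {
            this.value = value;
        }
    }

    // Incomplete groups, keyed by the value shared by a token and a file message.
    private final Map<Object, List<Msg>> groups = new HashMap<>();

    // Adds a message to its group. Returns the merged message once the group
    // holds two messages, or null while the group is still incomplete.
    Msg offer(Msg message) {
        Object key = message.headers.get(CORRELATION_KEY);   // correlation strategy
        List<Msg> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(message);
        if (group.size() < 2) {                              // release strategy: size of 2
            return null;
        }
        groups.remove(key);
        return merge(key, group);
    }

    // Aggregating function: the file content stays the payload, the token becomes a header.
    private static Msg merge(Object key, List<Msg> group) {
        Object filePayload = null;
        String token = null;
        for (Msg m : group) {
            if (m.payload instanceof Token) {
                token = ((Token) m.payload).value;
            } else {
                filePayload = m.payload;
            }
        }
        Map<String, Object> headers = new HashMap<>();
        headers.put(CORRELATION_KEY, key);
        headers.put(TOKEN_HEADER, token);
        return new Msg(filePayload, headers);
    }
}
```

Offering a token message and a file message with the same key, in either order, returns `null` for the first and the merged message for the second. The sketch does not guard against two messages of the same kind sharing a key, which a real flow would need to handle.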

The only problem is that I don't know your business logic, so I can't give you good advice. Maybe you should not use a separate polling channel adapter, but generate the token within the file process?

Or each file could just poll a token from some queue, if tokens are generated separately and can be used somewhere else.

That is a variant without an aggregator. If we stick with the aggregator anyway, we have no choice but to pick some correlation between file and token.
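
The queue-based variant could look roughly like the following, again as a plain-Java sketch using `java.util.concurrent` instead of Spring Integration channels. The class and method names are hypothetical, and the timeout handling is just one possible choice.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-Java model of "each file polls a token from a queue"; names are hypothetical.
final class TokenQueueSketch {

    // Tokens produced by the separate token flow wait here until a file needs one.
    private final BlockingQueue<String> tokens = new LinkedBlockingQueue<>();

    // Called by the token-producing side whenever a new token is generated.
    void publishToken(String token) {
        tokens.offer(token);
    }

    // Called by the file flow: waits up to timeoutMillis for a token and
    // fails if none arrives in time.
    String pollToken(long timeoutMillis) {
        try {
            String token = tokens.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (token == null) {
                throw new IllegalStateException("No token available within " + timeoutMillis + " ms");
            }
            return token;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a token", e);
        }
    }
}
```

Note that a queue hands each token to exactly one consumer. If the same token has to serve several files or other channels, a shared holder for the latest token would fit better than a queue.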

Upvotes: 1
