amar singh

Reputation: 47

How to create a customised release strategy for an aggregator in Spring Integration using the Java DSL?

I have a scenario where I don't want to use the default release strategy for the aggregator. I want to create a customised release strategy instead.

UPDATE: Updated integration flow:

@Bean
public IntegrationFlow fileIntegrationFlow() {
    return IntegrationFlows.from(readFilefromDirectory(), fileInboundPollingConsumer())
            .split(fileSplitter())
            .filter(p -> !(p instanceof FileSplitter.FileMarker),
                    f -> f.discardChannel("passToAggregate"))
            .transform(dataDataTransformer())
            .filter(fileFilter())
            .wireTap(logChannel())
            .channel("passToAggregate")
            .aggregate(a -> a.releaseStrategy(group ->
                    group.getMessages()
                            .stream()
                            .anyMatch(m ->
                                    FileSplitter.FileMarker.Mark.END.name()
                                            .equals(m.getHeaders().get(FileHeaders.MARKER)))))
            .wireTap(logChannel())
            .<List<Customer>>handle((p, h) -> new MonitoringData(p))
            .transform(Transformers.marshaller(getMarshaller(), resultTransformer(), true))
            .handle(fileWriter())
            .get();
}

Now that I have removed the correlation strategy, I can see the Customer objects in the logs:

[FileMarker [filePath=D:\example_tex\in\dateeea.txt, mark=START], com.springintegration.example.Customer@470aa1cb, com.springintegration.example.Customer@7c898288, FileMarker [filePath=D:\example_tex\in\dateeea.txt, mark=END, lineCount=6]]

Next I will work on an output processor to filter the data.
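A minimal plain-Java sketch of that filtering step, assuming the released group has the shape shown in the log above (a marker at each end, Customer objects in between). FileMarker and Customer here are hypothetical stand-ins for the real classes, and customersOnly is an illustrative helper, not part of Spring Integration:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in, NOT the Spring Integration API.
class MarkerFilterSketch {

    // Hypothetical placeholder for FileSplitter.FileMarker.
    static final class FileMarker {
        final String mark;
        FileMarker(String mark) { this.mark = mark; }
    }

    // Hypothetical placeholder for the Customer class from the question.
    static final class Customer {
        final String name;
        Customer(String name) { this.name = name; }
    }

    // Keeps only the Customer payloads of a released group, in arrival order.
    static List<Customer> customersOnly(List<Object> payloads) {
        List<Customer> customers = new ArrayList<>();
        for (Object payload : payloads) {
            if (payload instanceof Customer) {
                customers.add((Customer) payload);
            }
        }
        return customers;
    }

    public static void main(String[] args) {
        List<Object> released = new ArrayList<>();
        released.add(new FileMarker("START"));
        released.add(new Customer("first"));
        released.add(new Customer("second"));
        released.add(new FileMarker("END"));
        System.out.println("customers kept: " + customersOnly(released).size());
    }
}
```

In the real flow the same idea could live in the aggregator's output processor, so that the downstream handle(...) step receives only Customer objects.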

Upvotes: 1

Views: 2398

Answers (1)

Artem Bilan

Reputation: 121292

First of all, you should be familiar with all the Aggregator options and strategies. For this purpose, take a look at the Reference Manual.

After that, you can just follow the auto-completion on the AggregatorSpec when configuring your .aggregate(). A custom release strategy can be supplied via these methods:

/**
 * Configure the handler with an {@link ExpressionEvaluatingReleaseStrategy} for the
 * given expression.
 * @param releaseExpression the correlation expression.
 * @return the handler spec.
 * @see AbstractCorrelatingMessageHandler#setReleaseStrategy(ReleaseStrategy)
 */
public S releaseExpression(String releaseExpression) {
...
/**
 * Configure the handler with an
 * {@link org.springframework.integration.aggregator.MethodInvokingReleaseStrategy}
 * for the target object and method name.
 * @param target the target object.
 * @param methodName the method name.
 * @return the handler spec.
 * @see AbstractCorrelatingMessageHandler#setReleaseStrategy(ReleaseStrategy)
 */
public S releaseStrategy(Object target, String methodName) {
...
/**
 * @param releaseStrategy the release strategy.
 * @return the handler spec.
 * @see AbstractCorrelatingMessageHandler#setReleaseStrategy(ReleaseStrategy)
 */
public S releaseStrategy(ReleaseStrategy releaseStrategy) {
...

For your use case with the FileSplitter, where the group should be released once the Mark.END marker has arrived, I'd suggest this implementation:

.releaseStrategy(group ->
        group.getMessages()
                .stream()
                .anyMatch(m ->
                        FileSplitter.FileMarker.Mark.END.name()
                                .equals(m.getHeaders().get(FileHeaders.MARKER))))
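
To see what this predicate does without a running flow, here is a minimal plain-Java sketch. It is not the Spring Integration API: each message is reduced to its header map, MarkerReleaseSketch, headers and canRelease are hypothetical names, and the header key "file_marker" is assumed to be the value of FileHeaders.MARKER.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in, NOT the Spring Integration API.
class MarkerReleaseSketch {

    // Assumption: this is the string behind FileHeaders.MARKER.
    static final String MARKER_HEADER = "file_marker";

    // Builds the headers of one message; a null marker means an ordinary line.
    static Map<String, Object> headers(String marker) {
        Map<String, Object> h = new HashMap<>();
        if (marker != null) {
            h.put(MARKER_HEADER, marker);
        }
        return h;
    }

    // Mirrors the lambda above: the group may be released as soon as
    // any of its messages carries the END marker header.
    static boolean canRelease(List<Map<String, Object>> groupHeaders) {
        return groupHeaders.stream()
                .anyMatch(h -> "END".equals(h.get(MARKER_HEADER)));
    }

    public static void main(String[] args) {
        List<Map<String, Object>> group = new ArrayList<>();
        group.add(headers("START"));
        group.add(headers(null));
        System.out.println("before END: " + canRelease(group));

        group.add(headers("END"));
        System.out.println("after END: " + canRelease(group));
    }
}
```

The predicate is re-evaluated each time a message joins the group, so it stays false while only the START marker and data lines have arrived and turns true once the END marker is present.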

Upvotes: 2
