How to customize message aggregation logic in Spring Integration Java DSL

In an integration flow, a split with its default strategy emits one message per item in the list. Processing of an item may fail. I want to handle that error and send a new message, carrying the headers of the failed message plus a custom error header, to the normal message channel.

In the aggregator I want to customize the aggregation logic so that it produces a different type of message, containing the number of failed tasks and the results of the messages that did not fail.

This is how I send the error message with the header:

@Bean
public IntegrationFlow socialMediaErrorFlow() {
    return IntegrationFlows.from("socialMediaErrorChannel")
            .wireTap(sf -> sf.handle("errorService", "handleException"))
            .<MessagingException>handle((p, h) ->
                    MessageBuilder.withPayload(Collections.<CommentEntity>emptyList())
                            .copyHeaders(p.getFailedMessage().getHeaders())
                            .setHeader("ERROR", true)
                            .build())
            .channel("directChannel_1")
            .get();
}

I want the aggregator to generate an object of this type:

public class Result {

     private Integer totalTask;
     private Integer taskFailed;
     private List<CommentEntity> comments;

}

How should I approach this?

Thanks in advance.

Thanks to Artem's help, I ended up with this implementation:

.aggregate(a -> a.outputProcessor(new MessageGroupProcessor() {

    @Override
    public Object processMessageGroup(MessageGroup mg) {
        Integer failedTaskCount = 0;
        Integer totalTaskCount = mg.getMessages().size();
        List<CommentEntity> comments = new ArrayList<>();
        for (Message<?> message : mg.getMessages()) {
            if (message.getHeaders().containsKey("ERROR")) {
                // Message produced by socialMediaErrorFlow: count it as a failure
                failedTaskCount++;
            }
            else {
                // Successful item: collect its comments
                comments.addAll((List<CommentEntity>) message.getPayload());
            }
        }
        return new IterationResult(totalTaskCount, failedTaskCount, comments);
    }

}))
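
One way to keep this logic testable without a Spring context is to pull the counting into a plain function. The following is only a sketch under stated assumptions: SimpleMessage is a hypothetical stand-in for Spring's Message (headers plus payload), String stands in for CommentEntity, and AggregatedResult merely mirrors the fields of the Result class above. None of these names come from Spring Integration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Spring Message: a header map plus a list payload.
final class SimpleMessage {
    final Map<String, Object> headers;
    final List<String> payload; // String stands in for CommentEntity

    SimpleMessage(Map<String, Object> headers, List<String> payload) {
        this.headers = headers;
        this.payload = payload;
    }
}

// Hypothetical type mirroring the fields of the Result class in the question.
final class AggregatedResult {
    final int totalTask;
    final int taskFailed;
    final List<String> comments;

    AggregatedResult(int totalTask, int taskFailed, List<String> comments) {
        this.totalTask = totalTask;
        this.taskFailed = taskFailed;
        this.comments = comments;
    }
}

final class ErrorAwareAggregation {

    private ErrorAwareAggregation() {
    }

    // Counts messages carrying the "ERROR" header and merges the payloads of the rest.
    static AggregatedResult aggregate(List<SimpleMessage> group) {
        int failed = 0;
        List<String> comments = new ArrayList<>();
        for (SimpleMessage message : group) {
            if (message.headers.containsKey("ERROR")) {
                failed++;
            }
            else {
                comments.addAll(message.payload);
            }
        }
        return new AggregatedResult(group.size(), failed, comments);
    }
}
```

Inside the flow, the MessageGroupProcessor would still iterate over mg.getMessages() as in the implementation above; the sketch only isolates the counting so it can be checked with plain unit tests.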

Upvotes: 1

Views: 2657

Answers (1)

Artem Bilan

The AggregatorSpec has an outputProcessor property:

/**
 * A processor to determine the output message from the released group. Defaults to a message
 * with a payload that is a collection of payloads from the input messages.
 * @param outputProcessor the processor.
 * @return the aggregator spec.
 */
public AggregatorSpec outputProcessor(MessageGroupProcessor outputProcessor) {

Here you can provide your own custom logic to process all the messages in the group and build your Result from them.

The sample from the test-case:

.aggregate(a -> a.outputProcessor(g -> g.getMessages()
                        .stream()
                        .map(m -> (String) m.getPayload())
                        .collect(Collectors.joining(" "))))

The Cafe Demo sample:

.aggregate(aggregator -> aggregator
        .outputProcessor(g ->
                new Delivery(g.getMessages()
                        .stream()
                        .map(message -> (Drink) message.getPayload())
                        .collect(Collectors.toList())))
        .correlationStrategy(m -> ((Drink) m.getPayload()).getOrderNumber()))

Upvotes: 3
