Reputation: 1824
In an integration flow, a splitter with its default strategy emits one message per item in the list. Processing of an item may fail. I want to handle that error and send a new message, carrying the headers (the mapping information) of the failed message plus a custom error header, to the normal messaging channel.
In the aggregator, I want to customize the aggregation logic so that it produces a different type of message, containing the number of failed tasks and the results of the messages that did not fail.
This is how I send the error message with the header:
    @Bean
    public IntegrationFlow socialMediaErrorFlow() {
        return IntegrationFlows.from("socialMediaErrorChannel")
                .wireTap(sf -> sf.handle("errorService", "handleException"))
                .<MessagingException>handle((p, h) ->
                        MessageBuilder.withPayload(Collections.<CommentEntity>emptyList())
                                .copyHeaders(p.getFailedMessage().getHeaders())
                                .setHeader("ERROR", true)
                                .build())
                .channel("directChannel_1")
                .get();
    }
I want the aggregator to generate an object of this type:
    public class Result {
        private Integer totalTask;
        private Integer taskFailed;
        private List<CommentEntity> comments;
    }
How should I approach this?
Thanks in advance.
Thanks to Artem's help, I arrived at this implementation:
    .aggregate(a -> a.outputProcessor(new MessageGroupProcessor() {
        @Override
        public Object processMessageGroup(MessageGroup mg) {
            Integer failedTaskCount = 0;
            Integer totalTaskCount = mg.getMessages().size();
            List<CommentEntity> comments = new ArrayList<>();
            for (Message<?> message : mg.getMessages()) {
                if (message.getHeaders().containsKey("ERROR")) {
                    failedTaskCount++;
                } else {
                    comments.addAll((List<CommentEntity>) message.getPayload());
                }
            }
            return new IterationResult(totalTaskCount, failedTaskCount, comments);
        }
    }))
Upvotes: 1
Views: 2657
Reputation: 121177
The AggregatorSpec has an outputProcessor property:
    /**
     * A processor to determine the output message from the released group. Defaults to a message
     * with a payload that is a collection of payloads from the input messages.
     * @param outputProcessor the processor.
     * @return the aggregator spec.
     */
    public AggregatorSpec outputProcessor(MessageGroupProcessor outputProcessor) {
Here you can provide your own custom logic to process all the messages in the group and build your Result from them.
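To make the idea concrete without pulling in Spring, here is a minimal sketch of the counting logic such a processor could run over a released group. Everything in it is a stand-in and an assumption: `Msg` is a hypothetical replacement for Spring's `Message`, comments are plain strings instead of `CommentEntity`, and the `"ERROR"` header is checked for the value `true` rather than mere presence, which is a small variation on the check used in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AggregationSketch {

    // Hypothetical stand-in for a Spring Message: a payload plus headers.
    public static final class Msg {
        public final List<String> payload;
        public final Map<String, Object> headers;

        public Msg(List<String> payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Mirrors the Result class from the question, with String comments.
    public static final class Result {
        public final int totalTask;
        public final int taskFailed;
        public final List<String> comments;

        public Result(int totalTask, int taskFailed, List<String> comments) {
            this.totalTask = totalTask;
            this.taskFailed = taskFailed;
            this.comments = comments;
        }
    }

    // Counts messages flagged with ERROR=true and merges the payloads of the rest.
    public static Result aggregate(List<Msg> group) {
        int failed = 0;
        List<String> comments = new ArrayList<>();
        for (Msg m : group) {
            if (Boolean.TRUE.equals(m.headers.get("ERROR"))) {
                failed++;
            } else {
                comments.addAll(m.payload);
            }
        }
        return new Result(group.size(), failed, comments);
    }

    public static void main(String[] args) {
        Map<String, Object> errorHeaders = new HashMap<>();
        errorHeaders.put("ERROR", true);

        List<Msg> group = Arrays.asList(
                new Msg(Arrays.asList("a", "b"), new HashMap<>()),
                new Msg(Collections.<String>emptyList(), errorHeaders),
                new Msg(Arrays.asList("c"), new HashMap<>()));

        Result r = aggregate(group);
        // prints: total=3 failed=1 comments=[a, b, c]
        System.out.println("total=" + r.totalTask
                + " failed=" + r.taskFailed
                + " comments=" + r.comments);
    }
}
```

In a real flow, the body of `aggregate` would sit inside the `MessageGroupProcessor` passed to `outputProcessor`, iterating over the group's messages instead of a plain list.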
The sample from the test-case:
    .aggregate(a -> a.outputProcessor(g -> g.getMessages()
            .stream()
            .map(m -> (String) m.getPayload())
            .collect(Collectors.joining(" "))))
The Cafe Demo sample:
    .aggregate(aggregator -> aggregator
            .outputProcessor(g ->
                    new Delivery(g.getMessages()
                            .stream()
                            .map(message -> (Drink) message.getPayload())
                            .collect(Collectors.toList())))
            .correlationStrategy(m -> ((Drink) m.getPayload()).getOrderNumber()))
Upvotes: 3