Reputation: 3152
With Java 8 streams, I can group messages by a classifier:
Map<String, List<String>> grouped = Arrays.asList("a", "b", "b", "b", "c")
    .stream()
    .collect(Collectors.groupingBy(Function.identity()));
I want to write an aggregator that groups messages correspondingly. Out of five messages with the payloads shown above, I want to produce three messages: the first should have "a" as payload, the second should have a list of three "b" as payload, and the third should have "c" as payload.
All message groups should be released when the sequence size has been reached. Grouping based on the payload works fine, but the message groups never get released.
In the release strategy I have access to the sequence size, but I cannot find out the total number of items that have been processed. How can I release my grouped messages?
public interface StringGrouper {
    List<Message<?>> groupSame(List<String> toGroup);
}
@Bean
public IntegrationFlow groupStringsFlow() {
    return IntegrationFlows.from(StringGrouper.class)
        .split()
        .aggregate(agg -> agg
            .correlationStrategy(message -> message.getPayload())
            .releaseStrategy(group -> group.getSequenceSize() == /* what? */))
        .logAndReply();
}
@Test
public void shouldGroupMessages() {
    List<Message<?>> grouped = grouper
        .groupSame(Arrays.asList("a", "b", "b", "b", "c"));
}
A workaround is to skip the grouping aggregator and group the incoming list in a transformer instead. But I would expect to be able to use an aggregator for this.
@Bean
public IntegrationFlow groupStringsFlow() {
    return IntegrationFlows.from(StringGrouper.class)
        .<List<String>, Collection<List<String>>>transform(source -> source.stream()
            .collect(Collectors.collectingAndThen(
                Collectors.groupingBy(Function.identity()),
                grouped -> grouped.values())))
        .split()
        .log() // work with messages
        .aggregate()
        .get();
}
Upvotes: 0
Views: 296
Reputation: 174749
Aggregate them as a single group with the default sequence size release strategy and use a custom output processor (MessageGroupProcessor) to regroup on payload, returning a Collection<Message<List<?>>>.
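As a rough illustration of the regrouping step only, here is a minimal plain-Java sketch. The class name PayloadRegrouper and the method regroup are hypothetical and not part of Spring Integration. In a real flow, this logic would presumably live inside the MessageGroupProcessor passed to the aggregator's outputProcessor, with the payloads taken from the messages in the released group and each resulting list wrapped in its own message. That wiring is left out here so the example runs with the JDK alone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PayloadRegrouper {

    // Hypothetical helper: collects equal payloads into lists,
    // keeping the order in which each distinct payload first appeared.
    public static <T> List<List<T>> regroup(Collection<T> payloads) {
        return new ArrayList<>(payloads.stream()
                .collect(Collectors.groupingBy(
                        Function.identity(),
                        LinkedHashMap::new,   // preserves first-seen order of keys
                        Collectors.toList()))
                .values());
    }

    public static void main(String[] args) {
        // prints [[a], [b, b, b], [c]]
        System.out.println(regroup(Arrays.asList("a", "b", "b", "b", "c")));
    }
}
```

A LinkedHashMap is used so that the output groups follow the order of the incoming payloads. The plain groupingBy from the question makes no ordering guarantee.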
Upvotes: 1