Reputation: 47
I have a scenario where I don't want to use the default release strategy for the aggregator. I want to create a custom aggregator release strategy.
UPDATE: Updated integration flow:
@Bean
public IntegrationFlow fileIntegrationFlow() {
    return IntegrationFlows.from(readFilefromDirectory(), fileInboundPollingConsumer())
            .split(fileSplitter())
            .filter(p -> !(p instanceof FileSplitter.FileMarker),
                    f -> f.discardChannel("passToAggregate"))
            .transform(dataDataTransformer())
            .filter(fileFilter())
            .wireTap(logChannel())
            .channel("passToAggregate")
            .aggregate(a -> a.releaseStrategy(group ->
                    group.getMessages()
                            .stream()
                            .anyMatch(m ->
                                    FileSplitter.FileMarker.Mark.END.name()
                                            .equals(m.getHeaders().get(FileHeaders.MARKER)))))
            .wireTap(logChannel())
            .<List<Customer>>handle((p, h) -> new MonitoringData(p))
            .transform(Transformers.marshaller(getMarshaller(), resultTransformer(), true))
            .handle(fileWriter()).get();
}
Now that I have removed the correlation strategy, I can see the Customer objects in the logs:
[FileMarker [filePath=D:\example_tex\in\dateeea.txt, mark=START], com.springintegration.example.Customer@470aa1cb, com.springintegration.example.Customer@7c898288, FileMarker [filePath=D:\example_tex\in\dateeea.txt, mark=END, lineCount=6]]
Next I will start working on an output processor to filter the data.
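As the log line above shows, the released group still contains the START and END FileMarker payloads next to the Customer objects, so the output processor has to drop them. The following is only a minimal sketch of that filtering step in plain Java, with no Spring dependency. The class name `MarkerFilteringSketch` and the helper `keepOnly` are hypothetical names introduced for illustration; in the real flow the same logic would presumably live inside whatever is passed to the aggregator's output processor.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper class, not part of Spring Integration.
public class MarkerFilteringSketch {

    // Keeps only the payloads that are instances of the wanted type,
    // so marker objects mixed into the aggregated list are dropped.
    static <T> List<T> keepOnly(List<?> payloads, Class<T> type) {
        return payloads.stream()
                .filter(type::isInstance)
                .map(type::cast)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Strings stand in for the FileMarker payloads, Integers for Customer objects.
        List<Object> aggregated = List.of("START", 1, 2, "END");
        List<Integer> customers = keepOnly(aggregated, Integer.class);
        System.out.println(customers.size()); // prints 2
    }
}
```

With the real types, the call would look like `keepOnly(payloads, Customer.class)`, which leaves a list that matches the `List<Customer>` expected by the downstream handler.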
Upvotes: 1
Views: 2398
Reputation: 121292
First of all, you should be familiar with all the Aggregator options and strategies. For this purpose, take a look at the Reference Manual.
After that you can just follow the auto-completion on the AggregatorSpec when configuring your .aggregate(). A custom releaseStrategy can be set via these methods:
/**
 * Configure the handler with an {@link ExpressionEvaluatingReleaseStrategy} for the
 * given expression.
 * @param releaseExpression the correlation expression.
 * @return the handler spec.
 * @see AbstractCorrelatingMessageHandler#setReleaseStrategy(ReleaseStrategy)
 */
public S releaseExpression(String releaseExpression) {
...
/**
 * Configure the handler with an
 * {@link org.springframework.integration.aggregator.MethodInvokingReleaseStrategy}
 * for the target object and method name.
 * @param target the target object.
 * @param methodName the method name.
 * @return the handler spec.
 * @see AbstractCorrelatingMessageHandler#setReleaseStrategy(ReleaseStrategy)
 */
public S releaseStrategy(Object target, String methodName) {
...
/**
 * @param releaseStrategy the release strategy.
 * @return the handler spec.
 * @see AbstractCorrelatingMessageHandler#setReleaseStrategy(ReleaseStrategy)
 */
public S releaseStrategy(ReleaseStrategy releaseStrategy) {
...
For your use-case with the FileSplitter, where the group should be released once the Mark.END marker has arrived, I'd suggest this implementation:
.releaseStrategy(group ->
        group.getMessages()
                .stream()
                .anyMatch(m ->
                        FileSplitter.FileMarker.Mark.END.name()
                                .equals(m.getHeaders().get(FileHeaders.MARKER))))
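To see what that predicate decides without starting a Spring context, here is a rough plain-Java sketch of the same check. It is not the Spring Integration API: `SimpleMessage`, `canRelease` and the `MARKER_HEADER` key are hypothetical stand-ins for `Message`, the release strategy lambda and `FileHeaders.MARKER`, introduced only for illustration.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in types; the real flow uses Spring's Message and MessageGroup.
public class EndMarkerRelease {

    // Assumed stand-in for the FileHeaders.MARKER header name.
    static final String MARKER_HEADER = "file_marker";

    // Minimal message model: a payload plus a header map.
    static final class SimpleMessage {
        final Object payload;
        final Map<String, Object> headers;

        SimpleMessage(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // The group may be released as soon as any message carries the END marker header.
    static boolean canRelease(List<SimpleMessage> group) {
        return group.stream()
                .anyMatch(m -> "END".equals(m.headers.get(MARKER_HEADER)));
    }

    public static void main(String[] args) {
        SimpleMessage start = new SimpleMessage("start", Map.of(MARKER_HEADER, "START"));
        SimpleMessage line = new SimpleMessage("customer line", Map.of());
        SimpleMessage end = new SimpleMessage("end", Map.of(MARKER_HEADER, "END"));

        System.out.println(canRelease(List.of(start, line)));      // prints false
        System.out.println(canRelease(List.of(start, line, end))); // prints true
    }
}
```

The point of the design is that the group size never has to be known in advance: the group stays open while lines keep arriving and is released only when the message with the END marker joins it.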
Upvotes: 2