Reputation: 1556
I'm trying to implement the following scenario:
My idea was to read the files in an alphabetical order and wait for 2 seconds after a group was last modified to release it (g.getLastModified()
is smaller than the current time minus 2 seconds)
I've tried the following without success:
return IntegrationFlows.from(Files.inboundAdapter(tmpDir.getRoot())
.patternFilter("*.json")
.useWatchService(true)
.watchEvents(FileReadingMessageSource.WatchEventType.CREATE,
FileReadingMessageSource.WatchEventType.MODIFY),
e -> e.poller(Pollers.fixedDelay(100)
.errorChannel("filePollingErrorChannel")))
.enrichHeaders(h -> h.headerExpression("CORRELATION_PATTERN", "headers[" + FileHeaders.FILENAME + "].substring(0,7)")) // docxxxx.length()
.aggregate(a -> a.correlationExpression("headers['CORRELATION_PATTERN']")
.releaseStrategy(g -> g.getLastModified() < System.currentTimeMillis() - 2000)) .channel(MessageChannels.queue("fileReadingResultChannel"))
.get();
Changing the release strategy to the following also didn't work:
.aggregate(a -> a.correlationExpression("headers['CORRELATION_PATTERN']")
.releaseStrategy(g -> {
Stream<Message<?>> stream = g.getMessages()
.stream();
Long timestamp = (Long) stream.skip(stream.count() - 1)
.findFirst()
.get()
.getHeaders()
.get(MessageHeaders.TIMESTAMP);
System.out.println("Timestamp: " + timestamp);
return timestamp.longValue() < System.currentTimeMillis() - 2000;
}))
Am I misunderstanding the release strategy concept?
Also, is it possible to print something out from the releaseStrategy block? I wanted to compare the timestamp (see System.out.println("Timestamp: " + timestamp);
)
Upvotes: 0
Views: 1066
Reputation: 121292
Right, since you don't know the whole sequence for message group, you don't have any other choice unless to use a groupTimeout
. The regular releaseStrategy
works only when a message arrives to the aggregator. Since at the point of one message you don't have enough info to release the group, it is going to sit in the group store forever.
The groupTimeout
option has been introduced to the aggregator especially for this kind of use-cases when we definitely would like to release a group without enough messages to group normally.
You may consider to use a groupTimeoutExpression
instead of constant-based groupTimeout
. The MessageGroup
is a root evaluation context object for SpEL, so you will be able to get access to the mentioned lastModified
for it.
The .sendPartialResultOnExpiry(true)
is right option to deal with here.
See more info in the docs: https://docs.spring.io/spring-integration/reference/html/#agg-and-group-to
Upvotes: 1
Reputation: 1556
I found a solution to that with a different approach. I still don't understand why the above one wasn't working.
I've also found a cleaner way of defining the correlation function.
IntegrationFlows.from(Files.inboundAdapter(tmpDir.getRoot())
.patternFilter("*.json")
.useWatchService(true)
.watchEvents(FileReadingMessageSource.WatchEventType.CREATE, FileReadingMessageSource.WatchEventType.MODIFY), e -> e
.poller(Pollers.fixedDelay(100)))
.enrichHeaders(h -> h.headerFunction(IntegrationMessageHeaderAccessor.CORRELATION_ID, m -> ((String) m
.getHeaders()
.get(FileHeaders.FILENAME)).substring(0, 17)))
.aggregate(a -> a.groupTimeout(2000)
.sendPartialResultOnExpiry(true))
.channel(MessageChannels.queue("fileReadingResultChannel"))
.get();
Upvotes: 0