Blink

Reputation: 1556

Spring Integration aggregator's release strategy based on last modified

I'm trying to implement the following scenario:

  1. I get a bunch of files that share a common file name pattern, e.g. doc0001_page0001, doc0001_page0002, doc0001_page0003, doc0002_page0001 (where doc0001 is one document consisting of 3 pages that I need to merge, and doc0002 has only 1 page)
  2. I want to aggregate them so that a group is released only once all of the files for a specific document have been gathered (doc0001 after 3 files were picked up, doc0002 after 1 file)
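To make the intended grouping concrete, here is a minimal plain-Java sketch of the correlation rule, independent of Spring Integration. The class `DocumentGrouping` and its methods are hypothetical names used only for illustration, and the sketch assumes the document key is everything before the first underscore in the file name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Spring Integration.
class DocumentGrouping {

    // Assumption: names look like "doc0001_page0002", so the key is the text before the first '_'.
    static String correlationKey(String fileName) {
        int separator = fileName.indexOf('_');
        return separator < 0 ? fileName : fileName.substring(0, separator);
    }

    // Groups file names by document key, preserving the order in which keys first appear.
    static Map<String, List<String>> groupByDocument(List<String> fileNames) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String name : fileNames) {
            groups.computeIfAbsent(correlationKey(name), k -> new ArrayList<>()).add(name);
        }
        return groups;
    }

    public static void main(String[] args) {
        System.out.println(groupByDocument(List.of(
                "doc0001_page0001", "doc0001_page0002", "doc0001_page0003", "doc0002_page0001")));
    }
}
```

The hard part is not the grouping itself but knowing when a group is complete, since the number of pages per document is not known up front.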

My idea was to read the files in alphabetical order and release a group once it has not been modified for 2 seconds (that is, once g.getLastModified() is earlier than the current time minus 2 seconds).

I've tried the following without success:

return IntegrationFlows.from(Files.inboundAdapter(tmpDir.getRoot())
                                  .patternFilter("*.json")
                                  .useWatchService(true)
                                  .watchEvents(FileReadingMessageSource.WatchEventType.CREATE,
                                          FileReadingMessageSource.WatchEventType.MODIFY),
        e -> e.poller(Pollers.fixedDelay(100)
                             .errorChannel("filePollingErrorChannel")))
                       .enrichHeaders(h -> h.headerExpression("CORRELATION_PATTERN", "headers[" + FileHeaders.FILENAME + "].substring(0,7)")) // docxxxx.length()
                       .aggregate(a -> a.correlationExpression("headers['CORRELATION_PATTERN']")
                                        .releaseStrategy(g -> g.getLastModified() < System.currentTimeMillis() - 2000))
                       .channel(MessageChannels.queue("fileReadingResultChannel"))
                       .get();

Changing the release strategy to the following also didn't work:

.aggregate(a -> a.correlationExpression("headers['CORRELATION_PATTERN']")
                .releaseStrategy(g -> {
                    Stream<Message<?>> stream = g.getMessages()
                                                 .stream();
                    Long timestamp = (Long) stream.skip(stream.count() - 1)
                                                  .findFirst()
                                                  .get()
                                                  .getHeaders()
                                                  .get(MessageHeaders.TIMESTAMP);
                    System.out.println("Timestamp: " + timestamp);
                    return timestamp.longValue() < System.currentTimeMillis() - 2000;

                }))

Am I misunderstanding the release strategy concept?

Also, is it possible to print something out from the releaseStrategy block? I wanted to compare the timestamps (see the System.out.println("Timestamp: " + timestamp) call above).

Upvotes: 0

Views: 1066

Answers (2)

Artem Bilan

Reputation: 121292

Right. Since you don't know the full sequence for a message group in advance, you have no choice but to use a groupTimeout. A regular releaseStrategy is evaluated only when a message arrives at the aggregator. Since no single arriving message gives you enough information to release the group, the group will sit in the group store forever.
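The following toy model illustrates why a "quiet for 2 seconds" check never fires when it is evaluated only on arrival. It is a simplified stand-in, not Spring Integration's implementation: the class `ArrivalOnlyRelease` is hypothetical, time is passed in explicitly, and it assumes the group's last-modified time is updated before the release check runs.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only; a simplified stand-in for an aggregator, not Spring Integration code.
class ArrivalOnlyRelease {

    static final long QUIET_PERIOD_MS = 2000;

    final List<String> group = new ArrayList<>();
    final List<List<String>> released = new ArrayList<>();
    long lastModified;

    // The release check runs only here, right after the group has been modified.
    void onMessage(String payload, long now) {
        group.add(payload);
        lastModified = now; // assumption: adding a message refreshes lastModified first
        if (lastModified < now - QUIET_PERIOD_MS) { // now < now - 2000 is never true
            released.add(new ArrayList<>(group));
            group.clear();
        }
    }

    public static void main(String[] args) {
        ArrivalOnlyRelease aggregator = new ArrivalOnlyRelease();
        aggregator.onMessage("doc0001_page0001", 0);
        aggregator.onMessage("doc0001_page0002", 100);
        aggregator.onMessage("doc0001_page0003", 200);
        // No further message arrives, so nothing ever re-evaluates the check.
        System.out.println("released=" + aggregator.released + " pending=" + aggregator.group);
    }
}
```

In this model the check compares a timestamp that was just refreshed against the current time, so it cannot succeed no matter how long the gap between messages is.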

The groupTimeout option was introduced to the aggregator specifically for this kind of use case, where we want to release a group even though it does not have enough messages to be released normally.
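As a rough sketch of the timeout idea, the toy model below restarts a per-group countdown on every arrival and releases whatever has been collected once the countdown expires. The class `TimeoutRelease`, its `tick` method (a stand-in for a scheduler), and the explicit clock values are all hypothetical; the real aggregator schedules this work itself and may differ in detail.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only; illustrates a per-group timeout, not Spring Integration's implementation.
class TimeoutRelease {

    static final long GROUP_TIMEOUT_MS = 2000;

    final Map<String, List<String>> groups = new LinkedHashMap<>();
    final Map<String, Long> deadlines = new LinkedHashMap<>();
    final List<List<String>> released = new ArrayList<>();

    // Each arrival adds to the group and restarts that group's countdown.
    void onMessage(String key, String payload, long now) {
        groups.computeIfAbsent(key, k -> new ArrayList<>()).add(payload);
        deadlines.put(key, now + GROUP_TIMEOUT_MS);
    }

    // Stand-in for a scheduler: releases every group whose deadline has passed.
    void tick(long now) {
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (now >= entry.getValue()) {
                released.add(groups.remove(entry.getKey())); // partial result on expiry
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        TimeoutRelease aggregator = new TimeoutRelease();
        aggregator.onMessage("doc0001", "doc0001_page0001", 0);
        aggregator.onMessage("doc0001", "doc0001_page0002", 100);
        aggregator.onMessage("doc0001", "doc0001_page0003", 200);
        aggregator.tick(2100); // too early: the last arrival moved the deadline to 2200
        aggregator.tick(2200); // the group has been quiet for 2 seconds and is released
        System.out.println(aggregator.released);
    }
}
```

The difference from a release check on arrival is that something other than an incoming message triggers the release, which is what lets the last group of a sequence complete.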

Consider using a groupTimeoutExpression instead of a constant groupTimeout. The MessageGroup is the root object of the SpEL evaluation context, so you can access the lastModified property you mentioned.

The .sendPartialResultOnExpiry(true) option is the right one to use here.

See more info in the docs: https://docs.spring.io/spring-integration/reference/html/#agg-and-group-to

Upvotes: 1

Blink

Reputation: 1556

I found a solution using a different approach. I still don't understand why the one above wasn't working.

I've also found a cleaner way of defining the correlation function.

IntegrationFlows.from(Files.inboundAdapter(tmpDir.getRoot())
                           .patternFilter("*.json")
                           .useWatchService(true)
                           .watchEvents(FileReadingMessageSource.WatchEventType.CREATE,
                                        FileReadingMessageSource.WatchEventType.MODIFY),
                      e -> e.poller(Pollers.fixedDelay(100)))
                .enrichHeaders(h -> h.headerFunction(IntegrationMessageHeaderAccessor.CORRELATION_ID,
                        m -> ((String) m.getHeaders().get(FileHeaders.FILENAME)).substring(0, 17)))
                .aggregate(a -> a.groupTimeout(2000)
                                 .sendPartialResultOnExpiry(true))
                .channel(MessageChannels.queue("fileReadingResultChannel"))
                .get();

Upvotes: 0
