Reputation: 79
I am trying to implement a LastModifiedFileListFilter for SFTP, since it looks like spring-integration-sftp has no such filter as of the 5.3.2 release. I copied the LastModifiedFileListFilter from spring-integration-file, but the discard callback isn't working. Here is my implementation:
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    import com.jcraft.jsch.ChannelSftp;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.integration.file.filters.DiscardAwareFileListFilter;
    import org.springframework.lang.Nullable;

    @Slf4j
    @Data
    public class LastModifiedLsEntryFileListFilter implements DiscardAwareFileListFilter<ChannelSftp.LsEntry> {

        private static final long ONE_SECOND = 1000;
        private static final long DEFAULT_AGE = 30;

        // Minimum age in seconds before a file is accepted.
        private volatile long age = DEFAULT_AGE;

        @Nullable
        private Consumer<ChannelSftp.LsEntry> discardCallback;

        public LastModifiedLsEntryFileListFilter(final long age) {
            this.age = age;
        }

        @Override
        public List<ChannelSftp.LsEntry> filterFiles(final ChannelSftp.LsEntry[] files) {
            final List<ChannelSftp.LsEntry> list = new ArrayList<>();
            final long now = System.currentTimeMillis() / ONE_SECOND;
            for (final ChannelSftp.LsEntry file : files) {
                if (this.fileIsAged(file, now)) {
                    log.info("File [{}] is aged...", file.getFilename());
                    list.add(file);
                } else if (this.discardCallback != null) {
                    log.info("File [{}] is still being uploaded...", file.getFilename());
                    this.discardCallback.accept(file);
                }
            }
            return list;
        }

        @Override
        public boolean accept(final ChannelSftp.LsEntry file) {
            if (this.fileIsAged(file, System.currentTimeMillis() / ONE_SECOND)) {
                return true;
            } else if (this.discardCallback != null) {
                this.discardCallback.accept(file);
            }
            return false;
        }

        // getMTime() is in seconds since the epoch, so both sides are compared in seconds.
        private boolean fileIsAged(final ChannelSftp.LsEntry file, final long now) {
            return file.getAttrs().getMTime() + this.age <= now;
        }

        @Override
        public void addDiscardCallback(@Nullable final Consumer<ChannelSftp.LsEntry> discardCallbackToSet) {
            this.discardCallback = discardCallbackToSet;
        }
    }
The filter correctly identifies the age of a file and discards it, but the discarded file is not retried, which I believe is the job of the discard callback.
So my question is: how do I set the discard callback so that a discarded file keeps being retried until it ages? Thanks.
Reputation: 121550
"not retried which I believe is part of discard callback."
I wonder what makes you think that.
The fact that FileReadingMessageSource, with its WatchService option, has logic like this:
    if (filter instanceof DiscardAwareFileListFilter) {
        ((DiscardAwareFileListFilter<File>) filter).addDiscardCallback(this.filesToPoll::add);
    }
doesn't mean that the SFTP implementation works the same way.
The retry happens anyway: on the next poll, a file that was not accepted is checked again.
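To make the difference concrete, here is a minimal plain-Java sketch, not Spring Integration code. AgeFilter, pollQueue and the file name are hypothetical stand-ins; the only thing borrowed from the snippet above is the idea of passing filesToPoll::add as the discard callback. It models a source that queues each file once (as an event-driven source might), where a rejected file is lost unless the callback puts it back in the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class DiscardCallbackSketch {

    /** Hypothetical age filter that reports rejected names to an optional callback. */
    static final class AgeFilter {
        private final long ageSeconds;
        private Consumer<String> discardCallback; // stays null if nobody registers one

        AgeFilter(long ageSeconds) {
            this.ageSeconds = ageSeconds;
        }

        void addDiscardCallback(Consumer<String> callback) {
            this.discardCallback = callback;
        }

        boolean accept(String name, long modifiedSeconds, long nowSeconds) {
            if (modifiedSeconds + ageSeconds <= nowSeconds) {
                return true;
            }
            if (discardCallback != null) {
                discardCallback.accept(name);
            }
            return false;
        }
    }

    /** Runs two polls (t=100 and t=130) over a queue that received the file once. */
    static List<String> pollQueue(boolean registerCallback) {
        Queue<String> filesToPoll = new ArrayDeque<>();
        filesToPoll.add("a.txt"); // queued once, e.g. by a single "file created" event
        AgeFilter filter = new AgeFilter(30);
        if (registerCallback) {
            filter.addDiscardCallback(filesToPoll::add);
        }
        List<String> delivered = new ArrayList<>();
        long modifiedSeconds = 90;
        for (long now : new long[] {100, 130}) {
            // Snapshot the size so a re-queued name waits for the next poll.
            int pending = filesToPoll.size();
            for (int i = 0; i < pending; i++) {
                String name = filesToPoll.remove();
                if (filter.accept(name, modifiedSeconds, now)) {
                    delivered.add(name);
                }
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println("with callback:    " + pollQueue(true));
        System.out.println("without callback: " + pollQueue(false));
    }
}
```

In this sketch the callback matters only because the queue would otherwise forget the file. A source that lists the remote directory again on every poll hands the same file to the filter each time, so no callback is needed for the retry.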
You probably haven't shown the other filters you use, and your file is filtered out before it reaches this LastModifiedLsEntryFileListFilter, e.g. by the SftpPersistentAcceptOnceFileListFilter. Consider making your "last-modified" filter the first one in the chain.
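Why the order matters can be shown with a small plain-Java sketch. It does not use the Spring filter classes: Entry, acceptOnce, aged and pollTwice are hypothetical stand-ins, and Predicate.and is used only to model a chain in which the second filter sees just what the first one accepted. The same file is polled twice, first while it is too young and then after it has aged.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

class FilterOrderSketch {

    /** Hypothetical stand-in for a remote directory entry (not ChannelSftp.LsEntry). */
    static final class Entry {
        final String name;
        final long modifiedSeconds;

        Entry(String name, long modifiedSeconds) {
            this.name = name;
            this.modifiedSeconds = modifiedSeconds;
        }
    }

    /** Accepts a given file name only the first time it is seen. */
    static Predicate<Entry> acceptOnce() {
        Set<String> seen = new HashSet<>();
        return entry -> seen.add(entry.name);
    }

    /** Accepts entries modified at least ageSeconds before the current clock value. */
    static Predicate<Entry> aged(long ageSeconds, AtomicLong clockSeconds) {
        return entry -> entry.modifiedSeconds + ageSeconds <= clockSeconds.get();
    }

    /** Polls the same entry at t=100 and t=130 and records whether it was accepted. */
    static List<Boolean> pollTwice(boolean lastModifiedFirst) {
        AtomicLong clock = new AtomicLong(100);
        Predicate<Entry> ageFilter = aged(30, clock);
        Predicate<Entry> onceFilter = acceptOnce();
        // Predicate.and short-circuits: the second filter runs only if the first accepts.
        Predicate<Entry> chain = lastModifiedFirst
                ? ageFilter.and(onceFilter)
                : onceFilter.and(ageFilter);

        Entry file = new Entry("report.csv", 90); // modified 10 seconds before the first poll
        List<Boolean> results = new ArrayList<>();
        results.add(chain.test(file)); // first poll, file is too young
        clock.set(130);
        results.add(chain.test(file)); // second poll, file has aged
        return results;
    }

    public static void main(String[] args) {
        System.out.println("accept-once first:   " + pollTwice(false));
        System.out.println("last-modified first: " + pollTwice(true));
    }
}
```

With the accept-once stand-in first, the file is remembered as seen on the first poll even though the age check then rejects it, so the second poll never gets as far as the age check. With the age check first, the accept-once stand-in does not see the file until it has aged.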
If you are not going to support a discard callback from the outside, you probably don't need to implement DiscardAwareFileListFilter at all.