Reputation: 21
I am using FileReadingMessageSource from Spring Integration to read a root directory and pick up files as they are created. New sub-directories can appear under the root directory at any time, so I use the WatchServiceDirectoryScanner from SI 4.3.1 to pick up files created in any new sub-directory.
@Bean
public MessageSource<File> fileReadingMessageSource() {
    CompositeFileListFilter<File> filters = new CompositeFileListFilter<>();
    filters.addFilter(new SimplePatternFileListFilter("pattern*"));
    //filters.addFilter(new LastModifiedFileListFilter());
    FileReadingMessageSource fileSource = new FileReadingMessageSource();
    String filePath = "root-directory";
    fileSource.setDirectory(new File(filePath));
    fileSource.setFilter(filters);
    fileSource.setUseWatchService(true);
    fileSource.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE,
            FileReadingMessageSource.WatchEventType.MODIFY,
            FileReadingMessageSource.WatchEventType.DELETE);
    return fileSource;
}

@Bean
public IntegrationFlow readDirectoryFlow() {
    return IntegrationFlows.from(
                    fileReadingMessageSource(),
                    e -> e.poller(Pollers.cron("*/5 * * * * *")))
            .channel(fileInputChannel())
            .handle(tailerRestart)
            .handle(System.out::println)
            .get();
}
On the first poll, all files matching the pattern are delivered by the MessageSource, but when new files are created later in a new sub-directory, the MessageSource does not pick up the new files that match the pattern.
I see the following DEBUG message in the log:
DEBUG SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
What could be missing?
Upvotes: 2
Views: 828
Reputation: 121542
I've just written a test case very close to your code:
@Bean
public MessageSource<File> fileReadingMessageSource() {
    CompositeFileListFilter<File> filters = new CompositeFileListFilter<>();
    filters.addFilter(new SimplePatternFileListFilter("*.watch"));
    FileReadingMessageSource fileSource = new FileReadingMessageSource();
    fileSource.setDirectory(tmpDir.getRoot());
    fileSource.setFilter(filters);
    fileSource.setUseWatchService(true);
    fileSource.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE,
            FileReadingMessageSource.WatchEventType.MODIFY,
            FileReadingMessageSource.WatchEventType.DELETE);
    return fileSource;
}

@Bean
public IntegrationFlow readDirectoryFlow() {
    return IntegrationFlows
            .from(fileReadingMessageSource(),
                    e -> e.poller(p -> p.cron("*/1 * * * * *")))
            .handle(System.out::println)
            .get();
}
The test code looks like:
@ClassRule
public static final TemporaryFolder tmpDir = new TemporaryFolder();

@Test
public void testWatchServiceMessageSource() throws Exception {
    File newFolder1 = tmpDir.newFolder();
    FileOutputStream file = new FileOutputStream(new File(newFolder1, "foo.watch"));
    file.write(("foo").getBytes());
    file.flush();
    file.close();
    File newFolder2 = tmpDir.newFolder();
    file = new FileOutputStream(new File(newFolder2, "bar.watch"));
    file.write(("bar").getBytes());
    file.flush();
    file.close();
    file = new FileOutputStream(new File(tmpDir.getRoot(), "root.watch"));
    file.write(("root").getBytes());
    file.flush();
    file.close();
    Thread.sleep(10000);
}
And I get these logs:
GenericMessage [payload=C:\Users\abilan\AppData\Local\Temp\junit7602962373770028652\junit7776799219532481336\foo.watch, headers={id=50d44197-e0af-708a-6b61-2a2cfeec68da, timestamp=1473686655061}]
GenericMessage [payload=C:\Users\abilan\AppData\Local\Temp\junit7602962373770028652\junit813088196038861528\bar.watch, headers={id=8d80c853-19b6-f667-7950-d6de49d509ab, timestamp=1473686656062}]
GenericMessage [payload=C:\Users\abilan\AppData\Local\Temp\junit7602962373770028652\root.watch, headers={id=e585203b-41dc-cadb-6a36-4c9009a34701, timestamp=1473686657063}]
one message each second.
I'm not sure where your problem is...
You don't need .channel(fileInputChannel()). A channel is created automatically between endpoints.
With the config:
.handle(tailerRestart)
.handle(System.out::println)
you should be sure that tailerRestart returns something. However, according to our other discussion, it doesn't:
@ServiceActivator
public void restartTailer(File input) throws Exception {
    tailFileProducer.stop();
    tailFileProducer.setFile(input);
    tailFileProducer.start();
}
UPDATE
After some private investigation we figured out that the issue is that FileReadingMessageSource.start() is called several times by the Spring Cloud Stream infrastructure, which re-instantiates the internal WatchService object.
FileReadingMessageSource.start() has to be fixed to be idempotent: https://jira.spring.io/browse/INT-4108
Spring Cloud Stream has been fixed in version 1.1: https://github.com/spring-cloud/spring-cloud-stream/issues/525.
The workaround is to ensure that FileReadingMessageSource.start() is called only once:
FileReadingMessageSource fileSource = new FileReadingMessageSource() {

    private final AtomicBoolean running = new AtomicBoolean();

    @Override
    public void start() {
        if (!this.running.getAndSet(true)) {
            super.start();
        }
    }

    @Override
    public void stop() {
        if (this.running.getAndSet(false)) {
            super.stop();
        }
    }

};
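To see the effect of that guard without Spring on the classpath, here is a minimal standalone sketch. CountingSource is a hypothetical stand-in for FileReadingMessageSource (it only counts how often its start() and stop() bodies run, as a rough model of re-creating and closing the internal WatchService), and GuardedSource applies the same AtomicBoolean check as the workaround. None of these class names come from Spring Integration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for FileReadingMessageSource: each start() call
// "re-creates" an internal resource, modelled here as a counter.
class CountingSource {
    final AtomicInteger resourcesCreated = new AtomicInteger();
    final AtomicInteger resourcesClosed = new AtomicInteger();

    public void start() {
        resourcesCreated.incrementAndGet();
    }

    public void stop() {
        resourcesClosed.incrementAndGet();
    }
}

// Same guard as the workaround: only the first start(), and the first
// stop() after it, reach the superclass.
class GuardedSource extends CountingSource {
    private final AtomicBoolean running = new AtomicBoolean();

    @Override
    public void start() {
        if (!this.running.getAndSet(true)) {
            super.start();
        }
    }

    @Override
    public void stop() {
        if (this.running.getAndSet(false)) {
            super.stop();
        }
    }
}

class IdempotentLifecycleDemo {
    public static void main(String[] args) {
        GuardedSource source = new GuardedSource();
        source.start();
        source.start(); // ignored: already running
        source.start(); // ignored: already running
        System.out.println("created = " + source.resourcesCreated.get()); // created = 1
        source.stop();
        source.stop(); // ignored: already stopped
        System.out.println("closed = " + source.resourcesClosed.get()); // closed = 1
    }
}
```

Calling start() on a plain CountingSource three times would bump the counter to 3, which roughly mirrors the repeated start() calls that replaced the WatchService in the real case.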
Upvotes: 2