E-Riz
E-Riz

Reputation: 32994

How to use WatchServiceDirectoryScanner with Spring Cloud Stream file supplier?

I'm trying to configure SpCS's file supplier to monitor a directory and publish messages when files are created or modified. Based on another SO question, I have the following custom config for the FileInboundChannelAdapterSpec bean:

@Bean
public BeanPostProcessor inboundFileAdaptorCustomizer() {
    return new BeanPostProcessor() {
        @Override
        public Object postProcessBeforeInitialization(Object bean, String beanName) {

            if (bean instanceof FileInboundChannelAdapterSpec) {
                FileInboundChannelAdapterSpec spec = (FileInboundChannelAdapterSpec) bean;
                spec.get().setDirectory(properties.getDirectory());
                spec.autoCreateDirectory(false);
                spec.useWatchService(true);
                spec.watchEvents(WatchEventType.CREATE, WatchEventType.MODIFY);
                spec.preventDuplicates(false);  // We use a custom duplicates filter, see below

                spec.nioLocker();

                spec.filter(fileFilters());
            }

            return bean;
        }
    };
}

When I run with this config, I get the following exception:

java.lang.IllegalStateException: The WatchService hasn't been started
    at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.3.9.jar:5.3.9]
    at org.springframework.integration.file.FileReadingMessageSource$WatchServiceDirectoryScanner.listEligibleFiles(FileReadingMessageSource.java:466) ~[spring-integration-file-5.5.3.jar:5.5.3]
    at org.springframework.integration.file.DefaultDirectoryScanner.listFiles(DefaultDirectoryScanner.java:90) ~[spring-integration-file-5.5.3.jar:5.5.3]
    at org.springframework.integration.file.FileReadingMessageSource.scanInputDirectory(FileReadingMessageSource.java:375) ~[spring-integration-file-5.5.3.jar:5.5.3]
    at org.springframework.integration.file.FileReadingMessageSource.doReceive(FileReadingMessageSource.java:349) ~[spring-integration-file-5.5.3.jar:5.5.3]

Using breakpoints I've noticed that the start() method of the WatchServiceDirectoryScanner is never called. Instead, doReceived() and hence listElibibleFiles() is called before it's ever started, which is where the exception is coming from.

Why is SI telling the scanner to list files before it's even been started?

Upvotes: 3

Views: 208

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121542

This is a bug in the FileSupplierConfiguration - the call to start() is missing. The fix is already merged, but in the meantime, the workaround is like this:

@Bean
public SmartLifecycle fileReadingStart(FileReadingMessageSource fileMessageSource) {
    return new SmartLifecycle() {

        @Override
        public int getPhase() {
            // Make sure this runs as early as possible
            return Integer.MIN_VALUE;
        }

        @Override 
        public void start() {
            fileMessageSource.start();
        }

        @Override 
        public void stop() {
            fileMessageSource.stop();
        }

        @Override 
        public boolean isRunning() {
            return false;
        }
        
    };
}

Upvotes: 2

Related Questions