Xogaz

Reputation: 163

Flux on a dynamic collection

Ok, I'm a bit puzzled how I am supposed to use the Reactor pattern with Spring's Webflux/Reactor API.

Let's say I have a directory to which I continuously add files. Whenever a new file appears, my application is supposed to work on it. If the queue is full, new files should be ignored (so no FileWatcher) until there's space in the queue again.

The following is just pseudocode; I am trying to understand the general idea, not the specific implementation details.

Class Directory Observer

Manages a scheduled task which checks every 2 seconds if new files appeared. If yes, we try to add them to a queue.

@PostConstruct
public void initialize() {

    this.flux = Flux.<Path>generate(consumer -> {
        consumer.next(this.deque.pop());
    }).log(); // log() returns a new Flux, so keep the result
}

@Scheduled(fixedRate = 2000)
public void checkDirectory() {
   // list files, for each add to deque (if not present)
}

public Flux<Path> getObserver() {
    return this.flux; 
}

Class FileProcessor

Consumes the items in the queue one by one.

@PostConstruct
private void subscribe() {
    this.observeDirectorytask.getObserver().subscribe(path ->  {
        log.info("Processing '{}'", path.getFileName());

        // process file, delete it once done
    }); 
}

Does this approach make sense at all? If yes, what do I need to do so that my subscription fires whenever new items are added to the queue? (For now it is only executed once, at startup.)

Update

Here is my working implementation:

public class DirectoryObserverTask {

@Autowired
private Path observedDirectory;

private Consumer<Path> newFilesConsumer;
private Consumer<Throwable> errorsConsumer;

private Flux<Path> flux; 


@PostConstruct
public void init() {
    this.observedDirectory = Paths.get(importDirectoryProperty);
}

public void subscribe(Consumer<Path> consumer) {
    if(this.flux == null) {
        this.flux = Flux.push(sink -> {
            this.onError(err -> sink.error(err));
            this.onNewFile(file -> sink.next(file));
        }); 
        this.flux = this.flux.onBackpressureBuffer(10,  BufferOverflowStrategy.DROP_LATEST); 
    }

    this.flux.subscribe(consumer); 

}


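@Scheduled(fixedRate = 2000)
public void checkDirectoryContent() throws IOException {
    // Nothing has subscribed yet, so there is nowhere to push files to
    if (this.newFilesConsumer == null) {
        return;
    }
    // try-with-resources closes the directory handle after each scan
    try (DirectoryStream<Path> paths = Files.newDirectoryStream(this.observedDirectory)) {
        paths.forEach(path -> this.newFilesConsumer.accept(path));
    }
}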

public void onNewFile(Consumer<Path> newFilesConsumer) {
    this.newFilesConsumer = newFilesConsumer;
}

public void onError(Consumer<Throwable> errorsConsumer) {
    this.errorsConsumer = errorsConsumer;
}

}

And the consumer

@Autowired
private DirectoryObserverTask observeDirectorytask;

@PostConstruct
private void init() {
    observeDirectorytask.subscribe(path -> {
        this.processPath(path);
    });
}

public void processPath(Path t) {
    Mono.justOrEmpty(t)
        .subscribe(path -> {
            // handle the file 
            path.toFile().delete();
        });
}

Upvotes: 4

Views: 7171

Answers (1)

ESala

Reputation: 7058

You don't need to use a queue, this behavior is already built-in.

I would do something like this:

  1. Use a file watcher to detect changes.

  2. Push changes to a Flux<File>.

  3. As requested, limit the amount of queued events (using backpressure):

    filesFlux.onBackpressureBuffer(10, BufferOverflowStrategy.DROP_LATEST)

  4. Subscribe as usual.

A poor explanation of backpressure would be: "what to do when we can't process elements fast enough".

In this case, we buffer up to 10 elements and, while the buffer is full, drop any newly arriving elements.
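To make the "buffer up to 10, drop the newest" behavior concrete, here is a minimal plain-Java sketch of the same idea. It is only an analogy and not how Reactor implements it: DropLatestBuffer is a made-up class, built on the JDK's ArrayBlockingQueue, whose offer method returns false instead of blocking when the queue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical helper, not part of Reactor: a bounded buffer that rejects
// the newest element when it is full.
final class DropLatestBuffer<T> {
    private final ArrayBlockingQueue<T> queue;
    private int dropped = 0;

    DropLatestBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Returns true if the element was buffered, false if it was dropped.
    synchronized boolean offer(T element) {
        boolean accepted = queue.offer(element); // non-blocking; false when full
        if (!accepted) {
            dropped++;
        }
        return accepted;
    }

    // Returns the oldest buffered element, or null if the buffer is empty.
    T poll() {
        return queue.poll();
    }

    synchronized int droppedCount() {
        return dropped;
    }

    public static void main(String[] args) {
        DropLatestBuffer<String> buffer = new DropLatestBuffer<>(10);
        for (int i = 1; i <= 15; i++) {
            buffer.offer("file-" + i + ".txt");
        }
        System.out.println("dropped: " + buffer.droppedCount()); // dropped: 5
        System.out.println("first out: " + buffer.poll());       // first out: file-1.txt
    }
}
```

The elements that arrived first stay in the buffer, and whatever arrives while it is full is discarded. As soon as the consumer polls an element, there is room for the next arrival again.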

Update: there are many ways of creating a Flux. In this particular case I would take a look at the create or push methods (see the documentation).

Example: Imagine you have a FileWatchService where you can register a callback for when a new file is detected and a callback for when there is an error. You could do something like this:

FileWatchService watcher = ...

Flux<File> fileFlux = Flux.push(sink -> {
    watcher.onError(err -> sink.error(err));
    watcher.onNewFile(file -> sink.next(file));
});

fileFlux
    .onBackpressureBuffer(10, BufferOverflowStrategy.DROP_LATEST)
    .subscribe(...)
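The FileWatchService above is imaginary, so here is one possible sketch of such a service for the polling case from the question. Everything in it is an assumption made for illustration: the class name PollingFileWatchService, the poll and forget methods, and the use of Path instead of File. It uses only the JDK and would have to be triggered by your own scheduler (for example a @Scheduled method).

```java
import java.io.IOException;
import java.nio.file.DirectoryIteratorException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical polling watcher; the name and methods are assumptions that
// mirror the callbacks used in the snippet above.
final class PollingFileWatchService {
    private final Path directory;
    private final Set<Path> seen = new HashSet<>();
    private Consumer<Path> newFileCallback = path -> { };   // no-op until registered
    private Consumer<Throwable> errorCallback = err -> { }; // no-op until registered

    PollingFileWatchService(Path directory) {
        this.directory = directory;
    }

    void onNewFile(Consumer<Path> callback) {
        this.newFileCallback = callback;
    }

    void onError(Consumer<Throwable> callback) {
        this.errorCallback = callback;
    }

    // Call this on a schedule, for example every 2 seconds.
    synchronized void poll() {
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(directory)) {
            for (Path entry : entries) {
                if (seen.add(entry)) { // true only the first time this path is seen
                    newFileCallback.accept(entry);
                }
            }
        } catch (IOException | DirectoryIteratorException e) {
            errorCallback.accept(e);
        }
    }

    // Lets a path be reported again, e.g. after it was dropped downstream.
    synchronized void forget(Path path) {
        seen.remove(path);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("watched");
        PollingFileWatchService watcher = new PollingFileWatchService(dir);
        watcher.onNewFile(path -> System.out.println("new file: " + path.getFileName()));
        Files.createFile(dir.resolve("a.txt"));
        watcher.poll(); // reports a.txt
        watcher.poll(); // reports nothing, a.txt was already seen
    }
}
```

One design choice to be aware of: because the watcher remembers every path it has reported, a file that the backpressure buffer dropped would not be emitted again on the next poll. If dropped files should be retried once there is room, one option is to call forget from an overflow callback, or to skip the de-duplication and let the consumer cope with repeats.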

Upvotes: 3
