wolfsblu

How to do multithreaded file processing in Reactor

I am trying to process multiple files in parallel using Reactor's Flux. The main workload happens within a call to flatMap and afterwards the Flux is transformed and filtered.

Whenever I subscribe to the resulting Flux, the main thread exits before I receive any values.

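import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

List<Token> tokens = new ArrayList<>();

Flux.fromStream(Files.list(Paths.get("directory")))
    .flatMap(path -> {
        return Flux.<Token>create(sink -> {
            try (
                RandomAccessFile file = new RandomAccessFile(path.toFile(), "r");
                FileChannel fileChannel = file.getChannel()
            ) {
                // Process file into tokens
                sink.next(new Token(".."));
                // Complete only on success; a complete() after error() is ignored
                sink.complete();
            } catch (IOException e) {
                sink.error(e);
            }
        }).subscribeOn(Schedulers.boundedElastic());
    })
    .map(token -> token /* Transform tokens */)
    .filter(token -> true /* Filter tokens */)
    .subscribe(tokens::add); // Store tokens in list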

I'd expect to find the output of the processing pipeline in my list, but the program exits immediately. First, am I using the Flux class correctly? Second, how can I wait for the subscribe call to finish?

Answers (1)

Michael Berry

I'd expect to find the output of the processing pipeline in my list, but the program immediately exits.

The code you have there sets up your reactive chain on the main thread, and then does... nothing else on the main thread. The main thread is therefore done with its work, and since the boundedElastic() threads are daemon threads, there are no other threads stopping the program from quitting, so it quits.
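To check the daemon-thread part for yourself, a small sketch like the following runs one task on Schedulers.boundedElastic() and reports whether the worker thread is a daemon. It assumes Reactor 3.3 or later on the classpath; the class and method names are made up for illustration. With the default scheduler it should print true.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class DaemonCheck {
    // Runs a task on the shared boundedElastic scheduler and reports
    // whether the thread executing it is a daemon thread.
    static boolean boundedElasticIsDaemon() {
        return Mono.fromCallable(() -> Thread.currentThread().isDaemon())
                .subscribeOn(Schedulers.boundedElastic())
                .block(); // blocking here only so the demo can read the result
    }

    public static void main(String[] args) {
        System.out.println("boundedElastic worker is daemon: " + boundedElasticIsDaemon());
    }
}
```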

You can see the same behaviour with a much simpler example:

Flux<Integer> f = Flux.just(1, 2, 3, 4, 5)
            .delayElements(Duration.ofMillis(500));
f.subscribe(System.out::println);

You could of course create a scheduler backed by non-daemon threads, e.g. Schedulers.newBoundedElastic(threadCap, queuedTaskCap, "name", ttlSeconds, false), but then you'd have to keep track of it and call dispose() when you were done, so it really just inverts the problem (the program runs indefinitely until you dispose of the scheduler).
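A sketch of that non-daemon variant, assuming Reactor 3.3 or later: the five-argument Schedulers.newBoundedElastic(threadCap, queuedTaskCap, name, ttlSeconds, daemon) overload creates the scheduler, and doFinally disposes of it once the Flux terminates. The thread cap, queue size, name "file-workers" and TTL are arbitrary illustrative values. Forgetting the dispose() call is exactly the "runs indefinitely" problem described above.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class NonDaemonScheduler {
    // threadCap = 4, queuedTaskCap = 100, ttlSeconds = 60, daemon = false (illustrative values)
    static Scheduler create() {
        return Schedulers.newBoundedElastic(4, 100, "file-workers", 60, false);
    }

    public static void main(String[] args) {
        Scheduler scheduler = create();
        Flux.just(1, 2, 3, 4, 5)
                .delayElements(Duration.ofMillis(500), scheduler)
                // Without this dispose(), the non-daemon worker threads would keep the JVM alive.
                .doFinally(signal -> scheduler.dispose())
                .subscribe(System.out::println);
        // main returns immediately; the JVM stays up until the scheduler is disposed.
    }
}
```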

The quick 'n' dirty solution is just to block on the last element of the Flux as the final line in your program, so if we add:

f.blockLast();

...then the program waits for the last element to be emitted before exiting, and we have the behaviour we're after.
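One caveat: blockLast() subscribes to the Flux itself, so calling it after subscribe() on the same cold Flux runs the whole pipeline a second time (in the question's case, every file would be read twice). Applied to the question, a sketch that blocks once instead of subscribing could look like this. It assumes Reactor 3.x, swaps in collectList().block() rather than blockLast() because the goal is a list, and replaces the RandomAccessFile processing with a hypothetical tokenize() that treats each line of a file as one token.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingPipeline {
    // Hypothetical tokenizer: each line of the file is one token.
    // fromCallable turns an IOException into an error signal.
    static Flux<String> tokenize(Path path) {
        return Mono.fromCallable(() -> Files.readAllLines(path))
                .subscribeOn(Schedulers.boundedElastic()) // blocking I/O on a worker thread
                .flatMapMany(Flux::fromIterable);
    }

    static List<String> process(Path dir) throws IOException {
        return Flux.fromStream(Files.list(dir))   // fromStream closes the stream when it terminates
                .flatMap(BlockingPipeline::tokenize) // files are read concurrently
                .map(String::trim)                   // stand-in for "transform tokens"
                .filter(token -> !token.isEmpty())   // stand-in for "filter tokens"
                .collectList()
                .block();                            // the only blocking call, at the edge
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("tokens");
        Files.write(dir.resolve("a.txt"), List.of("alpha", " beta "));
        Files.write(dir.resolve("b.txt"), List.of("", "gamma"));
        // Token order depends on which file finishes first.
        System.out.println(process(dir));
    }
}
```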

For a simple proof of concept, this is fine. It's not ideal in "production" code, however. Firstly, "no blocking" is the general rule in reactive code, so if you have blocking calls like this it's difficult to work out whether they're intended or not. Secondly, if you added other chains and also wanted them to finish, you'd have to add blocking calls for each and every one. That's messy, and not really sustainable.
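A different technique from the ones in this answer, sketched here only as an alternative: Mono.when(...) combines several publishers into one Mono<Void> that completes once all of them have completed, so a single block() at the edge of the program waits for every chain. This assumes Reactor 3.x; the counter is just there to show that both chains ran to completion.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class WaitForAll {
    static int runAll() {
        AtomicInteger seen = new AtomicInteger(); // counts elements from both chains
        Flux<Integer> first = Flux.just(1, 2, 3, 4, 5)
                .delayElements(Duration.ofMillis(100))
                .doOnNext(i -> { seen.incrementAndGet(); System.out.println("first: " + i); });
        Flux<Integer> second = Flux.just(6, 7, 8)
                .delayElements(Duration.ofMillis(100))
                .doOnNext(i -> { seen.incrementAndGet(); System.out.println("second: " + i); });
        // Subscribes to both sources and completes when both have completed,
        // so one block() replaces a blocking call per chain.
        Mono.when(first, second).block();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("elements seen: " + runAll()); // prints "elements seen: 8" last
    }
}
```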

A nicer solution would be to use a CountDownLatch:

CountDownLatch cdl = new CountDownLatch(1);

Flux.just(1, 2, 3, 4, 5)
        .delayElements(Duration.ofMillis(500))
        .doFinally(s -> cdl.countDown())
        .subscribe(System.out::println);

cdl.await();

This has the advantage of keeping the blocking call out of the reactive chain itself (only the main thread waits on the latch), and it can also handle more than one publisher at once if you set the initial count to the number of publishers. This also tends to be the approach I see recommended generally for this sort of thing, so if you want the most widely accepted solution, that's probably it.
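For the multi-publisher case, a sketch with the latch count set to 2 (one per publisher) might look like this, assuming Reactor 3.x. It uses await with a timeout rather than plain await() so that a publisher that never terminates cannot hang the program; the 10-second limit is an arbitrary illustrative value. Because doFinally runs after the terminal signal has reached the subscriber, all elements have been consumed by the time the latch reaches zero.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class LatchForTwo {
    static int runBoth() throws InterruptedException {
        CountDownLatch cdl = new CountDownLatch(2); // one count per publisher
        AtomicInteger received = new AtomicInteger();

        Flux.just(1, 2, 3)
                .delayElements(Duration.ofMillis(100))
                .doFinally(s -> cdl.countDown()) // runs on complete, error or cancel
                .subscribe(i -> received.incrementAndGet());

        Flux.just(4, 5)
                .delayElements(Duration.ofMillis(100))
                .doFinally(s -> cdl.countDown())
                .subscribe(i -> received.incrementAndGet());

        // Only the calling thread waits here.
        if (!cdl.await(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("publishers did not finish in time");
        }
        return received.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("received: " + runBoth());
    }
}
```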

However, I tend to favour a Phaser whenever you need to wait for multiple publishers as opposed to just one. It works similarly to CountDownLatch, but parties can dynamically register() as well as deregister (via arriveAndDeregister()). This means you can create a single phaser and then easily register multiple publishers to it, if required, without changing the initial value, e.g.:

Phaser phaser = new Phaser(1);

Flux.just(1, 2, 3, 4, 5)
        .doOnSubscribe(s -> phaser.register())
        .delayElements(Duration.ofMillis(500))
        .doFinally(s -> phaser.arriveAndDeregister())
        .subscribe(System.out::println);

Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
        .doOnSubscribe(s -> phaser.register())
        .delayElements(Duration.ofMillis(500))
        .doFinally(s -> phaser.arriveAndDeregister())
        .subscribe(System.out::println);

phaser.arriveAndAwaitAdvance();

(You can of course wrap the doOnSubscribe and doFinally logic in a separate method too, if required.)
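One possible shape for such a helper, assuming Reactor 3.x: tracked(...) is a hypothetical name, not a Reactor API. It registers a party when the Flux is subscribed and deregisters it when the Flux completes, errors or is cancelled, and the calling thread keeps its own party from new Phaser(1) so that arriveAndAwaitAdvance() waits for everything that registered.

```java
import java.time.Duration;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class PhaserHelper {
    // Hypothetical helper: ties the lifetime of a Flux to a Phaser party.
    static <T> Flux<T> tracked(Flux<T> flux, Phaser phaser) {
        return flux
                .doOnSubscribe(s -> phaser.register())         // new party on subscription
                .doFinally(s -> phaser.arriveAndDeregister()); // leave on complete/error/cancel
    }

    static int runAll() {
        Phaser phaser = new Phaser(1); // party for the calling thread
        AtomicInteger received = new AtomicInteger();

        tracked(Flux.just(1, 2, 3, 4, 5).delayElements(Duration.ofMillis(100)), phaser)
                .subscribe(i -> received.incrementAndGet());
        tracked(Flux.just(6, 7, 8).delayElements(Duration.ofMillis(100)), phaser)
                .subscribe(i -> received.incrementAndGet());

        // Waits until every registered publisher has deregistered.
        phaser.arriveAndAwaitAdvance();
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println("received: " + runAll());
    }
}
```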
