chhil

Reputation: 460

RxJava2: 2 subscribers for an Observable/Flowable, but onNext is called on only one of them

rxjava2 version 2.1.5

I'm trying to understand how multiple subscriptions to an observable work in RxJava2. I have a simple file watch service that tracks the creation, modification, and deletion of files in a directory. I add 2 subscribers and expect every event to be printed by both of them. When I copy a file into the watched directory, only one subscriber prints the event. Then, when I delete the file, the other subscriber prints the event. What am I missing here?

import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;

import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.TimeUnit;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

public class MyRxJava2DirWatcher {

    public Flowable<WatchEvent<?>> createFlowable(WatchService watcher, Path path) {

        return Flowable.create(subscriber -> {

            boolean error = false;
            WatchKey key;
            try {

                key = path.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
            }
            catch (IOException e) {
                subscriber.onError(e);
                error = true;
            }

            while (!error) {
                key = watcher.take();

                for (final WatchEvent<?> event : key.pollEvents()) {
                    subscriber.onNext(event);
                }

                key.reset();
            }

        }, BackpressureStrategy.BUFFER);

    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Path path = Paths.get("c:\\temp\\delete");
        final FileSystem fileSystem = path.getFileSystem();
        WatchService watcher = fileSystem.newWatchService();

        MyRxJava2DirWatcher my = new MyRxJava2DirWatcher();
        my.createFlowable(watcher, path).subscribeOn(Schedulers.computation()).subscribe(event -> {
            System.out.println("1>>Event kind:" + event.kind() + ". File affected: " + event.context() + ". "
                    + Thread.currentThread().getName());

        }, onError -> {
            System.out.println("1>>" + Thread.currentThread().getName());
            onError.printStackTrace();
        });

        // MyRxJava2DirWatcher my2 = new MyRxJava2DirWatcher();

        my.createFlowable(watcher, path).subscribeOn(Schedulers.computation()).subscribe(event -> {
            System.out.println("2>>Event kind:" + event.kind() + ". File affected: " + event.context() + ". "
                    + Thread.currentThread().getName());

        }, onError -> {
            System.out.println("2>>" + Thread.currentThread().getName());
            onError.printStackTrace();
        });

        TimeUnit.MINUTES.sleep(1000);

    }
}

The output looks like the following:

2>>Event kind:ENTRY_CREATE. File affected: 1.txt. RxCachedThreadScheduler-2
2>>Event kind:ENTRY_MODIFY. File affected: 1.txt. RxCachedThreadScheduler-2
1>>Event kind:ENTRY_DELETE. File affected: 1.txt. RxCachedThreadScheduler-1

Upvotes: 2

Views: 2069

Answers (2)

solidak

Reputation: 5081

You are creating two different Flowables for two different subscribers. Create one Flowable and subscribe to it twice, like this:

public static void main(String[] args) throws IOException, InterruptedException {
        Path path = Paths.get("c:\\temp\\delete");
        final FileSystem fileSystem = path.getFileSystem();
        WatchService watcher = fileSystem.newWatchService();

        MyRxJava2DirWatcher my = new MyRxJava2DirWatcher();
        // Keep the type parameter; with a raw Flowable, event.kind() would not compile.
        Flowable<WatchEvent<?>> myFlowable = my.createFlowable(watcher, path);

        myFlowable.subscribeOn(Schedulers.computation()).subscribe(event -> {
            System.out.println("1>>Event kind:" + event.kind() + ". File affected: " + event.context() + ". "
                    + Thread.currentThread().getName());

        }, onError -> {
            System.out.println("1>>" + Thread.currentThread().getName());
            onError.printStackTrace();
        });

        myFlowable.subscribeOn(Schedulers.computation()).subscribe(event -> {
            System.out.println("2>>Event kind:" + event.kind() + ". File affected: " + event.context() + ". "
                    + Thread.currentThread().getName());

        }, onError -> {
            System.out.println("2>>" + Thread.currentThread().getName());
            onError.printStackTrace();
        });

        TimeUnit.MINUTES.sleep(1000);

    }
}

Upvotes: 0

akarnokd

Reputation: 70017

What happens is that you share the same WatchService between two Flowables and they race for events in it. If you pass in the FileSystem instead and call newWatchService() in Flowable.create, you should receive all events as many times as there are Subscribers:

public Flowable<WatchEvent<?>> createFlowable(FileSystem fs, Path path) {

    return Flowable.create(subscriber -> {

        WatchService watcher = fs.newWatchService();

        subscriber.setCancellable(() -> watcher.close());

        boolean error = false;
        WatchKey key;
        try {

            key = path.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
        }
        catch (IOException e) {
            subscriber.onError(e);
            error = true;
        }

        while (!error) {
            key = watcher.take();

            for (final WatchEvent<?> event : key.pollEvents()) {
                subscriber.onNext(event);
            }

            key.reset();
        }

    }, BackpressureStrategy.BUFFER);

}

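Note also that you should use subscribeOn(Schedulers.computation(), false) to avoid deadlocking the poll with your Subscriber. With requestOn set to false, requests from the Subscriber are forwarded on the calling thread instead of being rerouted to the scheduler thread that is blocked in the watcher.take() loop.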
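
To make the race for events concrete, here is a minimal stdlib-only sketch. It is an analogy, not RxJava or WatchService code: a BlockingQueue stands in for the WatchService, and the class and method names (SharedQueueDemo, deliverShared, deliverSeparate) are made up for illustration. It only shows that an event taken from a shared queue by one consumer is gone for the other, while separate queues deliver every event to each consumer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SharedQueueDemo {

    // Stand-ins for the three watch events from the question's output.
    static final List<String> EVENTS =
            Arrays.asList("ENTRY_CREATE", "ENTRY_MODIFY", "ENTRY_DELETE");

    // Both consumers read from ONE queue (like two Flowables sharing one
    // WatchService): each event is handed out exactly once.
    static int[] deliverShared() {
        BlockingQueue<String> shared = new LinkedBlockingQueue<>(EVENTS);
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        // The order is fixed here to keep the demo deterministic; with real
        // threads, whichever consumer polls first wins the event.
        first.add(shared.poll());
        shared.drainTo(second);
        return new int[] { first.size(), second.size() };
    }

    // Each consumer owns its queue (like one WatchService per subscription):
    // every event reaches both consumers.
    static int[] deliverSeparate() {
        BlockingQueue<String> q1 = new LinkedBlockingQueue<>(EVENTS);
        BlockingQueue<String> q2 = new LinkedBlockingQueue<>(EVENTS);
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        q1.drainTo(first);
        q2.drainTo(second);
        return new int[] { first.size(), second.size() };
    }

    public static void main(String[] args) {
        System.out.println("shared:   " + Arrays.toString(deliverShared()));
        System.out.println("separate: " + Arrays.toString(deliverSeparate()));
    }
}
```

In the shared case the two counts always add up to 3, the same pattern as in the question's output, where the three events were split between the two subscribers. In the separate case each consumer sees all 3, which is the effect of calling newWatchService() inside Flowable.create.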

Upvotes: 5
