Abdelrhman Talat
Abdelrhman Talat

Reputation: 1317

From RxJava 1 to RxJava 2

I'm trying to convert this RxJava1 code to RxJava2

public static Observable<Path> listFolder(Path dir, String glob) {
    return Observable.<Path>create(subscriber -> {
        try {
            DirectoryStream<Path> stream =
                    Files.newDirectoryStream(dir, glob);

            subscriber.add(Subscriptions.create(() -> {
                try {
                    stream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }));
            Observable.<Path>from(stream).subscribe(subscriber);
        } catch (DirectoryIteratorException ex) {
            subscriber.onError(ex);
        } catch (IOException ioe) {
            subscriber.onError(ioe);
        }
    });
}

The thing is that in Rxjava2 I don't get a subscriber to add a new subscription to it.

Upvotes: 4

Views: 584

Answers (1)

Dave Moten
Dave Moten

Reputation: 12087

Enjoy RxJava 2 conciseness (Flowable is the backpressure supporting class now):

public static Flowable<Path> listFolder(Path dir, String glob) {
    return Flowable.using(
        () -> Files.newDirectoryStream(dir, glob),
        stream -> Flowable.fromIterable(stream),
        stream -> stream.close());
}

If you don't want backpressure then replace Flowable with Observable.

Upvotes: 5

Related Questions