grAPPfruit

Reputation: 2138

RxJava 2 - How to chain asynchronous calls

I've just recently started learning RxJava, so don't crucify me if I'm asking a newbie question, but I've spent days trying to figure this out without any success. I've read almost all the documentation I could find and followed along with most of the tutorials on http://reactivex.io/tutorials.html. I searched StackOverflow and the rest of the Internet high and low, but I seem to be the only person on the planet with this problem. Which is odd, since essentially it boils down to something every piece of software has to do: logging in a user.

All the tutorials I found are about applying functions to a stream to create a new stream, which is useful and awesome, don't get me wrong, but not really helpful in my case. It also got me thinking ... maybe I'm going about this all wrong. But I'm stuck so deep now, and following the mantra that "everything is a stream", why should that not be possible?

So here is what I'm trying to do:

  1. indicate some form of loading
  2. call a Completable to perform a login action on some server
  3. call a Single to perform a create user action on some server which returns a user id for local reference
  4. use the result of the Single call in the next action and hide loading

Even though I will end up with this on Android I created a basic Java 8 example to outline what I would like to achieve.

This is what I came up with so far:

Here is a runnable version of the code:

public static void main(final String[] args) {
    getMainStream()
        .doOnNext(__ -> showLoading())
        .flatMap(__ -> loginUser().toObservable())
        .flatMap(__ -> createUser().toObservable())
        .doOnNext(userId -> {
            hideLoading();
            System.out.println("userId: " + userId);
        })
        .subscribe();
}

public static Completable loginUser() {
    return Completable.create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(final CompletableEmitter e) throws Exception {
            Thread.sleep(500);
            System.out.println("loginUser");
            e.onComplete();
        }
    });
}

public static Single<String> createUser() {
    return Single.<String>create(new SingleOnSubscribe<String>() {
        @Override
        public void subscribe(final SingleEmitter<String> e) throws Exception {
            Thread.sleep(1000);
            System.out.println("createUser");
            e.onSuccess("some_user_id");
        }
    });
}

public static Completable getCompletable(final String input) {
    return Completable.create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(final CompletableEmitter e) throws Exception {
            Thread.sleep(750);
            System.out.println("completable, input=" + input);
            e.onComplete();
        }
    });
}

public static Observable<Object> getMainStream() {
    return Observable.just(new Object());
}

private static void hideLoading() {
    System.out.println("hideLoading()");
}

private static void showLoading() {
    System.out.println("showLoading()");
}

The console output of this is:

showLoading()
loginUser

Unfortunately, nothing happens after loginUser: createUser is never called and the loading indicator is never hidden?!

I'm really looking forward to any help on this topic!

Thanks!!!

Upvotes: 0

Views: 1072

Answers (1)

Bob Dalgleish

Reputation: 8227

loginUser() is a Completable, and when you convert a Completable to an Observable, the resulting Observable completes without emitting any items. Hence, there are no downstream values for the flatMap that calls createUser() to act upon.

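You might consider changing the expression to loginUser().andThen(createUser().toObservable()), which subscribes to createUser() once the login has completed and emits the resulting string. Note that andThen takes the next source directly, not a lambda.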
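Written out, the andThen suggestion could look like the sketch below. It assumes RxJava 2 (the io.reactivex packages) is on the classpath. The class name LoginChain, the log list, and the run() helper are made up for illustration, and the Thread.sleep calls from the question are left out to keep it short. It uses andThen(createUser()) without toObservable(), since Completable.andThen also accepts a Single and then returns a Single.

```java
import io.reactivex.Completable;
import io.reactivex.Single;

import java.util.ArrayList;
import java.util.List;

public class LoginChain {
    // Hypothetical helper: records each step so the execution order can be inspected.
    static final List<String> log = new ArrayList<>();

    static Completable loginUser() {
        return Completable.fromAction(() -> log.add("loginUser"));
    }

    static Single<String> createUser() {
        return Single.fromCallable(() -> {
            log.add("createUser");
            return "some_user_id";
        });
    }

    static String run() {
        log.clear();
        return loginUser()
            // Runs when the chain is subscribed, before the login work starts.
            .doOnSubscribe(disposable -> log.add("showLoading"))
            // Subscribes to createUser() only after loginUser() has completed.
            .andThen(createUser())
            // Runs on success, on error, and on disposal.
            .doFinally(() -> log.add("hideLoading"))
            // Blocks only to keep the demo simple; on Android you would subscribe instead.
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("userId: " + run());
        System.out.println(log);
    }
}
```

With everything running synchronously on the calling thread, as here, the log should read showLoading, loginUser, createUser, hideLoading. Putting hideLoading in doFinally rather than in the success callback means the indicator is also hidden when the login or the user creation fails.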

Upvotes: 1
