Reputation: 2138
I've just recently started learning RxJava so don't crucify me if I'm asking a newbie question but I've spend days now in trying to figure this out without any success. I've read almost all the documentation I could find and I followed along most of the tutorials on http://reactivex.io/tutorials.html. I searched StackOverflow and the rest of the Internet high and low but apparently I seem to be the only person on the planet with this problem. Which is odd since essentially it boils down to something every piece of software has to do: Logging in a user.
All the tutorials I found are on applying some functions on a stream creating a new stream which is useful and awesome, don't get me wrong but not really helpful in my case. It also got me thinking ... maybe I go about this all wrong. But I'm stuck so deep now and also following the mantra that "everything is a stream" why should that not be possible?
So here is what I'm trying to do:
Completable
to perform a login action on some serverSingle
to perform a create user action on some server which returns a user id for local referenceSingle
call in the next action and hide loadingEven though I will end up with this on Android I created a basic Java 8 example to outline what I would like to achieve.
This is what I came up with so far:
Notes:
getMainStream()
function is there to simulate some interactionHere a runnable version of the code:
public static void main(final String[] args) {
getMainStream()
.doOnNext(__ -> showLoading())
.flatMap(__ -> loginUser().toObservable())
.flatMap(__ -> createUser().toObservable())
.doOnNext(userId -> {
hideLoading();
System.out.println("userId: " + userId);
})
.subscribe();
}
public static Completable loginUser() {
return Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(final CompletableEmitter e) throws Exception {
Thread.sleep(500);
System.out.println("loginUser");
e.onComplete();
}
});
}
public static Single<String> createUser() {
return Single.<String>create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(final SingleEmitter<String> e) throws Exception {
Thread.sleep(1000);
System.out.println("createUser");
e.onSuccess("some_user_id");
}
});
}
public static Completable getCompletable(final String input) {
return Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(final CompletableEmitter e) throws Exception {
Thread.sleep(750);
System.out.println("completable, input=" + input);
e.onComplete();
}
});
}
public static Observable<Object> getMainStream() {
return Observable.just(new Object());
}
private static void hideLoading() {
System.out.println("hideLoading()");
}
private static void showLoading() {
System.out.println("showLoading()");
}
The console output of this is:
showLoading()
loginUser
Unfortunately, the login user never returns?!
I'm really looking forward to any help on this topic!
Thanks!!!
Upvotes: 0
Views: 1072
Reputation: 8227
loginUser()
is a Completable
and when you convert the Completable
to Observable
, the effect is that the observable will complete. Hence, there is no downstream values to be acted upon by createUser()
.
You might consider changing the expression to loginUser().andThen( () -> createUser().toObservable()
, which results in emitting the string.
Upvotes: 1