Reputation: 433
I create an Observable from a long running operation + callback like this:
public Observable<API> login(){
return Observable.create(new Observable.OnSubscribe<API>() {
@Override
public void call(final Subscriber<? super API> subscriber) {
API.login(new SimpleLoginListener() {
@Override
public void onLoginSuccess(String token) {
subscriber.onNext(API.from(token));
subscriber.onCompleted();
}
@Override
public void onLoginFailed(String reason) {
subscriber.onNext(API.error());
subscriber.onCompleted();
}
});
}
});
}
A successfully logged-in API is the precondition for multiple other operations such as api.getX() and api.getY(), so I thought I could chain these operations with RxJava and flatMap like this (simplified): login().getX() or login().getY().
My biggest problem now is that I don't have control over when login(callback) is executed. However, I want to be able to reuse the login result for all calls.
This means the wrapped login(callback) call should be executed only once, and its result should then be used for all following calls.
It seems the result would be similar to a queue that aggregates subscribers and then shares the result of the first execution.
What is the best way to achieve this? Am I missing a simpler alternative?
I tried code from this question and experimented with cache(), share(), publish(), refCount(), etc., but with every one of these operators the wrapped function is still called three times when I do this:
apiWrapper.getX();
apiWrapper.getX();
apiWrapper.getY();
Is there something like autoConnect(time window) that aggregates multiple successive subscribers?
Upvotes: 0
Views: 996
Reputation: 69997
Applying cache() should make sure login is only called once for all subscribers of the returned Observable:
public Observable<API> login() {
return Observable.<API>create(s -> {
API.login(new SimpleLoginListener() {
@Override
public void onLoginSuccess(String token) {
s.setProducer(new SingleProducer<>(s, API.from(token)));
}
@Override
public void onLoginFailed(String reason) {
s.setProducer(new SingleProducer<>(s, API.error()));
}
});
}).cache();
}
If, for some reason, you want to "clear" the cache, you can use the following trick:
AtomicReference<Observable<API>> loginCache = new AtomicReference<>(login());
public Observable<API> cachedLogin() {
return Observable.defer(() -> loginCache.get());
}
public void clearLoginCache() {
loginCache.set(login());
}
Upvotes: 1
Reputation: 433
OK, I think I found one major problem in my approach: Observable.create() is a factory method, so even if every single Observable was working as intended, I created many of them. One way to avoid this mistake is to create a single instance:
if (instance == null) { instance = Observable.create(...); }
return instance;
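To make the difference between "factory called every time" and "single shared instance" concrete, here is a minimal sketch. To keep it dependency-free it uses the JDK's CompletableFuture as a stand-in for the cached Observable, so it only illustrates the idea and is not RxJava code. The class name LoginOnceSketch, the methods newLogin() and sharedLogin(), the loginCalls counter, and the "token" value are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative stand-in for the login wrapper; none of these names come from the original API.
final class LoginOnceSketch {
    // Counts how often the expensive login work actually runs.
    static final AtomicInteger loginCalls = new AtomicInteger();

    // The single shared result; guarded by the class lock in sharedLogin().
    private static CompletableFuture<String> instance;

    // Factory: every call starts a fresh login, comparable to
    // calling Observable.create(...) again on each login().
    static CompletableFuture<String> newLogin() {
        return CompletableFuture.supplyAsync(() -> {
            loginCalls.incrementAndGet();
            return "token"; // placeholder for the real login result
        });
    }

    // Shared instance: the first caller triggers the login,
    // every later caller reuses the same pending or completed result.
    static synchronized CompletableFuture<String> sharedLogin() {
        if (instance == null) {
            instance = newLogin();
        }
        return instance;
    }
}
```

With this, three calls such as sharedLogin().thenApply(...) all hang off one login, while three calls to newLogin() run the login three times. The synchronized keyword is there because a bare if (instance == null) check is not safe when several threads ask for the login at the same moment.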
Upvotes: 0