Philipp

Reputation: 433

RxJava one observable, multiple subscribers, one execution

I create an Observable from a long-running operation with a callback like this:

public Observable<API> login(){
    return Observable.create(new Observable.OnSubscribe<API>() {
        @Override
        public void call(final Subscriber<? super API> subscriber) {
            API.login(new SimpleLoginListener() {
                @Override
                public void onLoginSuccess(String token) {
                    subscriber.onNext(API.from(token));
                    subscriber.onCompleted();
                }
                @Override
                public void onLoginFailed(String reason) {
                    subscriber.onNext(API.error());
                    subscriber.onCompleted();
                }
            });
        }
    });
}

A successfully logged-in API is the precondition for several other operations such as api.getX() and api.getY(), so I thought I could chain these operations with RxJava and flatMap like this (simplified): login().getX() or login().getY().

My biggest problem now is that I have no control over when login(callback) is executed. However, I want to be able to reuse the login result for all calls.

This means: the wrapped login(callback) call should be executed only once. The result should then be used for all following calls.

It seems the result would be similar to a queue that aggregates subscribers and then shares the result of the first execution.

What is the best way to achieve this? Am I missing a simpler alternative?

I tried the code from this question and experimented with cache(), share(), publish(), refCount(), etc., but with every one of these operators the wrapped function is still called three times when I do this:

apiWrapper.getX();
apiWrapper.getX();
apiWrapper.getY();

Is there something like autoConnect(time window) that aggregates multiple successive subscribers?

Upvotes: 0

Views: 996

Answers (2)

akarnokd

Reputation: 69997

Applying cache() should make sure login is only called once.

public Observable<API> login() {
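    // Explicit <API> is needed: the return type is not used for inference through the chained .cache() call.
    return Observable.<API>create(s -> {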
        API.login(new SimpleLoginListener() {
            @Override
            public void onLoginSuccess(String token) {
                s.setProducer(new SingleProducer<>(s, API.from(token)));
            }
            @Override
            public void onLoginFailed(String reason) {
                s.setProducer(new SingleProducer<>(s, API.error()));
            }
        });
    }).cache();
}

If for some reason you want to "clear" the cache, you can use the following trick:

AtomicReference<Observable<API>> loginCache = new AtomicReference<>(login());

public Observable<API> cachedLogin() {
    return Observable.defer(() -> loginCache.get());
}

public void clearLoginCache() {
    loginCache.set(login());
}
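
To make the behaviour of this trick easier to see without RxJava on the classpath, here is a minimal plain-JDK sketch of the same idea: a lazily computed, memoized result held in an AtomicReference that can be swapped out to force a new login. It is only an analogue and not RxJava code; the names ResettableLogin, Lazy, expensiveLogin and loginCalls are hypothetical and exist just for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Plain-JDK analogue of "cache() plus a resettable reference"; not RxJava. */
class ResettableLogin {
    // Counts how often the expensive login actually runs (demo only).
    static final AtomicInteger loginCalls = new AtomicInteger();

    // Hypothetical stand-in for the callback-based API.login(...).
    static String expensiveLogin() {
        return "token-" + loginCalls.incrementAndGet();
    }

    // Runs the source at most once, on the first get(); later calls reuse the value.
    static final class Lazy<T> {
        private final Supplier<T> source;
        private boolean done;
        private T value;

        Lazy(Supplier<T> source) {
            this.source = source;
        }

        synchronized T get() {
            if (!done) {
                value = source.get();
                done = true;
            }
            return value;
        }
    }

    // Holds the current memoized login; nothing runs until the first get().
    static final AtomicReference<Lazy<String>> loginCache =
            new AtomicReference<>(new Lazy<>(ResettableLogin::expensiveLogin));

    // Always reads the current cache entry, like defer(() -> loginCache.get()).
    static String cachedLogin() {
        return loginCache.get().get();
    }

    // Swaps in a fresh, not-yet-executed entry; the next caller triggers a new login.
    static void clearLoginCache() {
        loginCache.set(new Lazy<>(ResettableLogin::expensiveLogin));
    }

    public static void main(String[] args) {
        System.out.println(cachedLogin()); // first call performs the login
        System.out.println(cachedLogin()); // reuses the same token
        clearLoginCache();
        System.out.println(cachedLogin()); // performs a second login
    }
}
```

The point to carry back to the RxJava version is that every caller goes through the one shared reference, so a login only runs again after clearLoginCache() has replaced the entry.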

Upvotes: 1

Philipp

Reputation: 433

OK, I think I found one major problem in my approach: Observable.create() is a factory method, so even if every single observable was working as intended, I was creating many of them. One way to avoid this mistake is to create a single instance:

if (instance == null) { instance = Observable.create(...); }
return instance;
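
As a rough illustration of why the single instance matters, the following plain-JDK sketch (not RxJava; SharedInstanceDemo, memoize, freshEachTime and shared are hypothetical names) compares a factory that builds a new memoizing wrapper on every call with one that hands out the same wrapper each time. Note that a bare null check is not thread-safe, so the sketch assumes a synchronized accessor.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Plain-JDK sketch (not RxJava): caching only helps if callers share one instance. */
class SharedInstanceDemo {
    // Counts how often the wrapped operation really executes (demo only).
    static final AtomicInteger executions = new AtomicInteger();

    // Hypothetical expensive call standing in for the wrapped login.
    static String login() {
        return "token-" + executions.incrementAndGet();
    }

    // Memoizing wrapper: runs the source at most once per wrapper instance.
    static <T> Supplier<T> memoize(Supplier<T> source) {
        return new Supplier<T>() {
            private boolean done;
            private T value;

            @Override
            public synchronized T get() {
                if (!done) {
                    value = source.get();
                    done = true;
                }
                return value;
            }
        };
    }

    // The mistake: a fresh wrapper per call, so no result is ever shared.
    static Supplier<String> freshEachTime() {
        return memoize(SharedInstanceDemo::login);
    }

    private static Supplier<String> instance;

    // The fix: lazily create one wrapper and return the same one to every caller.
    static synchronized Supplier<String> shared() {
        if (instance == null) {
            instance = memoize(SharedInstanceDemo::login);
        }
        return instance;
    }

    public static void main(String[] args) {
        freshEachTime().get();
        freshEachTime().get();
        freshEachTime().get();
        System.out.println("after fresh wrappers: " + executions.get());

        shared().get();
        shared().get();
        shared().get();
        System.out.println("after shared wrapper: " + executions.get());
    }
}
```

Each call through freshEachTime() executes the login again, while the three calls through shared() add only a single execution.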

Upvotes: 0
