Andrea Baccega
Andrea Baccega

Reputation: 27633

Cache handling with RXJava

I'm trying to implement this workflow with rxJava but i'm sure if i'm misusing or doing stuff wrong.

Here is my full snippet of code.

public class LoginTask extends BaseBackground<LoginResult> {
  private static CachedLoginResult cachedLoginResult = new CachedLoginResult();
  private XMLRPCClient xmlrpcClient;
  private UserCredentialsHolder userCredentialsHolder;

  @Inject
  public LoginTask(XMLRPCClient client, UserCredentialsHolder userCredentialsHolder) {
    this.xmlrpcClient = client;
    this.userCredentialsHolder = userCredentialsHolder;
  }

  @Override
  public LoginResult performRequest() throws Exception {
    return UserApi.login(
        xmlrpcClient,
        userCredentialsHolder.getUserName(),
        userCredentialsHolder.getPlainPassword());


  }

  @Override
  public Observable<LoginResult> getObservable() {
    return cachedLoginResult.getObservable()
        .onErrorResumeNext(
            Observable.create(
                ((Observable.OnSubscribe<LoginResult>) subscriber -> {
                  try {
                    if (!subscriber.isUnsubscribed()) {
                      subscriber.onNext(performRequest()); // actually performRequest
                    }
                    subscriber.onCompleted();
                  } catch (Exception e) {
                    subscriber.onError(e);
                  }
                })
            )
                .doOnNext(cachedLoginResult::setLoginResult)
                .retry((attempts, t) -> attempts < 3)
                .doOnError(throwable -> cachedLoginResult.purgeCache())
        );
  }


  private static class CachedLoginResult {
    private LoginResult lr = null;
    private long when = 0;

    private CachedLoginResult() {
    }

    public boolean hasCache() {
      return lr != null && when + TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES) > System.currentTimeMillis();
    }

    public void setLoginResult(LoginResult lr) {
      if (lr != null) {
          this.lr = lr;
          this.when = System.currentTimeMillis();
      }
    }

    public void purgeCache() {
      this.lr = null;
      this.when = 0;
    }

    public Observable<LoginResult> getObservable() {
      return Observable.create(new Observable.OnSubscribe<LoginResult>() {
        @Override
        public void call(Subscriber<? super LoginResult> subscriber) {
          if (!subscriber.isUnsubscribed()) {
            if (hasCache()) {
              subscriber.onNext(lr);
              subscriber.onCompleted();
            } else {
              subscriber.onError(new RuntimeException("No cache"));
            }
          }
        }
      });
    }
  }
}

Since i wan't able to find any similar examples and i started "playing" with rxjava just 1 day ago i'm unsure of my implementation.

Thank you for your time.

Upvotes: 5

Views: 1664

Answers (2)

akarnokd
akarnokd

Reputation: 70017

If I understand correctly, you want to perform the login once and cache the result in a reactive manner? If so, here is an example how I would do this:

import java.util.concurrent.ThreadLocalRandom;

import rx.*;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;


public class CachingLogin {
    static class LoginResult {

    }
    /** Guarded by this. */
    AsyncSubject<LoginResult> cache;
    public Observable<LoginResult> login(String username, String password) {
        AsyncSubject<LoginResult> c;
        boolean doLogin = false;
        synchronized (this) {
            if (cache == null || cache.hasThrowable()) {
                cache = AsyncSubject.create();
                doLogin = true;
            }
            c = cache;
        }
        if (doLogin) {
            Observable.just(1).subscribeOn(Schedulers.io())
            .map(v -> loginAPI(username, password))
            .retry(3).subscribe(c);
        }
        return c;
    }
    public void purgeCache() {
        synchronized (this) {
            cache = null;
        }
    }
    static LoginResult loginAPI(String username, String password) {
        if (ThreadLocalRandom.current().nextDouble() < 0.3) {
            throw new RuntimeException("Failed");
        }
        return new LoginResult();
    }
}

Upvotes: 0

Simon Basl&#233;
Simon Basl&#233;

Reputation: 28351

I think this code is alright, good job :)

You were right to use Observable.create in your LoginTask because otherwise result of the call could be cached internally, and then retry wouldn't help much...

This is I think however unnecessary for the CachedLoginResult's Observable. Here you can simplify your code by using Observable.justand Observable.error utility methods, something like:

public Observable<LoginResult> getObservable() {
  if (hasCache()) {
      return Observable.just(lr);
  } else {
      return Observable.error(new RuntimeException("No cache"));
  }
}

Note: just stores the value you tell it to emit internally, so that resubscriptions will always produce this value. This is what I hinted above, you shouldn't do Observable.just(performRequest()).retry(3) for example, because the performRequest will only ever be called once.

Upvotes: 4

Related Questions