Reputation: 327
So I am trying to cache my http responses into a ConcurrentHashMap. I have set up my cache class and Api client class to return Observables as follows:
public class UserCache {
private static ConcurrentHashMap<Integer, User> cache = new ConcurrentHashMap<>();
public Observable<User> get(Integer key) {
return Observable.create(observableEmitter -> {
if(cache.contains(key)) observableEmitter.onNext(cache.get(key));
observableEmitter.onComplete();
});
}
public void update(Integer key, User user) {
cache.putIfAbsent(key, user);
}
public boolean contains(Integer key) {
return cache.contains(key);
}
}
ApiClient
public class ApiClient {
private UserApi api;
private static ApiClient apiClient;
private ApiClient() {
HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
OkHttpClient client = new OkHttpClient.Builder().addInterceptor(interceptor).build();
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://jsonplaceholder.typicode.com")
.client(client)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build();
api = retrofit.create(UserApi.class);
}
public Observable<User> get(int id) {
return api.getUser(id);
}
public static ApiClient getInstance() {
if(apiClient == null) apiClient = new ApiClient();
return apiClient;
}
}
And in the App class
public class App {
ApiClient apiSource = ApiClient.getInstance();
UserCache userCache = new UserCache();
public Observable<User> get(Integer key) {
return Observable.concat(userCache.get(key), apiSource.get(key))
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.doOnNext(user -> {
userCache.update(user.id, user);
})
.subscribeOn(Schedulers.io());
}
public static void main(String[] args) throws InterruptedException {
App app = new App();
app.get(1).subscribe(System.out::println);
Thread.sleep(3000);
app.get(1).subscribe(System.out::println);
Thread.sleep(1000);
}
}
Mmy understanding of .concat
is that if the first observable (cache) doesn't emit anything, then the second Observable (api client) will start emitting. But I can't figure out why doOnNext(user -> userCache.update(user.id, user))
is not updating the cache, and hence when i retrieve the same key, another api call is carried out again.
Upvotes: 0
Views: 765
Reputation: 3013
I'm not sure why your doOnNext
does not emit, but if you're using RX there's a way that requires less code and eliminates race conditions. In the example you give, if we call the method twice on an empty cache, it will make two network calls and the last one will overwrite the first. Here's my preferred method, which prevents this from happening and requires less code:
private final ConcurrentMap<Integer, Observable<User>> userCache = Maps.newConcurrentMap();
public Observable<User> getUser(int id) {
return userCache.computeIfAbsent(id, theId -> {
ConnectableObservable<User> cachedObservable = getUserFromApi(id)
.replay();
cachedObservable.connect();
return cachedObservable;
}).doOnError(err -> userCache.remove(id));
}
As you can see I store cached observables. That way, if a second call is made while the first is still in flight, they get the same result, and it is only cached once. All calls after that get it directly from the cached observable.
However, we don't want to cache errors (probably) so I append a doOnError
which makes sure that any observables that contain an error (like a network failure) are not cached as well.
Upvotes: 1