fstephany

Reputation: 2354

Use concat() and first() to implement a cache with RxJava - concurrent calls

I'm using RxJava and the concat() and first() operators:

public Observable<List<Entity>> getEntities() {
    invalidateCacheIfNeeded();
    return Observable
            .concat(cachedEntities(), networkEntities())
            .first();
}

The cachedEntities() method returns an Observable built from a cached list, while networkEntities() fetches the entities with Retrofit.

This works great unless two subscribers subscribe in quick succession to the Observables returned by getEntities(). I guess the network request triggered by the first subscription has not finished when the second subscription is made, so the cache is still empty. In this case, two network requests are performed, which I want to avoid.

I tried to create a single-thread Scheduler so that the second call is only executed once the first call is over, but with no luck:

 mSingleThreadScheduler = Schedulers.from(Executors.newSingleThreadExecutor());

and:

public Observable<List<Entity>> getEntities() {
    invalidateCacheIfNeeded();
    return Observable
            .concat(cachedEntities(), networkEntities())
            .subscribeOn(mSingleThreadScheduler)
            .first();
}

I've also tried moving the subscribeOn call further down the Observable chain, but I get the same result.

Any hint?

Upvotes: 2

Views: 637

Answers (3)

Benjamin

Reputation: 7368

Given

public Observable<List<Entity>> getEntities() {
    invalidateCacheIfNeeded();
    return Observable
            .concat(cachedEntities(), networkEntities())
            .first();
}

You should create an AsyncSubject<Data> mSubject and use it as follows:

private Observable<List<Entity>> networkEntities() {
    return mSubject
            .map(Data::getEntities);
}

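And your network call should look like this:

public Subscription getDataFromNetwork() {
    // subscribe() returns a Subscription, not an Observable<Data>;
    // the result reaches callers through mSubject.
    return networkOperation()
            .subscribeOn(mSingleThreadScheduler)
            .subscribe(mSubject);
}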

Upvotes: 0

JohnWowUs

Reputation: 3083

This seems to be the relatively common use case of one Observable with multiple subscribers. You need something like:

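public Observable<List<Entity>> getEntities() {
    invalidateCacheIfNeeded();
    // replay(1) returns a ConnectableObservable; autoConnect() turns it back
    // into an Observable that connects on the first subscription.
    // Keep and reuse the returned Observable: a new chain per call shares nothing.
    return Observable
            .concat(cachedEntities(), networkEntities())
            .first()
            .replay(1)
            .autoConnect();
}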

See the answers to this question for a more in-depth explanation.
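The underlying idea, letting every caller that arrives during a running request share that one request, can also be sketched in plain Java without RxJava. This is only an illustration using CompletableFuture instead of the operators above; the SharedFetch class and the simulated request are hypothetical names made up for the sketch, and the real Retrofit call is assumed to sit where the counter is incremented.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper (not part of RxJava): callers that arrive while a fetch
// is still running receive the same future instead of starting a new fetch.
class SharedFetch<T> {
    private final Supplier<CompletableFuture<T>> fetcher; // starts the real request
    private CompletableFuture<T> inFlight;                // null when nothing is running

    SharedFetch(Supplier<CompletableFuture<T>> fetcher) {
        this.fetcher = fetcher;
    }

    synchronized CompletableFuture<T> get() {
        if (inFlight != null) {
            return inFlight; // join the request that is already running
        }
        final CompletableFuture<T> started = fetcher.get();
        inFlight = started;
        // Forget the request once it finishes so a later call can fetch again.
        started.whenComplete((value, error) -> clear(started));
        return started;
    }

    private synchronized void clear(CompletableFuture<T> finished) {
        if (inFlight == finished) {
            inFlight = null;
        }
    }

    public static void main(String[] args) {
        AtomicInteger networkCalls = new AtomicInteger();
        CompletableFuture<List<String>> response = new CompletableFuture<>();
        SharedFetch<List<String>> entities = new SharedFetch<>(() -> {
            networkCalls.incrementAndGet(); // stands in for the network request
            return response;
        });

        CompletableFuture<List<String>> first = entities.get();
        CompletableFuture<List<String>> second = entities.get(); // arrives before completion
        response.complete(Arrays.asList("a", "b"));

        System.out.println(first == second);    // whether both callers share one future
        System.out.println(networkCalls.get()); // number of simulated requests
        System.out.println(second.join());      // the shared result
    }
}
```

In the RxJava version the same role is played by holding on to one replayed Observable while the request is running, rather than building a fresh chain on every call.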

Upvotes: 0

MdFazlaRabbiOpu

Reputation: 174

I don't think it is a good idea to make the whole method thread-safe, because that blocks the entire method and thus hurts performance. It is better to make the data structure thread-safe. In your case, you are using List in your method:

public Observable<List<Entity>> getEntities() {

}

Use CopyOnWriteArrayList instead of List. It is thread-safe.

public Observable<CopyOnWriteArrayList<Entity>> getEntities() {

}
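As a small illustration of what CopyOnWriteArrayList provides, the sketch below appends to a list while iterating over it. The class and method names are hypothetical and the entities are plain strings for brevity. Note that this only concerns safe access to the list itself; it does not by itself stop two subscribers from each triggering a network request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SnapshotIterationDemo {
    // Appends to the list while iterating over it. Returns true if the loop
    // completed, false if the iterator detected the modification and failed.
    static boolean canAppendWhileIterating(List<String> entities) {
        try {
            for (String entity : entities) {
                if (entity.equals("a")) {
                    entities.add("c");
                }
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> plain = new ArrayList<>(Arrays.asList("a", "b"));
        List<String> copyOnWrite = new CopyOnWriteArrayList<>(Arrays.asList("a", "b"));

        // ArrayList iterators are fail-fast; CopyOnWriteArrayList iterators
        // walk a snapshot taken when the iteration started.
        System.out.println(canAppendWhileIterating(plain));
        System.out.println(canAppendWhileIterating(copyOnWrite));
        System.out.println(copyOnWrite);
    }
}
```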

Hope it will work.

Upvotes: 1
