Reputation: 2354
I'm using RxJava and the concat()
and first()
operators:
public Observable<List<Entity>> getEntities() {
invalidateCacheIfNeeded();
return Observable
.concat(cachedEntities(), networkEntities())
.first();
}
The cachedEntities
returns an Observable built from a cached list while the networkEntities
method fetches the entities with Retrofit.
This works great unless two user subscribes quickly to the observables returned by getEntities()
. I guess the network request of the first subscribe is not finished when the second subscribe is made. In this case, two network requests are performed. Which I want to avoid.
I tried to create a single thread Scheduler so the the execution of the second call is only carried out when the first call is over but with no luck:
mSingleThreadScheduler = Schedulers.from(Executors.newSingleThreadExecutor());
and:
public Observable<List<Entity>> getEntities() {
invalidateCacheIfNeeded();
return Observable
.concat(cachedEntities(), networkEntities())
.subscribeOn(mSingleThreadScheduler)
.first();
}
I've tried to sprinkle the subscribeOn
call lower in the Observable chain but I get the same result.
Any hint?
Upvotes: 2
Views: 637
Reputation: 7368
Given
public Observable<List<Entity>> getEntities() {
invalidateCacheIfNeeded();
return Observable
.concat(cachedEntities(), networkEntities())
.first();
}
You should create an AsyncSubject<Data> mSubject
and use it as follow
private Observable<List<Entity>> networkEntities() {
return mSubject
.map(Data::getEntities);
}
And your network call should look like this
public Observable<Data> getDataFromNetwork() {
return networkOperation()
.subscribeOn(mSingleThreadScheduler)
.subscribe(mSubject);
}
Upvotes: 0
Reputation: 3083
This seems to be the relatively common use case of one observable with multiple subscribers. You need something like
public Observable<List<Entity>> getEntities() {
invalidateCacheIfNeeded();
return Observable
.concat(cachedEntities(), networkEntities())
.first()
.replay(1)
}
See the answers to this question for a more in depth explanation.
Upvotes: 0
Reputation: 174
I think it is not a good idea to make a method thread-safe. Because it blocks the whole method thus decrease the performance. So it is recommended to make the data structure thread-safe. In your case your are using List in your method
public Observable<List<Entity>> getEntities() {
}
Use CopyOnWriteArrayList instead of List. It is thread safe.
public Observable<CopyOnWriteArrayList<Entity>> getEntities() {
}
Hope it will work.
Upvotes: 1