Reputation: 3056
How can an Observer
have its logic execute once (asynchronously) regardless of subscribers. My use-case is a Provider
class that does CRUD for a Model
class by delegating to another object (SqliteOpenHelper
). A call to ModelProvider.update
should persist changes to the model, whether the UI subscribes or not.
class ModelProvider {
Observable<String> updateModel(Model model) {
return Observable.create((Subscriber<? super String> subscriber) -> {
// synchronous for now, probably irrelevant
String modelId = repository.save(model);
subscriber.onNext(modelId);
}).subscribeOn(Schedulers.io());
}
}
class SomeActivity {
void updateModel(Model model) {
// dont care about the result, but need it to execute exactly once
mModelProvider.updateModel(model);
}
}
I'm in the middle of moving a codebase to using RxJava and reactive patterns so the call to repository.save(model)
is synchronous, but will still need the logic to execute exactly once afterwards, with or without UI subscribers.
I've considered simply moving the repository logic out of the observable and then returning a different observable that uses the result of that:
class ModelProvider {
Observable<String> updateModel(Model model) {
String modelId = repository.save(model);
return Observable.create((Subscriber<? super String> subscriber) -> {
subscriber.onNext(modelId);
}).subscribeOn(Schedulers.io());
}
}
but this forces the IO work on the main thread and blocks it.
another non-solution here:
Observable<String> updateModel(Model model) {
Observable<String> obs = Observable.create((Subscriber<? super String> subscriber) -> {
String modelId = repository.save(model);
subscriber.onNext(modelId);
}).subscribeOn(Schedulers.io());
obs.subscribe();
return obs;
}
this does execute once at obs.subscribe()
but executes again if the UI subscribes to it.
I'm just now learning reactive patterns but I assume this is where Subjects
/Relays
are useful.
Upvotes: 3
Views: 1392
Reputation: 3056
this works:
Observable<String> updateModel(Model model) {
Observable<String> obs = Observable.create((Subscriber<? super String> subscriber) -> {
String modelId = repository.save(model);
subscriber.onNext(modelId);
}).subscribeOn(Schedulers.io())
// cached() returns a new observer over the original, which only executes once
// but not until it is subscribed to
Observable<String> cachedObs = obs.cache();
// this "starts" execution of `repository.save()`
cachedObs.subscribe();
// return observable that executes once and caches (cachedObs)
// returning the original (obs) would cause multiple executions
return cachedObs;
}
calling Observable.cache()
after it's created prevents re-invocation, but the call to obs.subscribe()
is needed in order to get that one-and-only-one invocation.
Note that the persistence block (repository.save()
etc..) doesn't start immediately on subscription, but is queued to execute asynchronously on a separate thread (Schedulers.io
thread-pool) at an indeterminate point in the future.
this uses rxjava-async-utils
package and also works:
Observable<String> updateModel(Model model) {
Observable<String> obs = Async.start(() -> repository.save(model));
// cached() here is required in order to avoid the race condition
Observable<String> cachedObs = obs.cache();
return cachedObs;
}
Note that the call to cache()
is required in order to save the result and provide it to subscribers. Without it, there is a race condition where subscribers get no result because the Async.start
block has already completed.
Upvotes: 2
Reputation: 16152
As @xst said, but I'd simplify it more:
return Observable
.defer(() -> Observable.just(repository.save(model)))
.subscribeOn(Schedulers.io())
.cache();
Upvotes: 1