xst
xst

Reputation: 3056

Observable with optional subscribers execute once

How can an Observer have its logic execute once (asynchronously) regardless of subscribers. My use-case is a Provider class that does CRUD for a Model class by delegating to another object (SqliteOpenHelper). A call to ModelProvider.update should persist changes to the model, whether the UI subscribes or not.

class ModelProvider {
    Observable<String> updateModel(Model model) {
        return Observable.create((Subscriber<? super String> subscriber) -> {
            // synchronous for now, probably irrelevant
            String modelId = repository.save(model);
            subscriber.onNext(modelId);
        }).subscribeOn(Schedulers.io());
    }
}

class SomeActivity {
    void updateModel(Model model) {
        // dont care about the result, but need it to execute exactly once
        mModelProvider.updateModel(model);
    }
}

I'm in the middle of moving a codebase to using RxJava and reactive patterns so the call to repository.save(model) is synchronous, but will still need the logic to execute exactly once afterwards, with or without UI subscribers.

I've considered simply moving the repository logic out of the observable and then returning a different observable that uses the result of that:

class ModelProvider {
    Observable<String> updateModel(Model model) {
        String modelId = repository.save(model);

        return Observable.create((Subscriber<? super String> subscriber) -> {
            subscriber.onNext(modelId);
        }).subscribeOn(Schedulers.io());
    }
}

but this forces the IO work on the main thread and blocks it.

another non-solution here:

    Observable<String> updateModel(Model model) {
        Observable<String> obs = Observable.create((Subscriber<? super String> subscriber) -> {
        String modelId = repository.save(model);
            subscriber.onNext(modelId);
        }).subscribeOn(Schedulers.io());

        obs.subscribe();

        return obs;
    }

this does execute once at obs.subscribe() but executes again if the UI subscribes to it.

I'm just now learning reactive patterns but I assume this is where Subjects/Relays are useful.

Upvotes: 3

Views: 1392

Answers (2)

xst
xst

Reputation: 3056

this works:

Observable<String> updateModel(Model model) {
    Observable<String> obs = Observable.create((Subscriber<? super String> subscriber) -> {
        String modelId = repository.save(model);
        subscriber.onNext(modelId);
    }).subscribeOn(Schedulers.io())

    // cached() returns a new observer over the original, which only executes once
    // but not until it is subscribed to
    Observable<String> cachedObs = obs.cache();
    // this "starts" execution of `repository.save()`
    cachedObs.subscribe();

    // return observable that executes once and caches (cachedObs)
    // returning the original (obs) would cause multiple executions
    return cachedObs;
}

calling Observable.cache() after it's created prevents re-invocation, but the call to obs.subscribe() is needed in order to get that one-and-only-one invocation.

Note that the persistence block (repository.save() etc..) doesn't start immediately on subscription, but is queued to execute asynchronously on a separate thread (Schedulers.io thread-pool) at an indeterminate point in the future.


this uses rxjava-async-utils package and also works:

Observable<String> updateModel(Model model) {
    Observable<String> obs = Async.start(() -> repository.save(model));

    // cached() here is required in order to avoid the race condition
    Observable<String> cachedObs = obs.cache();

    return cachedObs;
}

Note that the call to cache() is required in order to save the result and provide it to subscribers. Without it, there is a race condition where subscribers get no result because the Async.start block has already completed.

Upvotes: 2

Tassos Bassoukos
Tassos Bassoukos

Reputation: 16152

As @xst said, but I'd simplify it more:

return Observable
     .defer(() -> Observable.just(repository.save(model)))
     .subscribeOn(Schedulers.io())
     .cache();

Upvotes: 1

Related Questions