Kevin Krumwiede
Kevin Krumwiede

Reputation: 10288

How to create a multicast observable that activates on subscribe?

I want to fuse the inputs of several Android sensors and expose the output as an observable (or at least something that can be subscribed to) that supports multiple simultaneous observers. What's the idiomatic way to approach this? Is there a class in the standard library that would make a good starting point?

I was thinking of wrapping a PublishSubject in an object with delegates for one or more subscribe methods that test hasObservers to activate the sensors, and wrap the returned Disposable in a proxy that tests hasObservers to deactivate them. Something like this, although this already has some obvious problems:

public class SensorSubject<T> {
    private final PublishSubject<T> mSubject = PublishSubject.create();

    public Disposable subscribe(final Consumer<? super T> consumer) {
        final Disposable d = mSubject.subscribe(consumer);
        if(mSubject.hasObservers()) {
            // activate sensors
        }
        return new Disposable() {
            @Override
            public void dispose() {
                // possible race conditions!
                if(!isDisposed()) {
                    d.dispose();
                    if(!mSubject.hasObservers()) {
                        // deactivate sensors
                    }
                }
            }

            @Override
            public boolean isDisposed() {
                return d.isDisposed();
            }
        };
    }
}

Upvotes: 0

Views: 262

Answers (1)

michalbrz
michalbrz

Reputation: 3494

The idiomatic way to do that in RxJava would be to use hot observable.

Cold observables do some action when someone subscribes to them and emit all items to that subscriber. So it's 1 to 1 relation.

Hot observable do some action and emits items independently on individual subscription. So if you subscribe too late, you might not get some values that were emitted earlier. This is 1 to many relation, aka multicast - which is what you want.

Usual way to do it is Flowable.publish() which makes Flowable multicast, but requires calling connect() method to start emitting values.

In your case you can also call refCount() which adds your desired functionality - it subscribes to source Flowable when there is at least one subscription and unsubscribes when everyone unsubsribed.

Because publish().refCount() is pretty popular combination, there is a shortcut for them - share(). And as far as I understand this is exactly what you want.

Edit by asker: This code incorporates this answer and David Karnok's comment in the form of a Dagger 2 provider method. SimpleMatrix is from EJML. This seems to be doing what I asked for.

@Provides
@Singleton
@Named(MAGNETOMETER)
public Observable<SimpleMatrix> magnetometer(final SensorManager sensorManager) {
    final PublishSubject<SimpleMatrix> ps = PublishSubject.create();
    final Sensor sensor = sensorManager.getDefaultSensor(TYPE_MAGNETIC_FIELD);
    final SensorEventListener listener = new SensorEventAdapter() {
        @Override
        public void onSensorChanged(final SensorEvent event) {
            ps.onNext(new SimpleMatrix(1, 3, true, event.values));
        }
    };
    return ps.doOnSubscribe(s -> {
        sensorManager.registerListener(listener, sensor, SENSOR_DELAY_NORMAL);
    }).doOnDispose(() -> {
        sensorManager.unregisterListener(listener);
    }).share();
}

Upvotes: 1

Related Questions