Reputation: 10288
I want to fuse the inputs of several Android sensors and expose the output as an observable (or at least something that can be subscribed to) that supports multiple simultaneous observers. What's the idiomatic way to approach this? Is there a class in the standard library that would make a good starting point?
I was thinking of wrapping a PublishSubject in an object with delegates for one or more subscribe methods that test hasObservers to activate the sensors, and wrap the returned Disposable in a proxy that tests hasObservers to deactivate them. Something like this, although this already has some obvious problems:
public class SensorSubject<T> {
    private final PublishSubject<T> mSubject = PublishSubject.create();

    public Disposable subscribe(final Consumer<? super T> consumer) {
        final Disposable d = mSubject.subscribe(consumer);
        if(mSubject.hasObservers()) {
            // activate sensors
        }
        return new Disposable() {
            @Override
            public void dispose() {
                // possible race conditions!
                if(!isDisposed()) {
                    d.dispose();
                    if(!mSubject.hasObservers()) {
                        // deactivate sensors
                    }
                }
            }

            @Override
            public boolean isDisposed() {
                return d.isDisposed();
            }
        };
    }
}
Upvotes: 0
Views: 262
Reputation: 3494
The idiomatic way to do that in RxJava is to use a hot observable.
Cold observables perform their work when someone subscribes and emit all items to that subscriber, so it's a one-to-one relationship.
Hot observables do their work and emit items independently of any individual subscription, so if you subscribe too late you might miss values that were emitted earlier. This is a one-to-many relationship, a.k.a. multicast, which is what you want.
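To make the difference concrete, here is a minimal plain-Java sketch. It deliberately does not use RxJava; ColdNumbers, HotNumbers and HotColdDemo are hypothetical names I made up for illustration, and the real RxJava types do considerably more.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hand-rolled illustration only; these are not RxJava types.
final class ColdNumbers {
    // Cold: every subscriber triggers its own run and sees every item.
    void subscribe(Consumer<Integer> observer) {
        for (int i = 1; i <= 3; i++) {
            observer.accept(i);
        }
    }
}

final class HotNumbers {
    private final List<Consumer<Integer>> observers = new ArrayList<>();

    // Hot: subscribing only registers interest; nothing is replayed.
    void subscribe(Consumer<Integer> observer) {
        observers.add(observer);
    }

    // Emission happens regardless of who is listening at that moment.
    void emit(int value) {
        for (Consumer<Integer> o : new ArrayList<>(observers)) {
            o.accept(value);
        }
    }
}

final class HotColdDemo {
    public static void main(String[] args) {
        ColdNumbers cold = new ColdNumbers();
        cold.subscribe(v -> System.out.println("cold A: " + v));
        cold.subscribe(v -> System.out.println("cold B: " + v));

        HotNumbers hot = new HotNumbers();
        hot.subscribe(v -> System.out.println("hot early: " + v));
        hot.emit(1);
        hot.subscribe(v -> System.out.println("hot late: " + v));
        hot.emit(2);
    }
}
```

Both cold subscribers receive 1, 2 and 3, while the late hot subscriber only receives 2. A sensor behaves like the hot case: readings keep arriving whether or not anyone is listening.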
The usual way to do this is Flowable.publish(), which makes the Flowable multicast but requires calling the connect() method to start emitting values.
In your case you can also call refCount(), which adds the functionality you want: it subscribes to the source Flowable when there is at least one subscriber and unsubscribes when everyone has unsubscribed.
Because publish().refCount() is a pretty popular combination, there is a shortcut for it: share(). As far as I understand, this is exactly what you want.
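If it helps to see the reference-counting idea spelled out, here is a rough plain-Java sketch of it. This is not RxJava's implementation, and RefCountedSource and RefCountDemo are hypothetical names; it only shows the "activate on first subscriber, deactivate on last" behaviour, with the check and the action done under one lock so the race flagged in the question's proxy does not arise.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hand-rolled sketch of the reference-counting idea behind share();
// this is not RxJava's implementation and the names are hypothetical.
final class RefCountedSource<T> {
    private final Runnable activate;    // e.g. register the sensor listener
    private final Runnable deactivate;  // e.g. unregister the sensor listener
    private final List<Consumer<? super T>> observers = new ArrayList<>();

    RefCountedSource(Runnable activate, Runnable deactivate) {
        this.activate = activate;
        this.deactivate = deactivate;
    }

    // Returns a handle that removes the observer when run.
    synchronized Runnable subscribe(Consumer<? super T> observer) {
        observers.add(observer);
        if (observers.size() == 1) {
            activate.run();             // first observer arrived
        }
        boolean[] disposed = {false};
        return () -> {
            synchronized (this) {
                if (disposed[0]) {
                    return;             // repeated calls are no-ops
                }
                disposed[0] = true;
                observers.remove(observer);
                if (observers.isEmpty()) {
                    deactivate.run();   // last observer left
                }
            }
        };
    }

    // Called by the source (e.g. from a sensor callback) to multicast a value.
    void emit(T value) {
        List<Consumer<? super T>> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(observers);
        }
        for (Consumer<? super T> o : snapshot) {
            o.accept(value);
        }
    }
}

final class RefCountDemo {
    public static void main(String[] args) {
        RefCountedSource<String> source = new RefCountedSource<>(
                () -> System.out.println("sensors on"),
                () -> System.out.println("sensors off"));
        Runnable a = source.subscribe(v -> System.out.println("A got " + v));
        Runnable b = source.subscribe(v -> System.out.println("B got " + v));
        source.emit("reading");
        a.run();
        b.run();
    }
}
```

In real code you would not write this yourself; calling share() on the source gives you the same subscribe-on-first, unsubscribe-on-last behaviour.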
Edit by asker: This code incorporates this answer and David Karnok's comment in the form of a Dagger 2 provider method. SimpleMatrix is from EJML. This seems to be doing what I asked for.
@Provides
@Singleton
@Named(MAGNETOMETER)
public Observable<SimpleMatrix> magnetometer(final SensorManager sensorManager) {
    final PublishSubject<SimpleMatrix> ps = PublishSubject.create();
    final Sensor sensor = sensorManager.getDefaultSensor(TYPE_MAGNETIC_FIELD);
    final SensorEventListener listener = new SensorEventAdapter() {
        @Override
        public void onSensorChanged(final SensorEvent event) {
            ps.onNext(new SimpleMatrix(1, 3, true, event.values));
        }
    };
    return ps.doOnSubscribe(s -> {
        sensorManager.registerListener(listener, sensor, SENSOR_DELAY_NORMAL);
    }).doOnDispose(() -> {
        sensorManager.unregisterListener(listener);
    }).share();
}
Upvotes: 1