Reputation: 5168
I am using the reactive-location lib.
My use case is that I have a stream of objects emitted from an observable. These items may be emitted only once every few hours. As soon as an item is emitted, I want to obtain a location and, using zipWith (as far as I understand), emit an object containing the location.
The problem is that, because the objects are only emitted once every few hours, I cannot keep the location observable hot, as that would drain the battery.
So I need the following: once an object is passed into the stream, subscribe to the location observable; once a location is obtained, unsubscribe from the location observable. This must happen for every item that comes down the stream.
As far as I understand, this transformer takes care of unsubscribing:
public <T> Observable.Transformer<T, T> takeNextAndUnsubscribe() {
    return new Observable.Transformer<T, T>() {
        @Override
        public Observable<T> call(Observable<T> tObservable) {
            // The subject is typed so the merge below is not an unchecked raw-type call
            final BehaviorSubject<T> subject = BehaviorSubject.create();
            Observable<T> source = tObservable.doOnNext(new Action1<T>() {
                @Override
                public void call(T t) {
                    subject.onNext(t);
                }
            });
            return Observable
                    .merge(source.takeUntil(subject), subject)
                    .take(1);
        }
    };
}
But how would I subscribe again once a new object is sent down the stream?
Upvotes: 0
Views: 385
Reputation: 5823
It sounds like what you need is to combine each source item with the current location at the moment the item is emitted. No need for anything fancy here. Just use flatMap() on each of the source items to combine it with the location.
source.flatMap(item ->
    locationProvider
        .getLastKnownLocation()
        .map(location -> new ItemWithLocation<>(item, location))
);
class ItemWithLocation<T> {
    private final T item;
    private final Location location;

    public ItemWithLocation(T item, Location location) {
        this.item = item;
        this.location = location;
    }

    public T getItem() {
        return item;
    }

    public Location getLocation() {
        return location;
    }
}
EDIT: Updated with a second example. The following subscribes to location updates for each source item, waits until a specific accuracy is reached, and then combines that location with the item. The key here is the use of first(). It unsubscribes from the location provider as soon as you get a location that satisfies your needs.
LocationRequest request =
    LocationRequest
        .create()
        .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
        .setInterval(100);

source.flatMap(item ->
    locationProvider
        .getUpdatedLocation(request)
        .first(location -> location.getAccuracy() < 5.0f)
        .map(location -> new ItemWithLocation<>(item, location))
);
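To make the subscribe/unsubscribe lifecycle easier to see, here is a rough, library-free sketch of what the flatMap() plus first() combination does for each item. It does not use RxJava or reactive-location at all: Fix, FakeLocationSource, ItemWithFix and onItem are hypothetical stand-ins invented for this illustration, and the real operators handle errors, threading and completion that this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Illustration only: every type here is a hypothetical stand-in, not library API.
class OnDemandLocationSketch {

    /** Stand-in for a location fix; only the accuracy (in metres) is modelled. */
    static final class Fix {
        final float accuracy;
        Fix(float accuracy) { this.accuracy = accuracy; }
    }

    /** Stand-in for the location provider; it is only "on" while it has listeners. */
    static final class FakeLocationSource {
        private final List<Consumer<Fix>> listeners = new ArrayList<>();

        /** Registers a listener and returns a handle that unregisters it. */
        Runnable subscribe(Consumer<Fix> listener) {
            listeners.add(listener);
            return () -> listeners.remove(listener);
        }

        /** Delivers a fix to a snapshot of the listeners, so they may unsubscribe while notified. */
        void emit(Fix fix) {
            for (Consumer<Fix> listener : new ArrayList<>(listeners)) {
                listener.accept(fix);
            }
        }

        int activeListeners() { return listeners.size(); }
    }

    /** Pairs a source item with the fix obtained for it. */
    static final class ItemWithFix<T> {
        final T item;
        final Fix fix;
        ItemWithFix(T item, Fix fix) { this.item = item; this.fix = fix; }
    }

    /**
     * Per-item behaviour: subscribe when the item arrives, wait for the first
     * acceptable fix, unsubscribe, then pass the pair downstream.
     */
    static <T> void onItem(T item, FakeLocationSource source, Predicate<Fix> goodEnough,
                           Consumer<ItemWithFix<T>> downstream) {
        Runnable[] unsubscribe = new Runnable[1]; // holder so the listener can cancel itself
        unsubscribe[0] = source.subscribe(fix -> {
            if (goodEnough.test(fix)) {
                unsubscribe[0].run();             // stop listening before delivering
                downstream.accept(new ItemWithFix<>(item, fix));
            }
        });
    }

    public static void main(String[] args) {
        FakeLocationSource source = new FakeLocationSource();
        onItem("item-1", source, fix -> fix.accuracy < 5.0f,
                pair -> System.out.println(pair.item + " @ accuracy " + pair.fix.accuracy));
        source.emit(new Fix(20.0f)); // too coarse, listener stays registered
        source.emit(new Fix(3.0f));  // accepted, listener removes itself
        System.out.println("active listeners: " + source.activeListeners());
    }
}
```

Because each item gets its own short-lived subscription, nothing listens for locations between items, which is the property the question asks for.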
Upvotes: 1