Reputation: 13402
I have an stream of observables that basically emulates a state diagram. For example:
whenThisHappens()
.switchMap(i -> {
if (i.isThisThing()) {
return whenThatHappens();
} else {
return nothingHappened();
}
}
.subscribe(thing -> {
if (thing.isThatThing()) {
log("Got that thing");
} else {
log("Got nothing");
}
});
The issue is that I want to loop through the login until some event happens (this is in a long running service on Android). Right now I'm able to accomplish this by keeping the observable in a variable, subscribing to it and then unsubscribing and resubscribing to it in it's onComplete
obs = whenThisHappens()
.switchMap(i -> {
if (i.isThisThing()) {
return whenThatHappens();
} else {
return nothingHappened();
}
}
.doOnNext(thing -> {
if (thing.isThatThing()) {
log("Got that thing");
} else {
log("Got nothing");
}
})
.doOnComplete(i -> {
obs.unsubscribe();
obs.subscribe();
}
obs.subscribe();
But I kind of feel like I'm doing something really wrong here. Is there a better way to accomplish this? I looked at retry
but throwing errors just to make it retry seems just as bad as what I'm doing now.
Upvotes: 0
Views: 495
Reputation: 1042
I think what you are trying to do is better accomplished with a PublishSubject
or BehaviorSubject
.
The stream would publish items on the subject which will trigger your subscription.
Here is event stream class I wrote a while ago:
public class SubjectEventStream implements IEventStream {
private final BehaviorSubject<IEvent> stream = BehaviorSubject.create();
@Override
public void publish(Observable<IEvent> event) {
event.doOnNext(stream::onNext).subscribe();
}
@Override
public Observable<IEvent> observe() {
return stream;
}
@Override
public <T> Observable<T> observe(Class<T> eventClass) {
return stream.ofType(eventClass);
}
}
See some more info here:
http://reactivex.io/documentation/subject.html
http://akarnokd.blogspot.com/2015/06/subjects-part-1.html
Upvotes: 1
Reputation: 31339
Reading your code it looks like you want filter
:
whenThisHappens()
# ignore uninteresting things
.filter(i -> i.isThisThing())
# do stuff on interesting things
.subscribe(item -> log("Got: " + item.toString()));
There are two more optional arguemnts to this basic subscribe which are an on-error function and an on-complete function you can utilize if you need - but subscriptions are automagically managed here.
Upvotes: 0