Saad Farooq

Reputation: 13402

Looping through observable stream

I have a stream of observables that basically emulates a state diagram. For example:

whenThisHappens()
    .switchMap(i -> {
        if (i.isThisThing()) {
            return whenThatHappens();
        } else {
            return nothingHappened();
        }
    })
    .subscribe(thing -> {
        if (thing.isThatThing()) {
            log("Got that thing");
        } else {
            log("Got nothing");
        }
    });

The issue is that I want to loop through the logic until some event happens (this is in a long-running service on Android). Right now I'm able to accomplish this by keeping the observable in a variable, subscribing to it, and then unsubscribing and resubscribing to it in its onComplete:

obs = whenThisHappens()
    .switchMap(i -> {
        if (i.isThisThing()) {
            return whenThatHappens();
        } else {
            return nothingHappened();
        }
    })
    .doOnNext(thing -> {
        if (thing.isThatThing()) {
            log("Got that thing");
        } else {
            log("Got nothing");
        }
    })
    .doOnComplete(() -> {
        // tear down and start over once the stream completes
        obs.unsubscribe();
        obs.subscribe();
    });
obs.subscribe();

But I kind of feel like I'm doing something really wrong here. Is there a better way to accomplish this? I looked at retry but throwing errors just to make it retry seems just as bad as what I'm doing now.

Upvotes: 0

Views: 495

Answers (2)

John Scattergood

Reputation: 1042

I think what you are trying to do is better accomplished with a PublishSubject or BehaviorSubject.

The stream would publish items on the subject which will trigger your subscription.

Here is an event stream class I wrote a while ago:

public class SubjectEventStream implements IEventStream {
    private final BehaviorSubject<IEvent> stream = BehaviorSubject.create();

    @Override
    public void publish(Observable<IEvent> event) {
        event.doOnNext(stream::onNext).subscribe();
    }

    @Override
    public Observable<IEvent> observe() {
        return stream;
    }

    @Override
    public <T> Observable<T> observe(Class<T> eventClass) {
        return stream.ofType(eventClass);
    }
}
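
To illustrate why a subject removes the need to resubscribe, here is a minimal sketch in plain Java. It does not use RxJava: LatestValueSubject is a hypothetical stand-in that only imitates the behavior-subject idea (remember the latest item, hand it to new subscribers, then keep delivering), and the state names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a behavior-style subject; this is NOT the RxJava class.
class LatestValueSubjectSketch {

    /** Remembers the most recent item and pushes every item to all current subscribers. */
    static final class LatestValueSubject<T> {
        private final List<Consumer<? super T>> subscribers = new ArrayList<>();
        private T latest;
        private boolean hasValue;

        void onNext(T item) {
            latest = item;
            hasValue = true;
            // Iterate over a copy so a subscriber may subscribe others during delivery.
            for (Consumer<? super T> subscriber : new ArrayList<>(subscribers)) {
                subscriber.accept(item);
            }
        }

        void subscribe(Consumer<? super T> subscriber) {
            subscribers.add(subscriber);
            if (hasValue) {
                // A late subscriber first receives the latest item seen so far.
                subscriber.accept(latest);
            }
        }
    }

    /** Publishes a few made-up states and returns what an early and a late subscriber saw. */
    static List<List<String>> demo() {
        LatestValueSubject<String> subject = new LatestValueSubject<>();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        subject.subscribe(early::add);
        subject.onNext("thisThing");
        subject.onNext("thatThing");
        subject.subscribe(late::add);
        subject.onNext("nothing");

        return Arrays.asList(early, late);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The subject never completes on its own, so the subscribers keep receiving items for as long as something keeps publishing, which is the looping behavior the question is after.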

See some more info here:

http://reactivex.io/documentation/subject.html

http://akarnokd.blogspot.com/2015/06/subjects-part-1.html

Upvotes: 1

Reut Sharabani

Reputation: 31339

Reading your code, it looks like you want filter:

whenThisHappens()
        // ignore uninteresting things
        .filter(i -> i.isThisThing())
        // do stuff on interesting things
        .subscribe(item -> log("Got: " + item.toString()));

This basic subscribe also accepts two more optional arguments, an on-error function and an on-complete function, which you can use if you need them. Either way, subscriptions are managed automatically here.
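
As a rough picture of how the filter and the three callbacks fit together, here is a sketch in plain Java rather than RxJava. The filterAndSubscribe helper and the sample items are hypothetical and only mimic the shape of the call: matching items go to the on-next function, and the stream ends with either the on-error function or the on-complete function, never both.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical illustration of filter + (onNext, onError, onComplete); not the RxJava API.
class FilterSubscribeSketch {

    /** Feeds each item through the filter and reports progress through three callbacks. */
    static <T> void filterAndSubscribe(Iterable<T> source,
                                       Predicate<? super T> keep,
                                       Consumer<? super T> onNext,
                                       Consumer<Throwable> onError,
                                       Runnable onComplete) {
        try {
            for (T item : source) {
                if (keep.test(item)) {
                    onNext.accept(item);
                }
            }
        } catch (RuntimeException e) {
            // An error is terminal: onComplete is not called afterwards.
            onError.accept(e);
            return;
        }
        onComplete.run();
    }

    /** Runs the helper over made-up items and returns the resulting log lines. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        filterAndSubscribe(
                Arrays.asList("thisThing", "noise", "thisThing"),
                item -> item.equals("thisThing"),
                item -> log.add("Got: " + item),
                error -> log.add("Failed: " + error),
                () -> log.add("Done"));
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```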

Upvotes: 0
