bitan
bitan

Reputation: 454

RX-java2 repeat consumption of an Observable

I have 3 Observables, the output of the first observable is required by the second observable. And the output the both first and second observable is required by the third observable.

    Observable<String> observableOne = Observable
        .just("{1}")
        .map(v -> {
            System.out.println("Executing Observable 1");
            return v;
        });

    Observable<String> observableTwo = observableOne
        .map(observableOneValue -> {
            System.out.println("Executing Observable 2");
            return "{2"+observableOneValue+"}";
        });

    Observable.zip(
        observableOne,
        observableTwo,
        (observableOneValue, observableTwoValue) ->
        {
            System.out.println("Executing Observable 3");
            return "{3"+observableOneValue+observableTwoValue+"}";
        }
    ).blockingSubscribe(System.out::println);

This repeats the execution of the first observable, I can certainly make the first observable cacheable. But I was wondering if there is a better option than that, particularly I am looking for some kind of message passing construct from the first to the other two observables

Upvotes: 1

Views: 75

Answers (1)

iagreen
iagreen

Reputation: 32016

I am not sure exactly what you are looking for by a "message passing construct". cache will work for you example above, but you mention you don't want to use that.

Another option that might fit your use case is using ConnectableObservable. It only starts emitting items when you call connect on it, not when it is subscribed to. Convert your observableOne to a ConnectableObservable by calling publish. Then set up all your subscribers. Then call connect on observableOne.

ConnectableObservable<String> observableOne = Observable
        .just("{1}")
        .map(v -> {
          System.out.println("Executing Observable 1");
          return v;
        }).publish();

    Observable<String> observableTwo = observableOne
        .map(observableOneValue -> {
          System.out.println("Executing Observable 2");
          return "{2"+observableOneValue+"}";
        });

    Observable.zip(
        observableOne,
        observableTwo,
        (observableOneValue, observableTwoValue) ->
        {
          System.out.println("Executing Observable 3");
          return "{3"+observableOneValue+observableTwoValue+"}";
        }
    ).subscribe(System.out::println);

    // Call when all the subscribers are ready --
    observableOne.connect();

Notes

  • observableOne is now a ConnectableObservable
  • need to use subscribe instead of blockingSubscribe so the code will execute the connect call.

Upvotes: 1

Related Questions