Pavel Dudka
Pavel Dudka

Reputation: 20934

Delay items emission until item is emitted from another observable

Playing with RxJava now and stumbled upon the following problem:

I have 2 different streams:

So essentially I have stream of items and I want all those items to be combined with that single item from 2nd stream:

----a1----a2----a3----a4----a5----|--------------->

-------------b1--|----------------------------------->

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

------------a1b1-a2b1-a3b1-a4b1-a5b1-------->

It looks really similar to combileLatest operator, but combineLatest will ignore all items from the first stream except the closest to the item from the second stream. It means that I will not receive a1b1 - the first resulting item emitted is gonna be a2b1.

I also looked at delay operator, but it doesn't allow me to specify close stream like it is done with buffer operatior

Is there any fancy operator which solves the problem above?

Upvotes: 4

Views: 2060

Answers (2)

Vladimir Mironov
Vladimir Mironov

Reputation: 30864

AFAIK, there is no a built-in operator to achieve the behavior you've described. You can always implement a custom operator or build it on top of existing operators. I think the second option is easier to implement and here is the code:

public static <L, R, T> Observable<T> zipper(final Observable<? extends L> left, final Observable<? extends R> right, final Func2<? super L, ? super R, ? extends T> function) {
    return Observable.defer(new Func0<Observable<T>>() {
        @Override
        public Observable<T> call() {
            final SerialSubscription subscription = new SerialSubscription();
            final ConnectableObservable<? extends R> cached = right.replay();

            return left.flatMap(new Func1<L, Observable<T>>() {
                @Override
                public Observable<T> call(final L valueLeft) {
                    return cached.map(new Func1<R, T>() {
                        @Override
                        public T call(final R valueRight) {
                            return function.call(valueLeft, valueRight);
                        }
                    });
                }
            }).doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    subscription.set(cached.connect());
                }
            }).doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    subscription.unsubscribe();
                }
            });
        }
    });
}

If you have any questions regarding the code, I can explain it in details.

UPDATE

Regarding the questing how my solution is different from the following one:

left.flatMap(valueLeft -> right.map(valueRight -> together(valueLeft, valueRight)));
  1. Parallel execution - in my implementation both left and right observables are executing in parallel. right observable doesn't have to wait for a left one to emit its first item.
  2. Caching - my solution subscribes only once to the right observables and caches its result. Thats why b1 will always be the same for all aXXX items. The solution provided by akarnokd subscribes to the rightobservable every time the left one emits an item. That means:

    • There is no guarantee that b1 won't change its value. For example for the following observable you will get a different b for each a.

      final Observable<Double> right = Observable.defer(new Func0<Observable<Double>>() {
          @Override
          public Observable<Double> call() {
              return Observable.just(Math.random());
          }
      });
      
    • If the right observable is a time consuming operation (e.g. network call), you will have to wait for its completion every time the left observable emits a new item.

Upvotes: 1

akarnokd
akarnokd

Reputation: 69997

There are several ways of making this happen:

1) flatMap over b if you don't need to start a upfront

b.flatMap(bv -> a.map(av -> together(av, bv)));

2) You can, of course, cache but it will retain your as for the entire duration of the stream.

3) Use groupBy a bit unconventionally because its GroupedObservable caches values until the single subscriber arrives, replays the cached value and continues as a regular direct observable (letting all previous cached values go).

Observable<Long> source = Observable.timer(1000, 1000, TimeUnit.MILLISECONDS)
        .doOnNext(v -> System.out.println("Tick"))
        .take(10);
Observable<String> other = Observable.just("-b").delay(5000, TimeUnit.MILLISECONDS)
        .doOnNext(v -> System.out.println("Tack"))
        ;

source.groupBy(v -> 1)
.flatMap(g -> 
    other.flatMap(b -> g.map(a -> a + b))
).toBlocking().forEach(System.out::println);

It works as follows:

  • Get a hold onto a GroupedObservable by grouping everything from source into group 1.
  • when the group g arrives, we 'start observing' the other observable.
  • Once other fires its element, we take it and map it over the group and 'start observing' it as well, bringing us the final sequence of a + bs.

I've added doOnNexts so you can see the source is really active before the other fires its "Tack".

Upvotes: 2

Related Questions