Reputation: 20934
Playing with RxJava now and stumbled upon the following problem:
I have 2 different streams:
So essentially I have a stream of items, and I want each of those items to be combined with the single item from the 2nd stream:
----a1----a2----a3----a4----a5----|--------------->
-------------b1--|----------------------------------->
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
------------a1b1-a2b1-a3b1-a4b1-a5b1-------->
It looks really similar to the combineLatest operator, but combineLatest will ignore all items from the first stream except the one closest to the item from the second stream. It means that I will not receive a1b1 - the first resulting item emitted is going to be a2b1.
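To make the difference concrete, here is a minimal plain-Java sketch (no RxJava involved) that replays the marble diagram above as an ordered list of events. The class name PairingModel and both method names are made up for illustration. The combineLatest method only models the operator's "latest value of each side" rule; it is not the library implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: events starting with "a" belong to the first stream,
// events starting with "b" belong to the second stream.
class PairingModel {

    // Models combineLatest: emits only once both sides have a latest value,
    // so every "a" that arrived before the first "b" (except the last) is lost.
    static List<String> combineLatest(List<String> timeline) {
        List<String> out = new ArrayList<>();
        String latestA = null;
        String latestB = null;
        for (String event : timeline) {
            if (event.startsWith("a")) {
                latestA = event;
            } else {
                latestB = event;
            }
            if (latestA != null && latestB != null) {
                out.add(latestA + latestB);
            }
        }
        return out;
    }

    // Models the desired behavior: buffer every "a" until the single "b"
    // arrives, flush the buffer, then pair later "a" items immediately.
    static List<String> pairWithSingle(List<String> timeline) {
        List<String> out = new ArrayList<>();
        List<String> pending = new ArrayList<>();
        String b = null;
        for (String event : timeline) {
            if (event.startsWith("b")) {
                b = event;
                for (String a : pending) {
                    out.add(a + b);
                }
                pending.clear();
            } else if (b == null) {
                pending.add(event);
            } else {
                out.add(event + b);
            }
        }
        return out;
    }
}
```

Fed the timeline a1, a2, b1, a3, a4, a5, the first method yields a2b1, a3b1, a4b1, a5b1, while the second also yields a1b1 at the front.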
I also looked at the delay operator, but it doesn't allow me to specify a closing stream the way the buffer operator does.
Is there any fancy operator which solves the problem above?
Upvotes: 4
Views: 2060
Reputation: 30864
AFAIK, there is no built-in operator that achieves the behavior you've described. You can always implement a custom operator or build it on top of existing operators. I think the second option is easier to implement, and here is the code:
public static <L, R, T> Observable<T> zipper(final Observable<? extends L> left, final Observable<? extends R> right, final Func2<? super L, ? super R, ? extends T> function) {
    return Observable.defer(new Func0<Observable<T>>() {
        @Override
        public Observable<T> call() {
            final SerialSubscription subscription = new SerialSubscription();
            // Replay the right observable so every left item sees the same cached values.
            final ConnectableObservable<? extends R> cached = right.replay();
            // Pair each left value with every value replayed from the right side.
            return left.flatMap(new Func1<L, Observable<T>>() {
                @Override
                public Observable<T> call(final L valueLeft) {
                    return cached.map(new Func1<R, T>() {
                        @Override
                        public T call(final R valueRight) {
                            return function.call(valueLeft, valueRight);
                        }
                    });
                }
            }).doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    // Connect to the right observable when the result is subscribed to.
                    subscription.set(cached.connect());
                }
            }).doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    // Disconnect from the right observable when the subscriber leaves.
                    subscription.unsubscribe();
                }
            });
        }
    });
}
If you have any questions regarding the code, I can explain it in detail.
UPDATE
Regarding the question of how my solution differs from the following one:
left.flatMap(valueLeft -> right.map(valueRight -> together(valueLeft, valueRight)));
The left and right observables execute in parallel. The right observable doesn't have to wait for the left one to emit its first item.
Caching - my solution subscribes only once to the right observable and caches its result. That's why b1 will always be the same for all aXXX items. The solution provided by akarnokd subscribes to the right observable every time the left one emits an item. That means:
There is no guarantee that b1 won't change its value. For example, for the following observable you will get a different b for each a:
final Observable<Double> right = Observable.defer(new Func0<Observable<Double>>() {
    @Override
    public Observable<Double> call() {
        return Observable.just(Math.random());
    }
});
If the right observable is a time-consuming operation (e.g. a network call), you will have to wait for its completion every time the left observable emits a new item.
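The two consequences above can be illustrated without RxJava. The following is a rough plain-Java sketch in which a Supplier stands in for a cold right observable: calling get() plays the role of subscribing. All names here (ResubscribeVsCache, countingSource, and the two methods) are hypothetical, and the counter replaces Math.random() only to keep the outcome deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class ResubscribeVsCache {

    // Asks the source again for every left item, like subscribing to a cold
    // right observable inside flatMap: each item may see a different value.
    static List<String> resubscribeEachTime(List<String> left, Supplier<String> right) {
        List<String> out = new ArrayList<>();
        for (String a : left) {
            out.add(a + right.get());
        }
        return out;
    }

    // Asks the source exactly once and reuses the answer, like replaying a
    // single cached subscription: every left item sees the same value.
    static List<String> subscribeOnceAndCache(List<String> left, Supplier<String> right) {
        String cached = right.get();
        List<String> out = new ArrayList<>();
        for (String a : left) {
            out.add(a + cached);
        }
        return out;
    }

    // Hypothetical cold source: every call produces the next value b1, b2, ...
    static Supplier<String> countingSource() {
        AtomicInteger calls = new AtomicInteger();
        return () -> "b" + calls.incrementAndGet();
    }
}
```

With left items a1, a2, a3, the first method produces a1b1, a2b2, a3b3, while the second produces a1b1, a2b1, a3b1. If each get() were a slow call, the first variant would also pay that cost once per left item.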
Upvotes: 1
Reputation: 69997
There are several ways of making this happen:
1) flatMap over b if you don't need to start a upfront:
b.flatMap(bv -> a.map(av -> together(av, bv)));
2) You can, of course, use cache, but it will retain all your a values for the entire duration of the stream.
3) Use groupBy a bit unconventionally: its GroupedObservable caches values until its single subscriber arrives, replays the cached values, and then continues as a regular direct observable (letting all previously cached values go).
Observable<Long> source = Observable.timer(1000, 1000, TimeUnit.MILLISECONDS)
        .doOnNext(v -> System.out.println("Tick"))
        .take(10);

Observable<String> other = Observable.just("-b").delay(5000, TimeUnit.MILLISECONDS)
        .doOnNext(v -> System.out.println("Tack"));

source.groupBy(v -> 1)
        .flatMap(g ->
                other.flatMap(b -> g.map(a -> a + b))
        ).toBlocking().forEach(System.out::println);
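It works as follows:
We get hold of a GroupedObservable by grouping everything from source into group 1.
When the group g arrives, we 'start observing' the other observable.
Once other emits its value, we map it over the group g, which gives us the a + b values.
I've added the doOnNext calls so you can see that the source is really active before the other observable fires its "Tack".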
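The caching behavior that option 3 relies on can be modeled in a few lines of plain Java. This is only a sketch of the behavior as described in this answer (buffer until the single subscriber arrives, replay, then pass values straight through), not the actual GroupedObservable code; the class BufferUntilSubscribed and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical single-subscriber source that buffers values until subscribed.
class BufferUntilSubscribed<T> {
    private List<T> buffer = new ArrayList<>();
    private Consumer<T> subscriber;

    // Before subscription the value is buffered; afterwards it is delivered directly.
    void emit(T value) {
        if (subscriber == null) {
            buffer.add(value);
        } else {
            subscriber.accept(value);
        }
    }

    // Replays everything buffered so far, then releases the buffer so that
    // earlier values are no longer retained.
    void subscribe(Consumer<T> newSubscriber) {
        if (subscriber != null) {
            throw new IllegalStateException("only one subscriber is allowed");
        }
        subscriber = newSubscriber;
        List<T> pending = buffer;
        buffer = null;
        for (T value : pending) {
            newSubscriber.accept(value);
        }
    }

    // Number of values currently held back.
    int buffered() {
        return buffer == null ? 0 : buffer.size();
    }
}
```

In terms of the example above, emit plays the role of the "Tick" values arriving in group g, and subscribe is what happens once other has produced "-b" and g.map(a -> a + b) is subscribed to.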
Upvotes: 2