Reputation: 609
Given the following input:
Observable<Class1> obs = {{super complex long method}}
List<Class2> inputList = {{simple list}}
I'd like to be able to create the following:
Observable<Class3> output
that emits the result of calling input.transform(Class1 c)
on each input in inputList.
What I've come up with so far is a combination of zip & repeat:
Observable<Class3> output = Observable.zip(obs.repeat(), Observable.from(inputList),
(class1, class2) -> class2.transform(class1));
However, repeat is far too aggressive: it emits many repeated items before the zip kicks in.
Another thing I tried was using combineLatest, but since my List emits first, I ended up with only the last item of the list being combined with the class1 instance.
What other combination of operators might make sense?
Upvotes: 0
Views: 766
Reputation: 68
It sounds like what you want to do is take each item emitted by obs, apply each of the functions defined in a list to that item, then flatten the output back into an Observable of type Class3. In that case I think flatMap is a good choice, because it clearly signals the intent: you are applying many functions per item of input, then flattening the stream.
Here's an example (pardon the Java6-ness of it):
Observable<Class3> output = obs.flatMap(new Func1<Class1, Observable<Class3>>() {
    @Override
    public Observable<Class3> call(final Class1 class1) {
        return Observable.from(inputList).map(new Func1<Class2, Class3>() {
            @Override
            public Class3 call(Class2 class2) {
                return class2.transform(class1);
            }
        });
    }
});
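To make the shape of that flatMap easier to see, here is a minimal sketch of the same nesting using plain java.util.stream instead of RxJava, so it runs without any library. This is an analogy, not the RxJava code itself: the Class1, Class2, and Class3 bodies, the string-based transform, and the combine helper are all hypothetical stand-ins invented for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlatMapShape {
    // Hypothetical stand-ins for the question's types.
    public static final class Class1 {
        final String name;
        public Class1(String name) { this.name = name; }
    }

    public static final class Class3 {
        final String value;
        public Class3(String value) { this.value = value; }
    }

    public static final class Class2 {
        final String label;
        public Class2(String label) { this.label = label; }

        // Assumed behaviour: combine this input with the Class1 instance.
        public Class3 transform(Class1 class1) {
            return new Class3(label + "(" + class1.name + ")");
        }
    }

    // For each emitted Class1, apply every Class2 in inputList, then flatten.
    public static List<String> combine(List<Class1> emitted, List<Class2> inputList) {
        return emitted.stream()
                .flatMap(class1 -> inputList.stream().map(class2 -> class2.transform(class1)))
                .map(class3 -> class3.value)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Class2> inputList =
                Arrays.asList(new Class2("a"), new Class2("b"), new Class2("c"));
        // A single Class1 plays the role of the one item emitted by obs.
        System.out.println(combine(Arrays.asList(new Class1("x")), inputList));
        // prints [a(x), b(x), c(x)]
    }
}
```

With one Class1 item, the result has exactly one Class3 per element of inputList, in list order, which is the output the question asks for.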
Upvotes: 1
Reputation: 20816
You can just change the parameter order, like zip(Observable.from(inputList), obs.repeat(), ...).
zip will subscribe to the first Observable and then to the second Observable. In your example, the first Observable is infinite (i.e., obs.repeat()), so RxJava will request 128 items from it at first. That's why you saw obs.repeat() emit a lot of items before the second Observable was subscribed to.
If you change the parameter order to Observable.from(inputList), obs.repeat(), RxJava will subscribe to Observable.from(inputList) first. Since it's a synchronous Observable, RxJava will consume it and know its length at once (assuming its length is less than 128), then request items from the second Observable, obs.repeat(), using this length. So it won't request more items than necessary.
Upvotes: 1