Reputation: 3559
In RxJs there is a method called Observable.expand which can recursively expand a sequence with a transformation function.
For example,
Rx.Observable.return(0).expand(function (x) { return Rx.Observable.return(x+1); })
will emit the integers 0, 1, 2, and so on.
However, I cannot find this method in RxJava. Are there any other methods in RxJava that can achieve a similar goal?
For a more detailed specification of expand(), see https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/expand.md
Upvotes: 3
Views: 1815
Reputation: 2442
Well, the example you gave could be simplified to:
Observable.range(0, Integer.MAX_VALUE)
but I'm assuming you actually want to do something more complicated. scan is not identical to what you're looking for, but it can do similar things, and we can use it to make a Transformer that you can reuse much like expand.
The main difference is that scan needs a new input value at every step but, like expand, it keeps the previous value. We can get around that by simply ignoring the new input value. Because scan is similar to expand, I'm going to start with a scan example that has some pretty big flaws, and then explore a better option.
import rx.Observable;
import rx.functions.Func1;
import rx.functions.Func2;

public class Expand<T> implements Observable.Transformer<T, T> {
    private final Func1<T, T> expandFunc;

    public Expand(final Func1<T, T> expandFunc) {
        this.expandFunc = expandFunc;
    }

    @Override
    public Observable<T> call(Observable<T> source) {
        // Here we treat emissions from the source as a new 'initial value'.
        // NOTE: This will effectively only allow one input from the source, since the
        // output Observable expands infinitely. If you want it to switch to a new expanded
        // observable each time the source emits, use switchMap instead of concatMap.
        return source.concatMap(new Func1<T, Observable<T>>() {
            @Override
            public Observable<T> call(T initialValue) {
                // Make an infinite null Observable, just for our next-step signal.
                return Observable.<Void>just(null).repeat()
                        .scan(initialValue, new Func2<T, Void, T>() {
                            @Override
                            public T call(final T currentValue, final Void unusedSignal) {
                                // The signal is ignored; only the previous value matters.
                                return expandFunc.call(currentValue);
                            }
                        });
            }
        });
    }
}
To use that transformer, let's make a function that takes the current number, adds 1, and squares the result.
Observable.just(1).compose(new Expand<Integer>(new Func1<Integer, Integer>() {
    @Override
    public Integer call(final Integer input) {
        final Integer output = input + 1;
        return output * output;
    }
}));
Anyway, you probably noticed some of the major awkward points of that approach. First, there's the switchMap vs. concatMap choice, and how this essentially turns one item from the source Observable into an infinite chain. Second, the whole 'Void' signal Observable shouldn't be necessary. Sure, we could use range or just(1).repeat() or numerous other things, but they ultimately get thrown away.
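To make the "signals get thrown away" point concrete, here is a minimal sketch of the same idea in plain Java, with no RxJava involved. The class ScanSketch and its methods scan and expandViaScan are hypothetical names invented for this illustration; they only mimic how scan emits the seed and then each running result, and they use a finite number of steps instead of an infinite stream.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.UnaryOperator;

// Hypothetical helper for illustration only; not part of RxJava.
class ScanSketch {
    // Mimics scan(seed, accumulator): emits the seed, then every running result.
    static <T, S> List<T> scan(T seed, Iterable<S> signals, BiFunction<T, S, T> accumulator) {
        List<T> out = new ArrayList<>();
        T current = seed;
        out.add(current);
        for (S signal : signals) {
            current = accumulator.apply(current, signal);
            out.add(current);
        }
        return out;
    }

    // Expands a seed by folding over throwaway signals, the way the Expand transformer does.
    static <T> List<T> expandViaScan(T seed, UnaryOperator<T> step, int steps) {
        // The signals only drive the loop; their content (null) is never read.
        List<Object> signals = Collections.nCopies(steps, null);
        return scan(seed, signals, (current, unusedSignal) -> step.apply(current));
    }
}
```

With the add-1-then-square function from above, expandViaScan(1, x -> (x + 1) * (x + 1), 3) gives 1, 4, 25, 676. The signal list exists only so the fold has something to iterate over, which is exactly the awkward part.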
Here's a way you could model it more cleanly and recursively.
public static <T> Observable<T> expandObservable(
        final T initialValue, final Func1<T, T> expandFunc) {
    return Observable.just(initialValue)
            .concatWith(Observable.defer(new Func0<Observable<T>>() {
                @Override
                public Observable<T> call() {
                    // Only runs on subscription, so the recursion advances one step at a time.
                    return expandObservable(expandFunc.call(initialValue), expandFunc);
                }
            }));
}
So, in this example, each recursive pass outputs the current value (expanded on each step) and concatenates it with the next step. defer is used to keep the infinite recursion from happening right away, since it doesn't call the code that creates the Observable until something subscribes. Using it looks like this:
expandObservable(1, new Func1<Integer, Integer>() {
    @Override
    public Integer call(final Integer input) {
        final Integer output = input + 1;
        return output * output;
    }
}).subscribe(/** do whatever */);
So, much like the compose example, but a much tidier and cleaner implementation.
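If it helps to see why deferring the recursive call matters, here is a rough plain-Java analogy. LazySeq is a hypothetical class made up for this sketch (it is not an RxJava type), and a java.util.function.Supplier plays the role that defer plays above: the recursive call sits inside the supplier and does not run until someone asks for the next element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Hypothetical illustration of the defer idea; not an RxJava type.
final class LazySeq<T> {
    final T head;
    final Supplier<LazySeq<T>> tail; // not evaluated until get() is called

    LazySeq(T head, Supplier<LazySeq<T>> tail) {
        this.head = head;
        this.tail = tail;
    }

    // Mirrors expandObservable: the current value first, the rest deferred.
    static <T> LazySeq<T> expand(T initialValue, UnaryOperator<T> expandFunc) {
        return new LazySeq<>(initialValue,
                () -> expand(expandFunc.apply(initialValue), expandFunc));
    }

    // Pulls the first n values, forcing only as many recursive steps as needed.
    List<T> take(int n) {
        List<T> out = new ArrayList<>();
        LazySeq<T> node = this;
        for (int i = 0; i < n; i++) {
            out.add(node.head);
            if (i < n - 1) {
                node = node.tail.get();
            }
        }
        return out;
    }
}
```

Calling LazySeq.expand(1, x -> (x + 1) * (x + 1)).take(4) gives 1, 4, 25, 676, and building the sequence does not call the function at all until take starts pulling values. Without the Supplier (or defer in the RxJava version), expand would call itself forever before returning anything.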
Hope that helps.
Upvotes: 4
Reputation: 3951
I think you are looking for map:
https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#map
numbers = Observable.from([1, 2, 3, 4, 5]);
numbers.map({it * it}).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
1
4
9
16
25
Sequence complete
Upvotes: -1