victorx

Reputation: 3559

What's the RxJava equivalent of the Observable.expand() in RxJs?

In RxJs there is a method called Observable.expand which can recursively expand a sequence with a transformation function.

For example,

Rx.Observable.return(0).expand(function (x) { return Rx.Observable.return(x+1); }) 

will emit all non-negative integers, starting from 0.

However, I cannot find this method in RxJava. Are there any other methods in RxJava that can achieve a similar goal?

For a more detailed specification of expand(), see https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/expand.md

Upvotes: 3

Views: 1815

Answers (2)

lopar

Reputation: 2442

Well, the example you gave could be simplified to:

Observable.range(0, Integer.MAX_VALUE)

but I'm assuming you actually want to do something more complicated. scan is not identical to what you're looking for, but it can do similar things, and we can use it to build a Transformer that you can reuse much like expand.

The main difference is that scan needs a new input value at every step, although, like expand, it keeps the previous value. We can get around that by simply ignoring the new input value. Because scan is similar to expand, I'm going to start with a scan example that has some pretty big flaws, and then explore a better option.

import rx.Observable;
import rx.Observable.Transformer;
import rx.functions.Func1;
import rx.functions.Func2;

public class Expand<T> implements Transformer<T, T> {
  private final Func1<T, T> expandFunc;

  public Expand(final Func1<T, T> expandFunc) {
    this.expandFunc = expandFunc;
  }

  @Override
  public Observable<T> call(Observable<T> source) {
    // Here we treat emissions from the source as a new 'initial value'.

    // NOTE: This will effectively only allow one input from the source, since the
    // output Observable expands infinitely. If you want it to switch to a new expanded
    // observable each time the source emits, use switchMap instead of concatMap.
    return source.concatMap(new Func1<T, Observable<T>>() {

      @Override
      public Observable<T> call(T initialValue) {
        // Make an infinite null Observable, just for our next-step signal.
        return Observable.<Void>just(null).repeat()
            .scan(initialValue, new Func2<T, Void, T>() {

              @Override
              public T call(final T currentValue, final Void unusedSignal) {
                // Ignore the signal and expand the previous value.
                return expandFunc.call(currentValue);
              }

            });
      }

    });
  }
}

To use that transformer, let's pass it a function that takes the current number, adds 1, and squares the result.

Observable.just(1).compose(new Expand<Integer>(new Func1<Integer, Integer>() {

  @Override
  public Integer call(final Integer input) {
    final Integer output = input + 1;
    return output * output;
  }

}));
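As a sanity check on what that chain should emit, here is a minimal sketch that uses plain JDK streams rather than RxJava. Stream.iterate feeds a one-to-one function its own previous output, which is the same shape as the Func1<T, T> passed to Expand. The class name ExpandValues and the method firstValues are hypothetical, chosen only for this illustration.

```java
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ExpandValues {
    // Returns the first `count` values of seed, f(seed), f(f(seed)), ...
    static <T> List<T> firstValues(T seed, UnaryOperator<T> f, int count) {
        return Stream.iterate(seed, f).limit(count).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same step as above: add 1, then square.
        System.out.println(firstValues(1, x -> (x + 1) * (x + 1), 4)); // [1, 4, 25, 676]
    }
}
```

Note that this particular function grows quickly: the sixth value no longer fits in an int, so a longer run would need long or BigInteger.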

Anyway, you probably noticed some of the major awkward points of that approach. First, there's the switchMap vs. concatMap question, and the fact that this essentially turns a single item from the source Observable into an infinite chain. Second, the whole 'Void' signal Observable shouldn't be necessary. Sure, we could use range or just(1).repeat() or numerous other things, but their values ultimately get thrown away.

Here's a way you could model it more cleanly and recursively.

public static <T> Observable<T> expandObservable(
    final T initialValue, final Func1<T, T> expandFunc) {
  return Observable.just(initialValue)
      .concatWith(Observable.defer(new Func0<Observable<T>>() {

        @Override
        public Observable<T> call() {
          // Runs only on subscription, so the recursion advances one step at a time.
          return expandObservable(expandFunc.call(initialValue), expandFunc);
        }

      }));
}

So, in this example, each recursive pass emits the current value and then concatenates the next step, which is built from the expanded value. defer keeps the infinite recursion from happening right away, since it doesn't run the code that creates the Observable until something subscribes to it. Using it looks like this:

expandObservable(1, new Func1<Integer, Integer>() {

  @Override
  public Integer call(final Integer input) {
    final Integer output = input + 1;
    return output * output;
  }

}).subscribe(/** do whatever */);

So, much like the compose example, but a much tidier and cleaner implementation.
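To make the role of defer a bit more concrete, here is a rough plain-Java model of the same recursion, with no RxJava involved. A Supplier stands in for Observable.defer: the recursive call sits inside it and only runs when the next value is requested. The names LazyExpand, Step, expand and take are all made up for this sketch, and it only models the laziness, not subscription or backpressure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

final class LazyExpand {
    // One emitted value plus a deferred recipe for the rest of the sequence.
    static final class Step<T> {
        final T value;
        final Supplier<Step<T>> next; // stands in for Observable.defer

        Step(T value, Supplier<Step<T>> next) {
            this.value = value;
            this.next = next;
        }
    }

    // Mirrors expandObservable: returns immediately, because the recursive
    // call is wrapped in the Supplier and runs only when next.get() is called.
    static <T> Step<T> expand(T initialValue, UnaryOperator<T> expandFunc) {
        return new Step<>(initialValue,
            () -> expand(expandFunc.apply(initialValue), expandFunc));
    }

    // Pulls the first `count` values, evaluating only as many steps as needed.
    static <T> List<T> take(Step<T> first, int count) {
        List<T> out = new ArrayList<>();
        Step<T> current = first;
        for (int i = 0; i < count; i++) {
            out.add(current.value);
            if (i < count - 1) {
                current = current.next.get();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(take(expand(1, x -> (x + 1) * (x + 1)), 4)); // [1, 4, 25, 676]
    }
}
```

Without the Supplier, expand would call itself before returning and never finish, which is the same problem defer avoids in the Observable version.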

Hope that helps.

Upvotes: 4

edbond

Reputation: 3951

I think you are looking for map:

https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#map

numbers = Observable.from([1, 2, 3, 4, 5]);

numbers.map({it * it}).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);

1
4
9
16
25
Sequence complete

Upvotes: -1
