lczapski

Reputation: 4120

How to create Observable<Integer> from infinite Stream<Integer> using RxJava2?

I am trying to upgrade RxJava from 1 to 2. In my old code I have a method like below:

private Observable<Integer> reversRange(int from, int to) {
    Stream<Integer> intStream = Stream.iterate(to, p -> p - 1);
    return Observable.from(() -> intStream.iterator())
            .takeWhile(n -> n > from)
            .map(n -> n );
}

But in RxJava 2 I cannot use from. What would be the equivalent of this code? I have found in this question that the replacement is fromIterable, but I do not know how to use it with a Stream.

Here is another example. The solution should work not only for a range but for any infinite stream.

private Observable<Integer> numbers() {
    Stream<Integer> intStream = Stream.iterate(0, p -> p + 1);
    return Observable.from(() -> intStream.iterator());
}

Upvotes: 1

Views: 3043

Answers (3)

Marinos An

Reputation: 10808

How to create Observable from infinite Stream using RxJava2?

Like this:

    public static <T> Observable<T> streamToObservable(Stream<T> stream) {
        return Observable.fromIterable(stream::iterator);
    }

Usage:



    public static void main(String[] args) {
        //Some infinite stream
        Stream<Integer> stream = Stream.iterate(1, (i)->i+1);

        //create an observable from the stream
        Observable<Integer> obs = streamToObservable(stream);
        
        //subscribe to the observable
        obs.subscribe((v) -> {
            System.out.println("Value from observable:"+v);
        });
        
    }

Output:

Value from observable:1
Value from observable:2
Value from observable:3
Value from observable:4
.........

This answer explains how a Stream can be converted to Iterable.
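
The conversion works because Iterable has a single abstract method, iterator(), so the method reference stream::iterator can stand in for it. One caveat: a Stream can be traversed only once, so an Iterable built this way is single-use as well, which should mean the resulting Observable can be subscribed to only once. Below is a minimal JDK-only sketch of both behaviours. The class and method names are illustrative and not part of RxJava, and the supplier-based variant assumes the stream can be recreated on demand.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

final class StreamIterables {

    // Single-use: a second call to iterator() fails because the stream is already consumed.
    static <T> Iterable<T> singleUse(Stream<T> stream) {
        return stream::iterator;
    }

    // Reusable: every call to iterator() builds a fresh stream from the supplier.
    static <T> Iterable<T> reusable(Supplier<Stream<T>> streamSupplier) {
        return () -> streamSupplier.get().iterator();
    }

    // Pulls at most n elements, so it is safe to use with an infinite source.
    static <T> List<T> firstN(Iterable<T> source, int n) {
        List<T> result = new ArrayList<>();
        Iterator<T> it = source.iterator();
        for (int i = 0; i < n && it.hasNext(); i++) {
            result.add(it.next());
        }
        return result;
    }
}
```

If the Observable needs to be subscribed to more than once, passing something like reusable(() -> Stream.iterate(1, i -> i + 1)) to fromIterable is one option to consider.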


RxJava3

//Some infinite stream
Stream<Integer> stream = Stream.iterate(1, (i)->i+1);

Observable<Integer> obs = Observable.fromStream(stream);

Upvotes: 0

Daniele Segato

Reputation: 12899

Use the generate() function:

This is Kotlin code (an extension function), but for Java you only need to change the lambda syntax slightly. It works with any stream.

fun <T> Stream<T>.toFlowable(): Flowable<T> {
  return Flowable.generate(
    Callable { iterator() },
    BiConsumer { ite, emitter ->
      if (ite.hasNext()) {
        emitter.onNext(ite.next())
      } else {
        emitter.onComplete()
      }
    }
  )
}

You can also use Observable if you prefer, but I don't see why you should.

fun <T> Stream<T>.toObservable(): Observable<T> {
  return Observable.generate(
    Callable { iterator() },
    BiConsumer { ite, emitter ->
      if (ite.hasNext()) {
        emitter.onNext(ite.next())
      } else {
        emitter.onComplete()
      }
    }
  )
}

In Java, I think it would be something like:

public <T> Observable<T> streamToObservable(Stream<T> stream) {
  return Observable.generate(
    () -> stream.iterator(),
    (ite, emitter) -> {
      if (ite.hasNext()) {
        emitter.onNext(ite.next());
      } else {
        emitter.onComplete();
      }
    }
  );
}

and so your code would become:

private Observable<Integer> numbers() {
    Stream<Integer> intStream = Stream.iterate(0, p -> p + 1);
    return streamToObservable(intStream);
}
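
The generate() approach copes with an infinite stream because the callback emits at most one item per invocation, so elements are pulled from the iterator only as they are requested. The following is a plain-JDK model of that contract, not RxJava code: SketchEmitter and drive are hypothetical stand-ins invented purely for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Stream;

final class GenerateSketch {

    // Hypothetical stand-in for an emitter; this is NOT the RxJava interface.
    interface SketchEmitter<T> {
        void onNext(T value);
        void onComplete();
    }

    // Invokes the callback once per requested item and stops early on completion.
    static <T> List<T> drive(Iterator<T> state,
                             BiConsumer<Iterator<T>, SketchEmitter<T>> callback,
                             int requested) {
        List<T> received = new ArrayList<>();
        boolean[] done = {false};
        SketchEmitter<T> emitter = new SketchEmitter<T>() {
            @Override public void onNext(T value) { received.add(value); }
            @Override public void onComplete() { done[0] = true; }
        };
        for (int i = 0; i < requested && !done[0]; i++) {
            callback.accept(state, emitter);
        }
        return received;
    }

    // Same shape as the callback in the answer: one element or completion per call.
    static <T> List<T> take(Stream<T> stream, int requested) {
        return drive(stream.iterator(), (ite, emitter) -> {
            if (ite.hasNext()) {
                emitter.onNext(ite.next());
            } else {
                emitter.onComplete();
            }
        }, requested);
    }
}
```

In this model, take(Stream.iterate(0, p -> p + 1), 4) touches only the first four elements of the infinite stream, and a finite stream simply ends with the completion signal.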

Upvotes: 1

Shankha057

Reputation: 1361

If you only need a stream of Integer values, you can simply do:

Observable.fromIterable(IntStream.rangeClosed(from,to)
            .boxed()
            .collect(Collectors.toList()));  

Both arguments of rangeClosed are inclusive.

There is another general method that you can use which is closer to what you have in your attempt:

Observable.fromIterable(Stream.iterate(from, integer -> integer + 1)
                .filter(integer -> integer < (to+1))
                .limit((long) to + 1 - from)
                .collect(Collectors.toList()));  
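
Both snippets should yield the same sequence. Here is a JDK-only sketch (hypothetical class and method names) that builds the two lists so they can be compared. The long arithmetic guards against int overflow when computing the element count, and the Math.max call is an added assumption to handle from > to, since limit rejects negative sizes. Note that collecting into a list only works for finite ranges.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class RangeSketch {

    // Inclusive range built with IntStream.rangeClosed.
    static List<Integer> viaRangeClosed(int from, int to) {
        return IntStream.rangeClosed(from, to)
                .boxed()
                .collect(Collectors.toList());
    }

    // The same range built from an infinite stream that is cut off with limit.
    static List<Integer> viaIterate(int from, int to) {
        // Element count computed in long to avoid int overflow; never negative.
        long count = Math.max(0L, (long) to - from + 1);
        return Stream.iterate(from, n -> n + 1)
                .limit(count)
                .collect(Collectors.toList());
    }
}
```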

EDIT1

If you want an infinite stream: Java Stream's generate and iterate both produce infinite streams. Take my example using iterate (you can replace it with generate and a Supplier that holds your custom object creation code) and remove the operations that bound it, such as filter and limit, together with the terminal collect. (Strictly speaking, limit is a short-circuiting intermediate operation, not a terminal one.)
Then wrap the stream into an Observable, or into a Flowable and then into an Observable, if you want backpressure support with RxJava2.

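Like this:

Observable.just(Stream.generate(() -> /* object creation logic here */));  

Or

Observable.just(Flowable.just(Stream.generate(() -> /* object creation logic here */)));  

Note that just emits the Stream object itself as a single item, not its elements. To emit the elements, pass the stream's iterator to fromIterable, for example Observable.fromIterable(stream::iterator).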

Keep in mind that if you do this then your code will keep creating objects indefinitely and your program will run until your memory runs out.
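
How many objects actually get created depends on how much the consumer pulls. The stream itself is lazy, and a supplier passed to Stream.generate runs only when an element is requested. Here is a small JDK-only sketch (hypothetical names) that counts supplier invocations, so the effect can be observed when only a few elements are taken.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

final class LazyGenerateSketch {

    // Infinite stream whose supplier increments the given counter on every call.
    static Stream<Integer> counting(AtomicInteger counter) {
        return Stream.generate(counter::getAndIncrement);
    }

    // Pulls at most n elements through the stream's iterator.
    static List<Integer> pull(Stream<Integer> stream, int n) {
        List<Integer> values = new ArrayList<>();
        Iterator<Integer> it = stream.iterator();
        for (int i = 0; i < n && it.hasNext(); i++) {
            values.add(it.next());
        }
        return values;
    }
}
```

After calling pull(counting(counter), 3), the counter shows how many objects were really produced, which stays small instead of growing without bound.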
I guess you have some kind of service that is sending you data and you need to make some transformation and send that data as a stream somewhere else. I would recommend getting the data as a Future and then wrapping it into a Flowable and then streaming the data to wherever you are sending it.
Like:

Flowable.fromFuture(senderService.getDataAsCompletableFuture());  

And then specify a backpressure strategy.

EDIT2
You can use Observable.generate() to do it.
Like:

Observable.generate(() -> from, (value, emitter) -> {
        emitter.onNext(value);
        return value + 1; 
    });

Upvotes: 2
