Dave Posner

Reputation: 21

In RxJava, what is the type of a reduce expression?

Consider the following:

import io.reactivex.Observable;
import io.reactivex.observables.GroupedObservable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TypeTest {
    public static void main(String[] args) throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        Observable<Long> source = Observable.interval(1,TimeUnit.MILLISECONDS).take(20);
        Observable<GroupedObservable<String,Long>> sgb = 
                         source.groupBy(x -> (x%2==0)?"Even":"Odd");

        // I'd like to introduce a variable for source.reduce but I get a type error.
        // Observable<Long> sreduce = source.reduce(new Long(0),(x,y) -> (x+y));

        source.reduce(new Long(0),(x,y)->(x+y)).subscribe(x -> {
            System.out.println(x);
            latch.countDown();
        });
        latch.await();
    }
}

I can subscribe to source.reduce as if it were an Observable, but I can't declare a variable of that type for it. What type should the variable have?

Upvotes: 0

Views: 1278

Answers (2)

Tassos Bassoukos

Reputation: 16142

You are importing from the io.reactivex package, which means you are using RxJava 2. In RxJava 2, operators whose result is a stream of exactly one item (like reduce with an initial value) return Single<T> instead of Observable<T>.

Most of the methods/operators are the same on Single, and there's even a .toObservable() method to convert from the specific type (Single) to the generic one (Observable).
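To make the type concrete, here is a minimal sketch assuming RxJava 2.x is on the classpath. The class name ReduceTypeSketch and the helper sum are hypothetical, and Observable.rangeLong stands in for the question's interval(...).take(20) so the run is deterministic; it emits the same values 0 to 19.

```java
import io.reactivex.Observable;
import io.reactivex.Single;

public class ReduceTypeSketch {
    // Hypothetical helper: with a seed value, RxJava 2's reduce
    // returns Single<Long>, not Observable<Long>.
    static Single<Long> sum(Observable<Long> source) {
        return source.reduce(0L, (x, y) -> x + y);
    }

    public static void main(String[] args) {
        // rangeLong(0, 20) emits 0..19, like interval(...).take(20) but synchronously.
        Observable<Long> source = Observable.rangeLong(0, 20);

        // This is the declaration the question was looking for.
        Single<Long> sreduce = sum(source);

        // If an Observable<Long> is really needed, convert explicitly.
        Observable<Long> asObservable = sreduce.toObservable();

        System.out.println(sreduce.blockingGet());        // prints 190
        System.out.println(asObservable.blockingFirst()); // prints 190
    }
}
```

Each blocking call subscribes separately, so the source is replayed twice here; that is harmless for a cold source like rangeLong.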

Side point 1: your groupBy operator computes the keys, but nothing is done with the result; as there is no subscription on that Observable (sgb), no timer will be started for it.
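If the grouped stream is meant to be used, something has to subscribe to it and to each group. One possible sketch, again assuming RxJava 2.x, reduces every group to its own Single and collects the results. GroupSumSketch and groupSums are hypothetical names, and rangeLong again replaces the timer-based source.

```java
import io.reactivex.Observable;
import java.util.List;

public class GroupSumSketch {
    // Hypothetical helper: flatMapSingle subscribes to each group as it
    // appears and reduces it to a Single<Long>, which is then labelled
    // with the group's key.
    static List<String> groupSums(Observable<Long> source) {
        return source
                .groupBy(x -> (x % 2 == 0) ? "Even" : "Odd")
                .flatMapSingle(g -> g.reduce(0L, (x, y) -> x + y)
                                     .map(sum -> g.getKey() + "=" + sum))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // The list holds "Even=90" and "Odd=100"; their order is not guaranteed.
        System.out.println(groupSums(Observable.rangeLong(0, 20)));
    }
}
```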

Side point 2: you don't need the CountDownLatch to wait for completion. You can do either of the following:

    source.reduce(new Long(0), (x, y) -> (x + y))
          .doOnSuccess(System.out::println)
          .toFuture()
          .get();

or

    source.reduce(new Long(0), (x, y) -> (x + y))
          .doOnSuccess(System.out::println)
          .blockingGet();

Upvotes: 3

Vesko

Reputation: 3760

If you check the signature of the reduce() method in RxJava 1.x, you'll see:

public final <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulator) {

In your case it'll be an Observable<Long>:

Observable<Long> source = Observable.interval(1, TimeUnit.MILLISECONDS).take(20);
Observable<Long> sreduce = source.reduce(new Long(0), (x, y) -> (x + y));

As the code stands in your example, you're calling source.reduce(...).subscribe(...), which returns a Subscription (a Disposable in RxJava 2) rather than an Observable.

Upvotes: 0
