Reputation: 21
Consider the following:
import io.reactivex.Observable;
import io.reactivex.observables.GroupedObservable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class TypeTest {
public static void main(String[] args) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
Observable<Long> source = Observable.interval(1,TimeUnit.MILLISECONDS).take(20);
Observable<GroupedObservable<String,Long>> sgb =
source.groupBy(x -> (x%2==0)?"Even":"Odd");
// I'd like to introduce a variable for source.reduce but I get a type error.
// Observable<Long> sreduce = source.reduce(new Long(0),(x,y) -> (x+y));
source.reduce(new Long(0),(x,y)->(x+y)).subscribe(x -> {
System.out.println(x);
latch.countDown();
});
latch.await();
}
}
I can subscribe to source.reduce as if it were an Observable but I can't assign that as its type. What type should I assign it?
Upvotes: 0
Views: 1278
Reputation: 16142
You are using imports from package io.reactivex
, which means you are using RxJava 2. In RxJava 2, methods that return reactive streams that have exactly one item (like the result of reduce
), return Single<T>
instead of Observable<T>
.
Most of the methods/operators are the same, and there's even a .toObservable()
method to convert from the specific to the generic.
Side point 1: your grouped
operator evaluates the keys, but does not do anything with the results; as there's not subscription on that Observable, no timer will be started.
Side point 2: you don't need the CountdownLatch
to wait for completion. You can do one of
source.reduce(new Long(0), (x, y) -> (x + y))
.doOnSuccess(System.out::println)
.toFuture()
.get();
or
source.reduce(new Long(0), (x, y) -> (x + y))
.doOnSuccess(System.out::println)
.blockingGet();
Upvotes: 3
Reputation: 3760
If you check the signature of the reduce()
method you'll see:
public final <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulator) {
In your case it'll be an Observable<Long>
:
Observable<Long> source = Observable.interval(1, TimeUnit.MILLISECONDS).take(20);
Observable<Long> sreduce = source.reduce(new Long(0), (x, y) -> (x + y));
As the code stands in your example, you're doing source.reduce(....).subscribe(...)
, which already returns a Subscription
back.
Upvotes: 0