Reputation: 994
Say, I want to make an Observable
in RxJava, which has a feedback coupling like on the image below.
I've managed to achieve that with the use of subjects, like this:
// Observable<Integer> source = Observable.range(0, 6);
public Observable<Integer> getFeedbackSum(Observable<Integer> source) {
UnicastSubject<Integer> feedback = UnicastSubject.create();
Observable<Integer> feedbackSum = Observable.zip(source, feedback.startWith(0), Pair::create)
.map(pair -> pair.first + pair.second);
feedbackSum.subscribe(feedback);
return feedbackSum;
}
It looks rather ugly. Is there a better way?
Upvotes: 2
Views: 392
Reputation: 70007
There is an operator for this: scan
:
public final <R> Observable<R> scan(R initialValue, BiFunction<R,? super T,R> accumulator)
Observable.range(0, 6)
.scan(0, (a, b) -> a + b)
.test()
.assertResut(0, 1, 3, 6, 10, 15, 21);
In case your accumulator type is not immutable, you can use scanWith
:
public final <R> Observable<R> scanWith(Callable<R> seedSupplier, BiFunction<R,? super T,R> accumulator)
Upvotes: 2