4emodan
4emodan

Reputation: 994

RxJava: create a self-dependent stream

Say, I want to make an Observable in RxJava, which has a feedback coupling like on the image below.

Self-dependent stream

I've managed to achieve that with the use of subjects, like this:

// Observable<Integer> source = Observable.range(0, 6);

public Observable<Integer> getFeedbackSum(Observable<Integer> source) {
    UnicastSubject<Integer> feedback = UnicastSubject.create();
    Observable<Integer> feedbackSum = Observable.zip(source, feedback.startWith(0), Pair::create)
        .map(pair -> pair.first + pair.second);

    feedbackSum.subscribe(feedback);
    return feedbackSum;
}

It looks rather ugly. Is there a better way?

Upvotes: 2

Views: 392

Answers (1)

akarnokd
akarnokd

Reputation: 70007

There is an operator for this: scan:

public final <R> Observable<R> scan(R initialValue, BiFunction<R,? super T,R> accumulator)

Observable.range(0, 6)
    .scan(0, (a, b) -> a + b)
    .test()
    .assertResut(0, 1, 3, 6, 10, 15, 21);

In case your accumulator type is not immutable, you can use scanWith:

public final <R> Observable<R> scanWith(Callable<R> seedSupplier, BiFunction<R,? super T,R> accumulator)

Upvotes: 2

Related Questions