Can RxJava's scan operator keep state across multiple subscribers

I have an observable, which is transformed with scan operator, so it always emits a value based on the current and the previous ones (or initial value).

Subject<String> subject = PublishSubject.create();
Observable<String> observable = subject.scan("zero", (a, b) -> a + ", " + b);

Then, first observer subscribes, prints all emitted values, and after some time unsubscribes.

Disposable first = observable.subscribe(System.out::println); // "zero"
subject.onNext("one"); // "zero, one"
first.dispose();

Later, another one subscriber subscribes:

Disposable second = observable.subscribe(System.out::println); // "zero"
subject.onNext("two"); // "zero, one"
second.dispose();

As you can see, each observer is first fed with the initial value and values emitted when the previous one was subscribed are gone. What I'd like to achieve is keeping the state inside scan operator across all subscriptions:

Disposable first = observable.subscribe(System.out::println); // "zero"
subject.onNext("one"); // "zero, one"
first.dispose();
Disposable second = observable.subscribe(System.out::println); // "zero, one"
subject.onNext("two"); // "zero, one, two"
second.dispose();

Is there a solution to this problem in RxJava?

Upvotes: 1

Views: 1052

Answers (3)

Sergej Isbrecht
Sergej Isbrecht

Reputation: 4012

Another solution:

observable#publish will multicast the result to all subscriber and therefore persist the last #scan value. Your solution was not working, because you create a new Observable every time you subscribed to it. The scan value was not persisted.

@Test
void name3231() {
    Subject<String> subject = PublishSubject.create();
    Observable<String> observable = subject.scan("zero", (a, b) -> a + ", " + b);

    ConnectableObservable<String> observableMulticast = observable.publish();

    Disposable connect = observableMulticast.connect(); // must be disposed by hand

    TestObserver<String> test = observableMulticast.test();

    Disposable first = observableMulticast.subscribe(System.out::println); // "zero"
    subject.onNext("one"); // "zero, one"
    first.dispose();

    test.assertValues("zero, one");

    Disposable second = observableMulticast.subscribe(System.out::println); // "zero, one"
    subject.onNext("two"); // "zero, one, two"
    second.dispose();

    test.assertValues("zero, one", "zero, one, two");
}

Upvotes: 1

akarnokd
akarnokd

Reputation: 70007

You could scan into a BehaviorSubject and the others should subscribe to that:

Subject<String> subject = PublishSubject.create();
Observable<String> observable = subject.scan("zero", (a, b) -> a + ", " + b);
Subject<String> multicast = BehaviorSubject.create();

observable.subscribe(multicast);

Disposable first = multicast.subscribe(System.out::println); // "zero"
subject.onNext("one"); // "zero, one"
first.dispose();
Disposable second = multicast.subscribe(System.out::println); // "zero, one"
subject.onNext("two"); // "zero, one, two"
second.dispose();

Upvotes: 1

I have found a solution by introducing BehaviorSubject as a "caching" layer. Not sure if it is the cleanest way to solve the problem, but it works.

The line:

Observable<String> observable = subject.scan("zero", (a, b) -> a + ", " + b);

has to be transformed to:

Subject<String> observable = BehaviorSubject.create();
subject.scan("zero", (a, b) -> a + ", " + b)
       .subscribe(observable);

Upvotes: 1

Related Questions