jeremy303

Reputation: 9241

Creating an RxJava Subject that doesn't terminate on complete or error

I'm trying to create an RxJava 2 Subject that can subscribe to an Observable, but doesn't terminate when the subscribed Observable terminates either due to error or completion.

Essentially, this will act as an event bus that can subscribe to other Observables.

What's the right way to do this so that termination isn't propagated and nothing leaks?

Upvotes: 1

Views: 1234

Answers (3)

Paul Galbraith

Reputation: 138

In RxJava 3 I could do this with Observable.flatMap():

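    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.observers.TestObserver;
    import io.reactivex.rxjava3.subjects.Subject;
    import io.reactivex.rxjava3.subjects.UnicastSubject;
    import org.junit.jupiter.api.Test;

    @Test
    void testReplacingObservable() {
        // A stream of observables, flattened into a single stream of integers
        Subject<Observable<Integer>> observableStream = UnicastSubject.create();
        Observable<Integer> integerStream = observableStream.flatMap(observable -> observable);
        TestObserver<Integer> observer = integerStream.test();

        // New observables can keep being added to the source stream; an inner observable
        // completing does not complete integerStream (an inner error would still terminate it)
        observableStream.onNext(Observable.just(0, 1, 2, 3, 4));
        observableStream.onNext(Observable.just(5, 6));
        observableStream.onNext(Observable.just(7, 8, 9));

        observer.assertValueCount(10);
        observer.assertValueAt(0, 0);
        observer.assertValueAt(5, 5);
        observer.assertValueAt(7, 7);
    }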

Upvotes: 0

paul

Reputation: 13481

I think your best option is to use a Relay (from the RxRelay library), which does not terminate once all items have been emitted in the pipeline:

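    /**
     * A Relay is an Observable that you can also push items into, but it has no terminal events:
     * it never completes or errors, so the pipeline stays open.
     * Observer1 receives "default", 1, 2, 3, 4, 5. Observer2 receives only 3, 4, 5, because a
     * BehaviorRelay replays the most recently emitted item to a new subscriber and then
     * forwards every later item.
     * Note: this uses the RxRelay 1.x API (RxJava 1); in RxRelay 2.x the equivalents are
     * BehaviorRelay.createDefault(...) and accept(...).
     */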
    @Test
    public void testRelay() throws InterruptedException {
        BehaviorRelay<String> relay = BehaviorRelay.create("default");
        relay.subscribe(result -> System.out.println("Observer1:" + result));
        relay.call("1");
        relay.call("2");
        relay.call("3");
        relay.subscribe(result -> System.out.println("Observer2:" + result));
        relay.call("4");
        relay.call("5");
    }

You can see more examples of Relay here: https://github.com/politrons/reactive/blob/master/src/test/java/rx/relay/Relay.java

Upvotes: 0

Uli

Reputation: 3016

This should solve your problem: https://github.com/JakeWharton/RxRelay/
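
For a rough idea of what a relay does, here is a library-free sketch in plain Java. `SimpleRelay`, `Source`, `bridge`, and `RelayDemo` are hypothetical names made up for illustration and are not part of RxRelay or RxJava. The sketch only assumes that a relay forwards values and has no terminal state, so a source completing or failing is handled at the bridge instead of being passed on to the bus.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-in for a relay: it forwards values and has no terminal state. */
final class SimpleRelay<T> implements Consumer<T> {
    private final List<Consumer<? super T>> observers = new CopyOnWriteArrayList<>();

    /** Registers an observer; run the returned handle to unsubscribe so nothing leaks. */
    Runnable subscribe(Consumer<? super T> observer) {
        observers.add(observer);
        return () -> observers.remove(observer);
    }

    /** Forwards a value to every currently registered observer. */
    @Override
    public void accept(T value) {
        for (Consumer<? super T> observer : observers) {
            observer.accept(value);
        }
    }
}

/** Hypothetical source: returning normally means "completed", throwing means "failed". */
interface Source<T> {
    void emitTo(Consumer<? super T> sink) throws Exception;
}

final class RelayDemo {
    /** Feeds a source into the relay; completion and errors stop here, not at the relay. */
    static <T> void bridge(Source<T> source, SimpleRelay<T> relay, Consumer<Throwable> onError) {
        try {
            source.emitTo(relay);
        } catch (Exception e) {
            onError.accept(e);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        SimpleRelay<String> bus = new SimpleRelay<>();
        Runnable unsubscribe = bus.subscribe(log::add);
        Consumer<Throwable> onError = e -> log.add("error: " + e.getMessage());

        // First source completes after two values
        bridge(sink -> { sink.accept("a"); sink.accept("b"); }, bus, onError);
        // Second source fails after one value
        bridge(sink -> { sink.accept("c"); throw new IllegalStateException("boom"); }, bus, onError);
        // The bus is still alive and keeps delivering
        bridge(sink -> sink.accept("d"), bus, onError);

        unsubscribe.run();
        // Nobody is listening any more, so "e" is not logged
        bridge(sink -> sink.accept("e"), bus, onError);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a, b, c, error: boom, d]
    }
}
```

With RxRelay 2.x itself, the equivalent wiring should be roughly `Disposable d = source.subscribe(relay, errorHandler)`, since a `Relay` is also a `Consumer`. Disposing `d` once the source is no longer needed is what keeps the subscription from leaking.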

Upvotes: 1
