Reputation: 9241
I'm trying to create an RxJava 2 Subject
that can subscribe to an Observable
, but doesn't terminate when the subscribed Observable
terminates either due to error or completion.
Essentially, this will act as an event bus that can subscribe to other Observable
s.
What's the right way to do this, to avoid propagating the termination and also avoiding leaks?
Upvotes: 1
Views: 1234
Reputation: 138
In RxJava3 I could with this with Observable.flatMap():
@Test
void testReplacingObservable() {
Subject<Observable<Integer>> observableStream = UnicastSubject.create();
Observable<Integer> integerStream = observableStream.flatMap(observable -> observable);
TestObserver<Integer> observer = integerStream.test();
// now can keep adding new observables to the source observable stream
observableStream.onNext(Observable.just(0, 1, 2, 3, 4));
observableStream.onNext(Observable.just(5, 6));
observableStream.onNext(Observable.just(7, 8, 9));
observer.assertValueCount(10);
observer.assertValueAt(0, 0);
observer.assertValueAt(5, 5);
observer.assertValueAt(7, 7);
}
Upvotes: 0
Reputation: 13481
Your best option I think it´s use the operator "relay" which wont unsubscribe once all items has been emitted in the pipeline
/**
* Relay is just an observable which subscribe an observer, but it wont unsubscribe once emit the items. So the pipeline keep open
* It should return 1,2,3,4,5 for first observer and just 3, 4, 5 fot the second observer since default relay emit last emitted item,
* and all the next items passed to the pipeline.
*/
@Test
public void testRelay() throws InterruptedException {
BehaviorRelay<String> relay = BehaviorRelay.create("default");
relay.subscribe(result -> System.out.println("Observer1:" + result));
relay.call("1");
relay.call("2");
relay.call("3");
relay.subscribe(result -> System.out.println("Observer2:" + result));
relay.call("4");
relay.call("5");
}
You can see more examples of relay here https://github.com/politrons/reactive/blob/master/src/test/java/rx/relay/Relay.java
Upvotes: 0
Reputation: 3016
This should solve your problem: https://github.com/JakeWharton/RxRelay/
Upvotes: 1