Reputation: 9103
I'm using rxjs. I have a Browser that's responsible for a number of Page objects. Each page has an Observable<Event> that yields a stream of events.
Page objects are closed and opened at various times. I want to create one observable, called TheOneObservable, that will merge all the events from all the currently active Page objects, and also merge in custom events from the Browser object itself.
Closing a Page means that the subscription to its events should be closed as well, so that the subscription doesn't prevent the Page from being GC'd.
My problem is that Pages can be closed at any time, which means that the number of Observables being merged is always changing. I've thought of using an Observable of Pages and using mergeMap, but there are problems with this. For example, a subscriber will only receive events of Pages that are opened after it subscribes.
Note that this question has been answered here for .NET, but using an ObservableCollection that isn't available in rxjs.
Here is some code to illustrate the problem:
class Page {
    private _events = new Subject<Event>();
    get events(): Observable<Event> {
        return this._events.asObservable();
    }
}
class Browser {
    pages = [] as Page[];
    private _ownEvents = new Subject<Event>();
    addPage(page : Page) {
        this.pages.push(page);
    }
    removePage(page : Page) {
        let ixPage = this.pages.indexOf(page);
        if (ixPage < 0) return;
        this.pages.splice(ixPage, 1);
    }
    get oneObservable() {
        //this won't work for aforementioned reasons
        return Observable.from(this.pages).mergeMap(x => x.events).merge(this._ownEvents);
    }
}
It's in TypeScript, but it should be understandable.
Upvotes: 2
Views: 657
Reputation: 23463
You can switchMap() on a Subject() linked to array changes, replacing oneObservable with a fresh one when the array changes.
// Members of the Browser class from the question
pagesChanged = new Subject<void>();
addPage(page : Page) {
    this.pages.push(page);
    this.pagesChanged.next();
}
removePage(page : Page) {
    let ixPage = this.pages.indexOf(page);
    if (ixPage < 0) return;
    this.pages.splice(ixPage, 1);
    this.pagesChanged.next();
}
get oneObservable() {
    // Rebuild the merged stream from the current pages on every change
    return this.pagesChanged
        .switchMap(changeEvent =>
            Observable.from(this.pages).mergeMap(x => x.events).merge(this._ownEvents)
        )
}
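Because pagesChanged is a plain Subject, a subscriber to this getter sees nothing until the next addPage or removePage call. A minimal sketch of one way around that, assuming RxJS 6 or later import paths and pipeable operators (the snippet in this answer targets RxJS 5.5), is to seed the change stream with startWith. The Page and Browser classes below are simplified stand-ins with string events, and the emit and emitOwn helpers are hypothetical, added only to drive the example.

```typescript
import { Observable, Subject, from, merge } from 'rxjs';
import { mergeMap, startWith, switchMap } from 'rxjs/operators';

// Simplified stand-in for the question's Page (string events for brevity).
class Page {
  private _events = new Subject<string>();
  get events(): Observable<string> { return this._events.asObservable(); }
  emit(e: string): void { this._events.next(e); } // hypothetical helper
}

class Browser {
  private pages: Page[] = [];
  private pagesChanged = new Subject<void>();
  private ownEvents = new Subject<string>();

  addPage(page: Page): void {
    this.pages.push(page);
    this.pagesChanged.next();
  }

  removePage(page: Page): void {
    const ix = this.pages.indexOf(page);
    if (ix < 0) return;
    this.pages.splice(ix, 1);
    this.pagesChanged.next();
  }

  emitOwn(e: string): void { this.ownEvents.next(e); } // hypothetical helper

  get oneObservable(): Observable<string> {
    const pageEvents = this.pagesChanged.pipe(
      // Emit once on subscribe so pages that are already open are picked up.
      startWith(undefined),
      // On every change, drop the old inner subscriptions and re-merge.
      switchMap(() => from(this.pages.slice()).pipe(mergeMap(p => p.events))),
    );
    // Browser events stay outside switchMap so they are not resubscribed.
    return merge(pageEvents, this.ownEvents);
  }
}

const browser = new Browser();
const p1 = new Page();
const p2 = new Page();
browser.addPage(p1); // opened before anyone subscribes

const seen: string[] = [];
const sub = browser.oneObservable.subscribe(e => seen.push(e));

p1.emit('p1:a');          // received although p1 predates the subscription
browser.addPage(p2);
p2.emit('p2:a');
browser.removePage(p1);
p1.emit('p1:b');          // not received: p1 is no longer subscribed to
browser.emitOwn('browser:a');
sub.unsubscribe();
```

Removing a page triggers a switch, which unsubscribes from every previous inner stream, so the closed page holds no subscription afterwards.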
Testing,
const page1 = { events: Rx.Observable.of('page1Event') }
const page2 = { events: Rx.Observable.of('page2Event') }
let pages = [];
const pagesChanged = new Rx.Subject();
const addPage = (page) => {
    pages.push(page);
    pagesChanged.next();
}
const removePage = (page) => {
    let ixPage = pages.indexOf(page);
    if (ixPage < 0) return;
    pages.splice(ixPage, 1);
    pagesChanged.next();
}
const _ownEvents = Rx.Observable.of('ownEvent')
const oneObservable =
    pagesChanged
        .switchMap(pp =>
            Rx.Observable.from(pages)
                .mergeMap(x => x.events)
                .merge(_ownEvents)
        )
oneObservable.subscribe(x => console.log('subscribe', x))
console.log('adding 1')
addPage(page1)
console.log('adding 2')
addPage(page2)
console.log('removing 1')
removePage(page1)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
Upvotes: 2
Reputation: 20033
You will need to manage the subscriptions to the pages yourself and feed their events into the resulting subject:
const theOneObservable$ = new Subject<Event>();
function openPage(page: Page): Subscription {
    // Forward every page event into the shared subject.
    return page.events$.subscribe(val => theOneObservable$.next(val));
}
Closing the page, i.e. calling unsubscribe on the returned subscription, will already do everything it has to do.
Note that theOneObservable$ is a hot observable here.
You can, of course, take this a bit further by writing your own observable type which encapsulates all of this API. In particular, this would allow you to unsubscribe all inner observables when it is being closed.
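As a rough illustration of such an encapsulation, here is a minimal sketch assuming RxJS 6 or later import paths. The class name EventHub and its open, close and closeAll methods are hypothetical, not part of RxJS, and plain Subject<string> instances stand in for page event streams.

```typescript
import { Observable, Subject, Subscription } from 'rxjs';

// Hypothetical wrapper: owns the output subject and one subscription per source.
class EventHub<T> {
  private readonly out = new Subject<T>();
  private readonly subs = new Map<Observable<T>, Subscription>();

  // Hot stream of everything forwarded from the currently open sources.
  get events$(): Observable<T> { return this.out.asObservable(); }

  open(source$: Observable<T>): void {
    if (this.subs.has(source$)) return;
    // Forward values only, so a completing source does not complete the hub.
    this.subs.set(source$, source$.subscribe(v => this.out.next(v)));
  }

  close(source$: Observable<T>): void {
    const sub = this.subs.get(source$);
    if (!sub) return;
    sub.unsubscribe();
    this.subs.delete(source$);
  }

  // Unsubscribe from every source and complete the output stream.
  closeAll(): void {
    this.subs.forEach(sub => sub.unsubscribe());
    this.subs.clear();
    this.out.complete();
  }
}

const hub = new EventHub<string>();
const pageA = new Subject<string>();
const pageB = new Subject<string>();

const received: string[] = [];
hub.events$.subscribe(e => received.push(e));

hub.open(pageA);
hub.open(pageB);
pageA.next('a1');
pageB.next('b1');
hub.close(pageA);
pageA.next('a2'); // not forwarded: the hub no longer subscribes to pageA
pageB.next('b2');
hub.closeAll();
```

The map is keyed by the observable instance, so the caller has to pass the same reference to open and close. With a getter that returns a fresh asObservable() on every access, as in the question's Page class, that reference would need to be stored once and reused.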
A slightly different approach is this:
const observables$ = new Subject<Observable<Event>>();
const theOneObservable$ = observables$.mergeMap(obs$ => obs$);
// Add a page's events; note that takeUntil takes care of the
// unsubscription process here.
observables$.next(page.events$.takeUntil(page.closed$));
This approach is superior in the sense that the inner observables are unsubscribed automatically when theOneObservable$ itself is unsubscribed.
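For reference, a self-contained version of this second approach might look like the sketch below. It assumes RxJS 6 or later import paths with pipeable operators. The Page class is a hypothetical stand-in exposing events$ and closed$ with string events, and its emit and close methods exist only to drive the example.

```typescript
import { Observable, Subject } from 'rxjs';
import { mergeMap, takeUntil } from 'rxjs/operators';

// Hypothetical Page exposing an event stream and a "closed" notifier.
class Page {
  private readonly _events = new Subject<string>();
  private readonly _closed = new Subject<void>();
  readonly events$: Observable<string> = this._events.asObservable();
  readonly closed$: Observable<void> = this._closed.asObservable();

  emit(e: string): void { this._events.next(e); }

  close(): void {
    this._closed.next();
    this._closed.complete();
  }
}

// Stream of streams: each pushed observable is merged into the output.
const observables$ = new Subject<Observable<string>>();
const theOneObservable$ = observables$.pipe(mergeMap(obs$ => obs$));

const log: string[] = [];
const sub = theOneObservable$.subscribe(e => log.push(e));

const page1 = new Page();
const page2 = new Page();
// takeUntil ends each inner stream as soon as its page reports closed$.
observables$.next(page1.events$.pipe(takeUntil(page1.closed$)));
observables$.next(page2.events$.pipe(takeUntil(page2.closed$)));

page1.emit('1:a');
page2.emit('2:a');
page1.close();
page1.emit('1:b'); // ignored: page1's inner stream has completed
page2.emit('2:b');
sub.unsubscribe(); // also tears down the remaining inner subscription
page2.emit('2:c'); // ignored: nothing is subscribed any more
```

Since observables$ is a plain Subject here, only pages pushed after the subscription is made are merged, which is the same limitation the question describes for late subscribers.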
Upvotes: 0