GregRos

Reputation: 9103

Merge events from a changing list of Observables

I'm using rxjs.

I have a Browser that's responsible for a number of Page objects. Each page has an Observable<Event> that yields a stream of events.

Page objects are closed and opened at various times. I want to create one observable, called TheOneObservable, that merges all the events from all the currently active Page objects and also merges in custom events from the Browser object itself.

When a Page is closed, the subscription to its events should be released as well, so that the subscription doesn't prevent the Page from being GC'd.

My problem is that Pages can be closed at any time, which means that the number of Observables being merged is always changing. I've thought of using an Observable of Pages and using mergeMap, but there are problems with this. For example, a subscriber will only receive events of Pages that are opened after it subscribes.


Note that this question has been answered here for .NET, but using an ObservableCollection that isn't available in rxjs.


Here is some code to illustrate the problem:

class Page {
    private _events = new Subject<Event>();

    get events(): Observable<Event> {
        return this._events.asObservable();
    }
}

class Browser {
    pages = [] as Page[];
    private _ownEvents = new Subject<Event>();

    addPage(page : Page) {
        this.pages.push(page);
    }

    removePage(page : Page) {
        let ixPage = this.pages.indexOf(page);
        if (ixPage < 0) return;
        this.pages.splice(ixPage, 1);
    }

    get oneObservable() {
        //this won't work for aforementioned reasons
        return Observable.from(this.pages).mergeMap(x => x.events).merge(this._ownEvents);
    }
}

It's in TypeScript, but it should be understandable.

Upvotes: 2

Views: 657

Answers (2)

Richard Matsen

Reputation: 23463

You can switchMap() on a Subject() linked to array changes, replacing oneObservable with a fresh one when the array changes.

pagesChanged = new Rx.Subject();

addPage(page : Page) {
  this.pages.push(page);
  this.pagesChanged.next();
}

removePage(page : Page) {
  let ixPage = this.pages.indexOf(page);
  if (ixPage < 0) return;
  this.pages.splice(ixPage, 1);
  this.pagesChanged.next();
}

get oneObservable() {
  return this.pagesChanged
    .switchMap(changeEvent =>
      Observable.from(this.pages).mergeMap(x => x.events).merge(this._ownEvents)
    )
}

Testing:

const page1 = { events: Rx.Observable.of('page1Event') }
const page2 = { events: Rx.Observable.of('page2Event') }

let pages = []; 
const pagesChanged = new Rx.Subject();
const addPage = (page) => { 
  pages.push(page); 
  pagesChanged.next(); 
}
const removePage = (page) => { 
  let ixPage = pages.indexOf(page);
  if (ixPage < 0) return;
  pages.splice(ixPage, 1);
  pagesChanged.next(); 
}

const _ownEvents = Rx.Observable.of('ownEvent')

const oneObservable = 
  pagesChanged
    .switchMap(pp => 
      Rx.Observable.from(pages)
        .mergeMap(x => x.events)
        .merge(_ownEvents)
    )

oneObservable.subscribe(x => console.log('subscribe', x))

console.log('adding 1')
addPage(page1)
console.log('adding 2')
addPage(page2)
console.log('removing 1')
removePage(page1)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
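
For reference, here is how the switchMap idea above might look as a self-contained TypeScript sketch with hot, Subject-backed pages. It assumes RxJS 6 or later (pipeable operators) rather than the 5.5.6 build used in the snippet. The PageEvent type, the emit/emitOwn helpers and the startWith call are additions of this sketch and not part of the original code. Merging the browser's own events outside the switchMap is also just a choice made here.

```typescript
import { Observable, Subject, from, merge } from "rxjs";
import { mergeMap, startWith, switchMap } from "rxjs/operators";

// Hypothetical stand-in for the question's Event type.
interface PageEvent {
  source: string;
  name: string;
}

class Page {
  private _events = new Subject<PageEvent>();
  constructor(readonly id: string) {}

  get events(): Observable<PageEvent> {
    return this._events.asObservable();
  }

  // Helper for this sketch only: push an event into the page's stream.
  emit(name: string): void {
    this._events.next({ source: this.id, name });
  }
}

class Browser {
  private pages: Page[] = [];
  private pagesChanged = new Subject<void>();
  private ownEvents = new Subject<PageEvent>();

  addPage(page: Page): void {
    this.pages.push(page);
    this.pagesChanged.next();
  }

  removePage(page: Page): void {
    const ixPage = this.pages.indexOf(page);
    if (ixPage < 0) return;
    this.pages.splice(ixPage, 1);
    this.pagesChanged.next();
  }

  // Helper for this sketch only: emit one of the browser's own events.
  emitOwn(name: string): void {
    this.ownEvents.next({ source: "browser", name });
  }

  get oneObservable(): Observable<PageEvent> {
    const pageEvents = this.pagesChanged.pipe(
      // Emit once on subscribe so pages that are already open are picked up.
      startWith(undefined),
      // On every change, drop the old inner subscriptions and re-merge
      // the events of the pages that are currently in the array.
      switchMap(() => from(this.pages).pipe(mergeMap(p => p.events)))
    );
    return merge(pageEvents, this.ownEvents);
  }
}

// Usage: p1 is opened before anyone subscribes.
const browser = new Browser();
const p1 = new Page("p1");
browser.addPage(p1);

const seen: string[] = [];
const sub = browser.oneObservable.subscribe(e => seen.push(`${e.source}:${e.name}`));

p1.emit("a");            // received, thanks to startWith
const p2 = new Page("p2");
browser.addPage(p2);
p2.emit("b");            // received
browser.removePage(p1);
p1.emit("c");            // dropped, p1 is no longer subscribed to
browser.emitOwn("d");    // received
sub.unsubscribe();
```

Without the startWith, a subscriber would see nothing until the next addPage or removePage call, because pagesChanged is a plain Subject and does not replay.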

Upvotes: 2

Ingo Bürk

Reputation: 20033

You will need to manage the subscriptions to the pages yourself and feed their events into the resulting subject:

const theOneObservable$ = new Subject<Event>();

function openPage(page: Page): Subscription {
  // Forward every event of this page into the shared subject.
  return page.events$.subscribe(val => theOneObservable$.next(val));
}

Closing the page then only requires calling unsubscribe on the returned subscription; nothing else needs to be cleaned up.

Note that theOneObservable$ is a hot observable here.

You can, of course, take this a bit further by writing your own observable type which encapsulates all of this API. In particular, this would allow you to unsubscribe all inner observables when it is being closed.
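
A rough sketch of what such a wrapper could look like, assuming RxJS 6 or later. The names MergedEvents, closePage, emitOwn, close and the PageEvent type are made up for this example, as is the emit helper on Page; treat it as one possible shape, not a finished API.

```typescript
import { Observable, Subject, Subscription } from "rxjs";

// Hypothetical stand-in for the question's Event type.
interface PageEvent {
  source: string;
  name: string;
}

class Page {
  private _events = new Subject<PageEvent>();
  constructor(readonly id: string) {}

  get events$(): Observable<PageEvent> {
    return this._events.asObservable();
  }

  // Helper for this sketch only: push an event into the page's stream.
  emit(name: string): void {
    this._events.next({ source: this.id, name });
  }
}

class MergedEvents {
  private readonly out = new Subject<PageEvent>();
  // One subscription per open page, so each can be released individually.
  private readonly subs = new Map<Page, Subscription>();

  get events$(): Observable<PageEvent> {
    return this.out.asObservable();
  }

  openPage(page: Page): void {
    if (this.subs.has(page)) return;
    this.subs.set(page, page.events$.subscribe(e => this.out.next(e)));
  }

  closePage(page: Page): void {
    const sub = this.subs.get(page);
    if (sub === undefined) return;
    sub.unsubscribe();
    this.subs.delete(page);
  }

  // The owner's own events go into the same subject.
  emitOwn(name: string): void {
    this.out.next({ source: "browser", name });
  }

  // Release every inner subscription and complete the merged stream.
  close(): void {
    this.subs.forEach(s => s.unsubscribe());
    this.subs.clear();
    this.out.complete();
  }
}

// Usage: the page is opened before the subscriber arrives.
const hub = new MergedEvents();
const page = new Page("p1");
hub.openPage(page);

const received: string[] = [];
let completed = false;
hub.events$.subscribe({
  next: e => received.push(`${e.source}:${e.name}`),
  complete: () => { completed = true; },
});

page.emit("a");      // received, even though the page was opened earlier
hub.emitOwn("own");  // received
hub.closePage(page);
page.emit("b");      // dropped, the page's subscription is gone
hub.close();
```

Because the forwarding subscriptions live in the wrapper rather than in each subscriber, a late subscriber still gets events from every page that is open at that moment.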


A slightly different approach is this:

const observables$ = new Subject<Observable<Event>>();
const theOneObservable$ = observables$.mergeMap(obs$ => obs$);

// Add a page's events; note that takeUntil takes care of the
// unsubscription process here.
observables$.next(page.events$.takeUntil(page.closed$));

This approach is superior in the sense that it will unsubscribe the inner observables automatically when theOneObservable$ is unsubscribed.
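
To make the second variant concrete, here is a small runnable sketch, again assuming RxJS 6 or later with pipeable operators. The question's Page has no closed$ stream, so the closed$ getter, the close() method, the emit helper and the PageEvent type below are all assumptions of this example.

```typescript
import { Observable, Subject } from "rxjs";
import { mergeMap, takeUntil } from "rxjs/operators";

// Hypothetical stand-in for the question's Event type.
interface PageEvent {
  source: string;
  name: string;
}

class Page {
  private _events = new Subject<PageEvent>();
  private _closed = new Subject<void>();
  constructor(readonly id: string) {}

  get events$(): Observable<PageEvent> {
    return this._events.asObservable();
  }

  // Assumed for this sketch: emits once when the page is closed.
  get closed$(): Observable<void> {
    return this._closed.asObservable();
  }

  emit(name: string): void {
    this._events.next({ source: this.id, name });
  }

  close(): void {
    this._closed.next();
    this._closed.complete();
  }
}

const observables$ = new Subject<Observable<PageEvent>>();
const theOneObservable$ = observables$.pipe(mergeMap(obs$ => obs$));

const got: string[] = [];
const subscription = theOneObservable$.subscribe(e => got.push(`${e.source}:${e.name}`));

// Register a page; takeUntil ends the inner stream once closed$ emits.
const somePage = new Page("p1");
observables$.next(somePage.events$.pipe(takeUntil(somePage.closed$)));

somePage.emit("a");  // received
somePage.close();
somePage.emit("b");  // dropped, the inner observable has completed
subscription.unsubscribe();
```

One thing to keep in mind: observables$ is a plain Subject here, so a page pushed before the subscriber attaches is not replayed to it. That is the late-subscriber problem from the question, and this variant does not solve it by itself.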

Upvotes: 0
