Reputation: 3456
Given an existing Observable (which has not completed yet), is there a way to retrieve associated subscribers (functions passed to subscribe) to make them subscribe to another Observable instead?
A service in my application helps create ServerEvent connections, returning a ConnectableObservable that proxies the connection and allows multicasting via the publish operator. The service keeps track of existing connections in an internal store:
store: {[key: string]: ConnectionTracker};
// …
interface ConnectionTracker {
fullUri: string; // endpoint plus optional query string, as read by createTracker
eventSource: EventSource;
observable: rx.ConnectableObservable<any>;
subscription: rx.Subscription;
observer: rx.Observer<any>;
data?: any; // Arbitrary data
}
Upon connection creation, if an associated tracker already exists (identity is made using the connection's endpoint), the service should:
Here is the part of the code that creates ConnectionTrackers:
/**
* Create/Update a ServerEvent connection tracker
*/
createTracker<T>(endpoint: string, queryString: string = null): ConnectionTracker
{
let fullUri = endpoint + (queryString ? `?${queryString}` : '')
, tracker = this.findTrackerByEndpoint(endpoint) || {
observable: null,
fullUri: fullUri,
eventSource: null,
observer: null,
subscription: null
}
;
// Tracker exists
if (tracker.observable !== null) {
// If fullUri hasn't changed, use the tracker as is
if (tracker.fullUri === fullUri) {
return tracker;
}
// At this point, we know "fullUri" has changed, the tracker's
// connection should be replaced with a fresh one
// ⇒ TODO
// ⇒ Gather old tracker.observable's subscribers/subscriptions to make
// them subscribe to the new Observable instead (created down below)
// Terminate previous connection and clean related resources
tracker.observer.complete();
tracker.eventSource.close();
}
tracker.eventSource = new EventSource(<any>fullUri, {withCredentials: true});
tracker.observable = rx.Observable.create((observer: rx.Observer<T>) => {
// Executed once
tracker.eventSource.onmessage = e => observer.next(JSON.parse(e.data));
tracker.eventSource.onerror = e => observer.error(e);
// Keep track of the observer
tracker.observer = observer;
})
// Transform Observable into a ConnectableObservable for multicast
.publish()
;
// Start emitting right away and also keep a reference to
// proxy subscription for later disposal
tracker.subscription = tracker.observable.connect();
return tracker;
}
Thank you.
Upvotes: 4
Views: 1593
Reputation: 14109
Instead of trying to carry Subscribers over from one Observable to another manually, you should supply listeners with an Observable that automatically switches to a different Observable when needed.
You do so by working with a higher-order Observable (an Observable that emits Observables) and always switching to the most recent inner Observable.
// a BehaviorSubject is used so that late subscribers also immediately get the most recent inner Observable
const higherOrderObservable = new BehaviorSubject<Observable<any>>(EMPTY);
// pass new Observable to listeners
higherOrderObservable.next(new Observable(..));
// get most recent inner Observable
const currentObservable = higherOrderObservable.pipe(switchMap(obs => obs));
currentObservable.subscribe(valueFromInnerObservable => { .. })
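To make the switching behaviour concrete, here is a minimal sketch that runs synchronously. The two Subjects (oldSource, newSource) are hypothetical stand-ins for an old and a new connection, and the names are mine, not from the question; it assumes RxJS 6 or later.

```typescript
import { BehaviorSubject, EMPTY, Observable, Subject } from "rxjs";
import { switchMap } from "rxjs/operators";

// Hypothetical stand-ins for the old and the new connection.
const oldSource = new Subject<string>();
const newSource = new Subject<string>();

// Higher-order Observable: emits the inner Observable listeners should follow.
const source$ = new BehaviorSubject<Observable<string>>(EMPTY);

// The listener subscribes once, to the flattened stream.
const received: string[] = [];
source$.pipe(switchMap(inner => inner)).subscribe(value => received.push(value));

source$.next(oldSource);
oldSource.next("a"); // delivered: the listener currently follows oldSource

source$.next(newSource); // switchMap unsubscribes from oldSource here
oldSource.next("b"); // dropped: nobody listens to oldSource anymore
newSource.next("c"); // delivered without the listener resubscribing

console.log(received); // [ 'a', 'c' ]
```

The listener never touches oldSource or newSource directly, which is why there is nothing to "move" when the source changes.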
For every endpoint, create a BehaviorSubject (tracker supplier) that emits the Observable (tracker) that should currently be used for that endpoint. When a different tracker should be used for a given endpoint, pass this new Observable to the BehaviorSubject. Let your listeners subscribe to the BehaviorSubject (tracker supplier), which automatically supplies them with the right tracker, i.e. switches to the Observable that should currently be used.
A simplified version of your code could look like the one below. The specifics depend on how you're using the function createTracker throughout your app.
interface ConnectionTracker {
fullUri: string;
tracker$: ConnectableObservable<any>;
}
// Map an endpoint to a tracker supplier.
// This is your higher order Observable as it emits objects that wrap an Observable
store: { [key: string]: BehaviorSubject<ConnectionTracker> };
closeAllTrackers$ = new Subject<void>(); // void so that next() can be called without a value
// Creates a new tracker if necessary and returns an Observable for that endpoint.
// The returned Observable always mirrors the current tracker.
createTracker<T>(endpoint: string, queryString: string = null): Observable<any> {
const fullUri = endpoint + (queryString ? `?${queryString}` : '');
// if no tracker supplier for the endpoint exists, create one
if (!this.store[endpoint]) {
this.store[endpoint] = new BehaviorSubject<ConnectionTracker>(null);
}
const currentTracker = this.store[endpoint].getValue();
// if no tracker exists or the current one is obsolete, create a new one
if (!currentTracker || currentTracker.fullUri !== fullUri) {
const tracker$ = new Observable<T>(subscriber => {
const source = new EventSource(fullUri, { withCredentials: true });
source.onmessage = e => subscriber.next(JSON.parse(e.data));
source.onerror = e => subscriber.error(e);
return () => source.close(); // on unsubscribe close the source
}).pipe(publish()) as ConnectableObservable<any>;
tracker$.connect();
// pass the new tracker to the tracker supplier
this.store[endpoint].next({ fullUri, tracker$ });
}
// return the tracker supplier for the given endpoint that always switches to the current tracker
return this.store[endpoint].pipe(
switchMap(tracker => tracker ? tracker.tracker$ : EMPTY), // switchMap will unsubscribe from the previous tracker and thus close the connection if a new tracker comes in
takeUntil(this.closeAllTrackers$) // complete the tracker supplier on emit
);
}
// close all trackers and remove the tracker suppliers
closeAllTrackers() {
this.closeAllTrackers$.next();
this.store = {};
}
If you want to close all tracker connections at once and existing subscribers should get a complete notification, call closeAllTrackers.
If you only want to close some tracker connections but don't want existing subscribers to receive a complete notification, so that they keep listening for new trackers supplied in the future, call store[trackerEndpoint].next(null) for each tracker.
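The difference between the two ways of closing can be sketched as follows. This is only an illustration under assumptions: supplier, closeAll$ and connection are hypothetical names, and a plain Subject stands in for a real tracker.

```typescript
import { BehaviorSubject, EMPTY, Observable, Subject } from "rxjs";
import { switchMap, takeUntil } from "rxjs/operators";

// Hypothetical tracker supplier for one endpoint, plus a global "close all" signal.
const supplier = new BehaviorSubject<Observable<number> | null>(null);
const closeAll$ = new Subject<void>();
const connection = new Subject<number>(); // stand-in for a real tracker

const log: string[] = [];
supplier.pipe(
  switchMap(inner => (inner ? inner : EMPTY)),
  takeUntil(closeAll$)
).subscribe({
  next: value => log.push(`value ${value}`),
  complete: () => log.push("complete"),
});

supplier.next(connection);
connection.next(1); // logged as "value 1"
supplier.next(null); // switches to EMPTY; the listener is NOT completed
connection.next(2); // ignored: the listener has left this connection
supplier.next(connection);
connection.next(3); // logged as "value 3": same listener, still subscribed
closeAll$.next(); // takeUntil completes the listener
connection.next(4); // ignored after completion

console.log(log); // [ 'value 1', 'value 3', 'complete' ]
```

Emitting null only ends the inner subscription, while the takeUntil notifier ends the outer one, which is what produces the complete notification.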
Upvotes: 2
Reputation: 1585
If you try to do things like moving a subscriber to a different observable, then you are not using RxJS as intended. Any manipulation of that kind is basically a hack.
If you occasionally produce a new observable (e.g. by making a request) and you want some subscriber to always be subscribed to the most recent of them, then here's the solution:
private observables: Subject<Observable<Data>> = new Subject();
getData(): Observable<Data> {
return this.observables.pipe(switchAll());
}
onMakingNewRequest(newObservable: Observable<Data>) {
this.observables.next(newObservable); // a Subject emits via next(); it has no push()
}
This way you can expose a single observable (via getData()) to which the client subscribes, but by emitting a new observable on this.observables you change the actual source of data that the user sees.
As for closing the connection and similar cleanup, your observable (the one created with every request or something) should take care of releasing and closing its resources when it's unsubscribed from. Then you don't need any extra handling: the previous observable is automatically unsubscribed from the moment you emit the new one. Details depend on the actual backend you are contacting.
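A rough sketch of that cleanup behaviour, assuming the Subject is fed through next() and that each inner observable returns a teardown function. The connect factory is hypothetical and only records what happens instead of opening a real connection.

```typescript
import { Observable, Subject } from "rxjs";
import { switchAll } from "rxjs/operators";

const events: string[] = [];

// Hypothetical factory: each "connection" records when it opens and closes.
function connect(name: string): Observable<string> {
  return new Observable<string>(subscriber => {
    events.push(`open ${name}`);
    subscriber.next(`hello from ${name}`);
    // Teardown: runs when this observable is unsubscribed from.
    return () => {
      events.push(`close ${name}`);
    };
  });
}

const observables = new Subject<Observable<string>>();
observables.pipe(switchAll()).subscribe(value => events.push(value));

observables.next(connect("first"));
observables.next(connect("second")); // switchAll unsubscribes from "first" before subscribing to "second"

console.log(events);
// [ 'open first', 'hello from first', 'close first', 'open second', 'hello from second' ]
```

With a real EventSource, the teardown would be the place to call close() on it.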
Upvotes: 1