Reputation: 2311
I want to create an RxJS Observable that runs a long polling operation. Each iteration emits intermediate results. When isComplete returns true, the Observable completes.
This function should behave as follows
The following code works properly and satisfies conditions (1) and (2):
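import { defer, from, Observable, timer } from 'rxjs';
import { expand, share, switchMap, takeWhile } from 'rxjs/operators';

function longPollingAction(fetch: () => Promise<Response>, cancel: () => void): Observable<Response> {
  return defer(() => { // defer so that polling starts only on the first subscription
    return from(fetch()).pipe(
      expand(() => timer(1000).pipe(switchMap(fetch))),
      // takeWhile keeps values while its predicate is true, so poll while NOT complete
      takeWhile<Response>(r => !isComplete(r), false),
    );
  }).pipe(share()); // share to allow multiple subscribers
}
function isComplete(r: Response): boolean {
  // returns true if r is complete.
}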
How can I modify this code to satisfy (3) as well? With the current implementation the polling stops, but how do I call cancel?
Upvotes: 5
Views: 937
Reputation: 13071
There is more than one way to skin a cat, but this is how I would do it:
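import { defer, Observable, timer } from 'rxjs';
import { expand, mergeMapTo, share, takeWhile } from 'rxjs/operators';

// Calls `callback` only when the consumer unsubscribes before the source
// has errored or completed.
const onUnsubscribe = (callback: () => void) => <T>(source$: Observable<T>) =>
  new Observable<T>(observer => {
    let isSourceDone = false;
    const subscription = source$.subscribe({
      next: val => {
        observer.next(val);
      },
      error: e => {
        isSourceDone = true;
        observer.error(e);
      },
      complete: () => {
        isSourceDone = true;
        observer.complete();
      }
    });
    // Teardown also runs after error/complete, hence the isSourceDone guard.
    return () => {
      if (isSourceDone) return;
      callback();
      subscription.unsubscribe();
    };
  });
function longPollingAction(
  fetch: () => Promise<Response>,
  cancel: () => void
): Observable<Response> {
  const lazyFetch$ = defer(() => fetch());
  return lazyFetch$.pipe(
    expand(() => timer(1000).pipe(mergeMapTo(lazyFetch$))),
    // takeWhile keeps values while its predicate is true, so poll while NOT complete
    takeWhile<Response>(r => !isComplete(r), false),
    onUnsubscribe(cancel),
    share()
  );
}
function isComplete(r: Response): boolean {
  // returns true if r is complete.
}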
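To check how the operator above behaves, here is a small sketch that uses a Subject in place of the polling stream. The operator is copied in condensed form so the snippet runs on its own; the names finished$, open$ and calls are made up for the illustration.

```typescript
import { Observable, Subject } from 'rxjs';

// Condensed copy of the onUnsubscribe operator from this answer.
const onUnsubscribe = (callback: () => void) => <T>(source$: Observable<T>) =>
  new Observable<T>(observer => {
    let isSourceDone = false;
    const subscription = source$.subscribe({
      next: val => observer.next(val),
      error: e => { isSourceDone = true; observer.error(e); },
      complete: () => { isSourceDone = true; observer.complete(); },
    });
    return () => {
      if (isSourceDone) return;
      callback();
      subscription.unsubscribe();
    };
  });

const calls: string[] = [];

// Case 1: the source completes by itself, so the callback must stay silent.
const finished$ = new Subject<number>();
finished$.pipe(onUnsubscribe(() => calls.push('finished'))).subscribe();
finished$.complete();

// Case 2: the consumer walks away early, so the callback fires.
const open$ = new Subject<number>();
const openSub = open$.pipe(onUnsubscribe(() => calls.push('open'))).subscribe();
openSub.unsubscribe();

console.log(calls); // only the early unsubscribe is recorded
```

Only the second case should trigger the callback, which is the behaviour you want for cancel.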
Upvotes: 0
Reputation: 8022
You can call cancel using finalize, which runs its callback whenever the stream terminates: on completion, on error, or on unsubscribe. Here's how that might look:
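import { defer, Observable, timer } from 'rxjs';
import { expand, finalize, share, switchMap, takeWhile } from 'rxjs/operators';

function longPollingAction(
  fetch: () => Promise<Response>,
  cancel: () => void
): Observable<Response> {
  // defer to turn eager promise into lazy observable
  return defer(fetch).pipe(
    expand(() => timer(1000).pipe(switchMap(fetch))),
    // takeWhile keeps values while its predicate is true, so poll while NOT complete
    takeWhile<Response>(r => !isComplete(r), false),
    finalize(cancel),
    share() // share to allow multiple subscribers
  );
}
function isComplete(r: Response): boolean {
  // returns true if r is complete.
}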
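If you want to see for yourself when finalize fires, here is a minimal sketch. It swaps the polling stream for plain Subjects so everything runs synchronously; completing$, pending$ and finalizeLog are names invented for this example.

```typescript
import { Subject } from 'rxjs';
import { finalize } from 'rxjs/operators';

const finalizeLog: string[] = [];

// Case 1: the source completes on its own.
const completing$ = new Subject<number>();
completing$
  .pipe(finalize(() => finalizeLog.push('finalize after complete')))
  .subscribe();
completing$.complete();

// Case 2: the subscriber unsubscribes before the source completes.
const pending$ = new Subject<number>();
const pendingSub = pending$
  .pipe(finalize(() => finalizeLog.push('finalize after unsubscribe')))
  .subscribe();
pendingSub.unsubscribe();

console.log(finalizeLog);
// → [ 'finalize after complete', 'finalize after unsubscribe' ]
```

So with finalize, cancel is called in both situations. Whether that is acceptable depends on whether calling cancel after a normal completion is harmless for you.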
complete
The tap operator has access to next, error, and complete emissions. For a callback: () => void, that's good enough.
import { defer, Observable, timer } from 'rxjs';
import { expand, share, switchMap, takeWhile, tap } from 'rxjs/operators';

function longPollingAction(
  fetch: () => Promise<Response>,
  cancel: () => void
): Observable<Response> {
  // defer to turn eager promise into lazy observable
  return defer(fetch).pipe(
    expand(() => timer(1000).pipe(switchMap(fetch))),
    // takeWhile keeps values while its predicate is true, so poll while NOT complete
    takeWhile<Response>(r => !isComplete(r), false),
    tap({
      complete: cancel
    }),
    share() // share to allow multiple subscribers
  );
}
function isComplete(r: Response): boolean {
  // returns true if r is complete.
}
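Note that the complete handler of tap only reacts to a complete notification passing through it, not to an early unsubscribe. A quick sketch to illustrate, again with Subjects standing in for the polling stream (done$, abandoned$ and tapLog are hypothetical names):

```typescript
import { Subject } from 'rxjs';
import { tap } from 'rxjs/operators';

const tapLog: string[] = [];

// The source completes: the complete handler runs.
const done$ = new Subject<number>();
done$.pipe(tap({ complete: () => tapLog.push('complete') })).subscribe();
done$.complete();

// The subscriber leaves early: no complete notification, so nothing is logged.
const abandoned$ = new Subject<number>();
const abandonedSub = abandoned$
  .pipe(tap({ complete: () => tapLog.push('never logged') }))
  .subscribe();
abandonedSub.unsubscribe();

console.log(tapLog); // → [ 'complete' ]
```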
unsubscribe
I don't think such an operator exists, but we can make one easily enough. This operator will only fire the callback if unsubscribed. It will ignore error and complete.
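import { MonoTypeOperatorFunction, Observable } from 'rxjs';

function onUnsubscribe<T>(
  fn: () => void
): MonoTypeOperatorFunction<T> {
  return s => new Observable<T>(observer => {
    // Teardown also runs after error/complete, so remember whether the
    // source terminated by itself and skip the callback in that case.
    let terminated = false;
    const sub = s.subscribe({
      next: value => observer.next(value),
      error: err => {
        terminated = true;
        observer.error(err);
      },
      complete: () => {
        terminated = true;
        observer.complete();
      }
    });
    return () => {
      if (!terminated) fn();
      sub.unsubscribe();
    };
  });
}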
Then you can use it like this:
import { defer, Observable, timer } from 'rxjs';
import { expand, share, switchMap, takeWhile } from 'rxjs/operators';

function longPollingAction(
  fetch: () => Promise<Response>,
  cancel: () => void
): Observable<Response> {
  // defer to turn eager promise into lazy observable
  return defer(fetch).pipe(
    expand(() => timer(1000).pipe(switchMap(fetch))),
    // takeWhile keeps values while its predicate is true, so poll while NOT complete
    takeWhile<Response>(r => !isComplete(r), false),
    onUnsubscribe(cancel),
    share() // share to allow multiple subscribers
  );
}
function isComplete(r: Response): boolean {
  // returns true if r is complete.
}
Because share manages the subscription to the source and only unsubscribes from it once refCount < 1, cancel is called in this case only when the last subscriber has unsubscribed.
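Here is a rough sketch of that ref-counting behaviour. It uses finalize as a stand-in for whichever cancel hook you pick, and a Subject instead of the polling stream; cancelCalls, source$ and shared$ are names made up for the example.

```typescript
import { Subject } from 'rxjs';
import { finalize, share } from 'rxjs/operators';

let cancelCalls = 0;
const cancel = () => { cancelCalls += 1; };

// The cancel hook sits before share(), so it is tied to the single
// shared subscription rather than to each individual subscriber.
const source$ = new Subject<number>();
const shared$ = source$.pipe(finalize(cancel), share());

const first = shared$.subscribe();
const second = shared$.subscribe();

first.unsubscribe();
const afterFirst = cancelCalls;  // one subscriber is still attached

second.unsubscribe();
const afterSecond = cancelCalls; // last subscriber gone, source torn down

console.log(afterFirst, afterSecond); // → 0 1
```

If you need cancel to run per subscriber instead, the hook would have to go after share(), which changes the semantics of the question's condition.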
Upvotes: 1