Reputation: 403
I'm working with the new version of RxJS (6), and with pipe in particular. I currently use pipe to take the results of an API call and pass them through a series of additional tasks.
Everything works, but I can't find a way to cancel or end a pipe if I run into a problem. For example, I use the tap operator to check whether the value is null and then throw an error, but the pipe still appears to move on to the next task, in this case concatMap.
Therefore, how do you end or cancel a pipe prematurely? Thanks in advance.
getData(id: String): Observable<any[]> {
  return this.http.get<any>(`${this.baseUrl}/path/${id}`).pipe(
    tap(evt => {
      if (evt == null) {
        return throwError(new Error("No data found..."));
      }
    }),
    concatMap(
      evt =>
        <Observable<any[]>>(
          this.http.get<any[]>(
            `${this.baseUrl}/path/relatedby/${evt.child_id}`
          ).map(res => ({ "response1": evt, "response2": res }))
        )
    ),
    retry(3),
    catchError(this.handleError("getData", []))
  );
}
Upvotes: 34
Views: 38178
Reputation: 383
If you just want to end the execution of the pipe without throwing an error you can return EMPTY.
myObservable.pipe(
  switchMap(c => c === null ? EMPTY : of(c)),
  tap(() => console.log('If c is null this is not executed anymore'))
);
Also the takeWhile operator is an option.
myObservable.pipe(
  takeWhile(c => c !== null),
  tap(() => console.log('If c is null this is not executed anymore'))
);
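In both cases a null value never travels further down the pipe. With takeWhile the stream completes as soon as the predicate fails; with EMPTY the null value is simply dropped, and the stream completes when the source does.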
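To see how the two options differ when the source emits more than one value, here is a small sketch. It assumes a synchronous source built from a hypothetical array of values and records what a subscriber sees in each case; the variable names are only for illustration.

```typescript
import { EMPTY, from, of } from "rxjs";
import { switchMap, takeWhile } from "rxjs/operators";

// Hypothetical source values; the null in the middle is the "bad" one.
const values: Array<number | null> = [1, null, 3];

// Option 1: map null to EMPTY. Only the null value is dropped.
const viaEmpty: string[] = [];
from(values)
  .pipe(switchMap(c => (c === null ? EMPTY : of(c))))
  .subscribe({
    next: c => { viaEmpty.push(`next ${c}`); },
    complete: () => { viaEmpty.push("complete"); },
  });

// Option 2: takeWhile. The stream completes at the first null.
const viaTakeWhile: string[] = [];
from(values)
  .pipe(takeWhile(c => c !== null))
  .subscribe({
    next: c => { viaTakeWhile.push(`next ${c}`); },
    complete: () => { viaTakeWhile.push("complete"); },
  });

console.log(viaEmpty);     // [ 'next 1', 'next 3', 'complete' ]
console.log(viaTakeWhile); // [ 'next 1', 'complete' ]
```

For a single-value source such as an HTTP call the two behave the same from the subscriber's point of view: nothing is emitted and the stream completes.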
Upvotes: 8
Reputation: 5310
I'd like to suggest a different approach, for the case where you don't want to throw an error and just want to handle the null case. Create two observables and return one of them from a switchMap, based on an if check.
getData(id: String): Observable<any> {
  return this.http.get<any>(`${this.baseUrl}/path/${id}`).pipe(
    switchMap(evt => {
      if (evt == null) {
        // The chain ends here: the subscriber receives null
        // and the subscription completes normally
        return of(null);
      }
      // This is the alternative chain
      // You can construct further logic with rxjs operators
      return this.http.get<any[]>(
        `${this.baseUrl}/path/relatedby/${evt.child_id}`
      ).pipe(
        // RxJS 6 style: pipeable map instead of the old .map() method
        map(res => ({ "response1": evt, "response2": res }))
      );
    })
  );
}
Pardon any typos, this is just a quick example to demonstrate the approach.
Upvotes: 4
Reputation: 11787
You can also cancel/end a pipe by using a signal Subject and the RxJS operator takeUntil.
Example:
httpGetSafe(path: string): Observable<Data> {
  // Emits once when bad data is seen; takeUntil then completes the stream
  const stopSignal$ = new Subject<void>();
  return this.http.get<Data>(path).pipe(
    map(data => {
      const isBad = data === null;
      if (isBad) {
        stopSignal$.next();
      }
      return data;
    }),
    takeUntil(stopSignal$)
  );
}
Sometimes it's a bit simpler and more flexible than throwing errors...
Upvotes: 22
Reputation: 903
RxJS can throw an error to stop execution. The operator you applied in the pipe was just the wrong choice.
The 'tap' operator that you are using is only meant for side effects, e.g. making changes to the DOM, calling console.log, or changing the values of some variables on your component (e.g. this.counter = someValue). The tap operator was not meant to change the RxJS 'stream': it just passes along the same values it received. https://rxjs-dev.firebaseapp.com/api/operators/tap
On the other hand, operators that do work on the stream, like 'map', can throw errors. See this stackblitz
In summary, the code is
getData(): Observable<any> {
  return this.httpGet('basePath').pipe(
    map(evt => {
      if (evt === null) {
        // Throw (don't return throwError(...)): a returned observable
        // would just be emitted as the next value
        throw new Error("No data found...");
      } else {
        return evt;
      }
    }),
    concatMap(evt => {
      return this.httpGet(`childPath/${evt.serverResult}`);
    }),
    map(res => {
      return {
        "response2": res.serverResult
      };
    }),
    retry(3),
    catchError(error => {
      console.log('error handled!');
      return of(null);
    })
  );
}

private httpGet(someString: string): Observable<{ serverResult: number }> {
  return timer(1000).pipe( // fake a server call that takes 1 second
    map(_ => { // fake a server result
      // Comment out the valid return and uncomment the null return to see it in action
      //return null;
      return {
        serverResult: 1
      };
    })
  );
}
If you don't want to transform the error into a valid value in the stream, handle it in the error callback on the subscribe function.
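A minimal sketch of that last point, assuming a hypothetical payload shape with a child_id field and a synchronous source in place of the HTTP call. There is no catchError here, so the error thrown in map reaches the error callback passed to subscribe.

```typescript
import { from } from "rxjs";
import { map } from "rxjs/operators";

// Hypothetical payload type; null stands for "no data found".
type Payload = { child_id: number } | null;
const payloads: Payload[] = [null];

const events: string[] = [];

from(payloads)
  .pipe(
    map(evt => {
      if (evt === null) {
        throw new Error("No data found...");
      }
      return evt;
    }),
    // Never reached for the null payload.
    map(evt => evt.child_id)
  )
  .subscribe({
    next: id => { events.push(`next ${id}`); },
    // Without catchError in the pipe, the thrown error arrives here.
    error: (err: Error) => { events.push(`error ${err.message}`); },
    complete: () => { events.push("complete"); },
  });

console.log(events); // [ 'error No data found...' ]
```

Note that complete is not called after an error, so any cleanup that must always run belongs in both callbacks or in a finalize operator.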
Upvotes: 9
Reputation: 60528
I tried the basic concept from what you have with this stackblitz and it worked. It cancelled the remaining operations. See the link below.
https://stackblitz.com/edit/angular-4ctwsd?file=src%2Fapp%2Fapp.component.ts
The differences I see between your code and mine are that I used throw and not throwError (is that something you wrote?), and that I'm just throwing the error, not returning a thrown error.
Here is the code for reference:
import { Component } from '@angular/core';
import { of, from } from 'rxjs';
import { map, catchError, tap, retry } from 'rxjs/operators';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent {
  name = 'Angular 6';

  constructor() {
    of('a', 'b', 'c', 'd')
      .pipe(
        map(x => {
          if (x === 'c') {
            throw 'An error has occurred';
          }
          return x;
        }),
        tap(x => console.log('In tap: ', x)),
        retry(3),
        catchError(() => of('caught error!'))
      )
      .subscribe(x => console.log(x));
  }
}
Upvotes: 12
Reputation: 7427
Observable functions are always wrapped internally in a try/catch block. Any error thrown in the stream will end the stream and call any error callbacks to subscribers or operators.
The problem here is with throwError(). I don't know what that function is and I don't recognize it as an Observable operator, but it looks like it's being used as one (and never subscribed to).
tap is usually used only for side effects, as it is completely unable to affect the values in the stream. However, as I mentioned with the try/catch blocks before, you should just be able to throw a new Error and the stream will take care of the rest.
getData(id: String): Observable<any> {
  return this.http.get<any>(`${this.baseUrl}/path/${id}`).pipe(
    tap(evt => {
      if (evt == null) {
        // Throwing ends the stream here; concatMap below is skipped
        throw new Error("No data found...");
      }
    }),
    concatMap(evt =>
      this.http.get<any[]>(
        `${this.baseUrl}/path/relatedby/${evt.child_id}`
      ).pipe(
        // RxJS 6 style: pipeable map instead of the old .map() method
        map(res => ({ "response1": evt, "response2": res }))
      )
    ),
    retry(3),
    catchError(this.handleError("getData", []))
  );
}
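To make the mechanism concrete, here is a dependency-free model of the idea. It is a simplified stand-in, not RxJS's actual source: tapLike and runChain are hypothetical names, and the chain is reduced to plain functions run inside one try/catch. It shows why a value returned from a tap-style callback is ignored while a thrown error stops the remaining steps.

```typescript
// Simplified stand-in for an operator chain; NOT how RxJS is implemented.
type Step<T> = (value: T) => T;

// Hypothetical tap-like step: runs the callback for its side effect and
// deliberately discards whatever the callback returns.
function tapLike<T>(sideEffect: (value: T) => unknown): Step<T> {
  return (value) => {
    sideEffect(value); // the return value is ignored
    return value;      // the original value passes through unchanged
  };
}

// Hypothetical runner: applies the steps in order inside a try/catch,
// so a thrown error skips every remaining step.
function runChain<T>(
  value: T,
  steps: Step<T>[]
): { error?: string; stepsRun: number } {
  let stepsRun = 0;
  try {
    let current = value;
    for (const step of steps) {
      current = step(current);
      stepsRun++;
    }
    return { stepsRun };
  } catch (e) {
    return { error: e instanceof Error ? e.message : String(e), stepsRun };
  }
}

// Stands in for the concatMap step that should have been skipped.
const nextStep: Step<string | null> = (v) => v;

// Returning an error object from the callback: ignored, chain continues.
const returned = runChain<string | null>(null, [
  tapLike<string | null>((v) =>
    v == null ? new Error("No data found...") : undefined
  ),
  nextStep,
]);

// Throwing from the callback: the chain stops before nextStep.
const thrown = runChain<string | null>(null, [
  tapLike<string | null>((v) => {
    if (v == null) throw new Error("No data found...");
  }),
  nextStep,
]);

console.log(returned.stepsRun, returned.error); // 2 undefined
console.log(thrown.stepsRun, thrown.error);     // 0 No data found...
```

In the real library the thrown error is also forwarded to downstream error handlers such as retry and catchError, which this model leaves out.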
Upvotes: 2