Reputation: 34593
In my Angular app, I'm manually validating some form data from JSON.
As such, I'm subscribing to a Subject that receives a change event:
private _contentChanged = new Subject<IEditorContentChangedEvent>();
contentChanged$: Observable<IEditorContentChangedEvent>;
constructor() {
this.contentChanged$ = this._contentChanged.asObservable();
}
onContentChanged(event: IEditorContentChangedEvent): void {
this._contentChanged.next(event);
}
ngOnInit(): void {
this.contentChanged$
.pipe(
untilDestroyed(this),
debounceTime(1000),
filter(event => Boolean(event)),
map(() => {
this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
console.log(this.resource);
this.parentForm.setValue(this.resource);
return of(event);
}),
catchError((error: Error) => {
...
return EMPTY;
}),
)
.subscribe(() => {
...
});
}
I see the log statement in map()
the first time contentChanged$ receives a value, but not the second time if the code in map
throws an error.
I'm assuming this is because catchError()
returns EMPTY
. I've been following guides such as this article on Medium, but no luck so far.
I have tried returning: of(error)
in the catchError()
handler, but that doesn't work either.
If I just subscribe to contentChanged$
without any operators, I see each time _contentChanged
has received a new value, so my source of data is working as expected.
If I make a change that does not cause catchError
to be invoked, I see the log statement in map
, so it has to be catchError
that's closing the stream.
How can I keep the Observable/Subscription alive to process new contentChanged$
values?
Upvotes: 1
Views: 1528
Reputation: 888
As your comment setValue
is throwing error, that error is being caught in CathError
and as per observable behaviour source stream will be dead after any kind of error in source stream chain(pipe) or completion of source stream
In-order to keep stream alive you need to handle this error internally -
switchMap/ConcatMap...(() => child$.pipe(catchError(...))
try catch block
In your case it's JSON.parse
error,which is a javascript error,so wrapped it with try catch
map(() => {
this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
console.log(this.resource);
this.parentForm.setValue(this.resource);
return of(event);
}),
Modify above snippet with below one
map(() => {
try {
this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
} catch(err){
console.error(err);
}
console.log(this.resource);
this.parentForm.setValue(this.resource);
return event;
}),
Upvotes: 1
Reputation: 8062
Here's an overview: link
The relevant part:
Upon issuing an complete or error notification, an observable may not thereafter issue any further notifications.
This means that once an observable completes or errors, it's done. They are terminal emissions/notifications.
Don't let your observable emit an error. If you know that some synchronous code in your map is throwing an error, You can catch it and deal with it there and then it'll never propagate up into your observable:
try {
this.resource = JSON.parse(/* ... */);
} catch(err){
console.error(err);
}
Once the source observable errors, just re-subscribe to your source observable. Whether this works depends on what side effects are created when you subscribe to your source in the first place.
this.contentChanged$.pipe(
untilDestroyed(this),
debounceTime(1000),
filter(event => Boolean(event)),
map(() => {
...
}),
tap({error: (error: Error) => console.error(error) }),
// By default, if contentChanged$ keeps erroring, this will
// keep retrying for forever.
retry()
).subscribe(() => {
...
});
You can also conditionally retry by returning an observable from catchError
this.contentChanged$.pipe(
untilDestroyed(this),
debounceTime(1000),
filter(event => Boolean(event)),
map(() => {
...
}),
catchError((error: Error, source$) => {
console.error(error)
if (/* retry? */){
return source$;
} else if (/* Do nothing? */) {
return EMPTY;
} else if (/* Emit some string values? */) {
return of("Hello", "World");
}
// Otherwise rethrow the error!
return throwError(() => error);
})
).subscribe(() => {
...
});
Upvotes: 0