Reputation: 19451
I'd like to use AsyncLocalStorage in a NestJS interceptor:
export interface CallHandler<T = any> {
  handle(): Observable<T>;
}
export interface NestInterceptor<T = any, R = any> {
  intercept(context: ExecutionContext, next: CallHandler<T>): Observable<R> | Promise<Observable<R>>;
}
The interceptor function gets a next CallHandler that returns an Observable.
I cannot use run in this case (the run callback will exit immediately, before the callHandler.handle() observable has finished):
intercept(context: ExecutionContext, callHandler: CallHandler): Observable<any> | Promise<Observable<any>> {
  const asyncLocalStorage = new AsyncLocalStorage();
  const myStore = { some: 'data' };
  return asyncLocalStorage.run(myStore, () => callHandler.handle());
}
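To make the problem concrete, here is a minimal sketch that needs only Node's built-in async_hooks module. LazySource and handle are hypothetical stand-ins (not RxJS or NestJS APIs) for a cold Observable and for callHandler.handle(): nothing runs until subscribe is called, which is why the store is already gone by then.

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

// Hypothetical stand-in for a cold Observable: nothing runs until subscribe() is called.
type LazySource<T> = { subscribe(next: (value: T) => void): void };

const als = new AsyncLocalStorage<{ some: string }>();

// Hypothetical stand-in for callHandler.handle(): the store is read at subscription time.
function handle(): LazySource<string | undefined> {
  return {
    subscribe(next) {
      next(als.getStore()?.some);
    },
  };
}

// run() returns as soon as handle() has created the lazy source.
const source = als.run({ some: "data" }, () => handle());

// The subscription happens later, outside run(), so no store is active.
const seen: Array<string | undefined> = [];
source.subscribe((value) => seen.push(value));
console.log(seen); // [ undefined ]
```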
The solution I came up with is this:
const localStorage = new AsyncLocalStorage();
export class MyInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, callHandler: CallHandler): Observable<any> | Promise<Observable<any>> {
    const resource = new AsyncResource('AsyncLocalStorage', { requireManualDestroy: true });
    const myStore = { some: 'data' };
    localStorage.enterWith(myStore);
    return callHandler.handle().pipe(
      finalize(() => resource.emitDestroy())
    );
  }
}
This seems to work fine, but I am not sure if this is really correct - and it looks messy and error-prone. So I wonder:
Upvotes: 9
Views: 2780
Reputation: 1773
There was an issue related to this: https://github.com/nestjs/nest/pull/11142.
AsyncLocalStorage will be supported in NestJS 10.0.0.
Upvotes: 2
Reputation: 19451
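Here is our current solution to the problem: we subscribe to the callHandler inside the localStorage.run method.
import { AsyncLocalStorage } from 'async_hooks';
import { CallHandler, ExecutionContext, NestInterceptor } from '@nestjs/common';
import { Observable } from 'rxjs';

const localStorage = new AsyncLocalStorage();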
export class MyInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, callHandler: CallHandler): Observable<any> | Promise<Observable<any>> {
    const myStore = { some: 'data' };
    return new Observable((subscriber) => {
      const subscription = localStorage.run(myStore, () => {
        /**
         * - run the handler function in the run callback, so that myStore is set
         * - subscribe to the handler and pass all emissions of the callHandler to our subscriber
         */
        return callHandler.handle().subscribe(subscriber);
      });
      /**
       * return an unsubscribe method
       */
      return () => subscription.unsubscribe();
    });
  }
}
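The same idea can be checked without RxJS or NestJS. The sketch below is only an illustration under assumptions: LazySource, handle and withStore are hypothetical names standing in for a cold Observable, callHandler.handle() and the wrapping Observable above. It shows that moving the subscribe call inside run makes the store visible to the handler.

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

// Hypothetical stand-in for a cold Observable: subscribe() starts the work
// and returns a teardown function.
type LazySource<T> = { subscribe(next: (value: T) => void): () => void };

const als = new AsyncLocalStorage<{ some: string }>();

// Hypothetical stand-in for callHandler.handle(): reads the store at subscription time.
function handle(): LazySource<string | undefined> {
  return {
    subscribe(next) {
      next(als.getStore()?.some);
      return () => {};
    },
  };
}

// Wrap a source so that the subscription itself happens inside run().
function withStore<T>(source: LazySource<T>, store: { some: string }): LazySource<T> {
  return {
    // run() returns whatever the callback returns, here the inner teardown.
    subscribe: (next) => als.run(store, () => source.subscribe(next)),
  };
}

const wrapped = withStore(handle(), { some: "data" });

const seen: Array<string | undefined> = [];
const teardown = wrapped.subscribe((value) => seen.push(value));
teardown();
console.log(seen); // [ 'data' ]
```

Because run returns the callback's return value, the inner teardown can be handed straight back to the outer subscriber, which is what the interceptor above does with subscription.unsubscribe().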
Upvotes: 2
Reputation: 11
Here is a solution for cls-hooked:
return new Observable(observer => {
  // Subscribe inside the namespace context so the handler can read the values set here.
  namespace.runAndReturn(() => {
    namespace.set("some", "data");
    next.handle().subscribe({
      next: res => observer.next(res),
      error: error => observer.error(error),
      complete: () => observer.complete()
    });
  });
});
Upvotes: 0
Reputation: 487
Below is the solution I came up with. My understanding of the problem is that the run function needs to receive a callback that fully encapsulates the execution of the handler. However, the intercept function is expected to return an observable that has not yet been triggered. This means that if you encapsulate the observable itself in the run callback, it will not have been triggered yet.
My solution, below, is to return a new observable that, when triggered, will be responsible for triggering (i.e. subscribing to) the call handler itself. As a result, the promise we create in the run call can fully encapsulate the handle function and its async callbacks.
Here is the general functionality in a stand-alone function so that you can see it all together:
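intercept(context: ExecutionContext, next: CallHandler<any>): Observable<any> {
  return new Observable((subscribe) => {
    // Subscribe to the handler inside run() so the store is active while it executes.
    asyncStorage.run({}, () => new Promise<void>((resolve) => {
      next.handle().subscribe(
        (result) => {
          subscribe.next(result);
          subscribe.complete();
          resolve();
        },
        (error) => {
          // Forward the error to the outer subscriber; the promise itself never rejects.
          subscribe.error(error);
          resolve();
        }
      );
    }));
  });
}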
Next, I took that concept and integrated it into my interceptor below.
import { AsyncLocalStorage } from 'async_hooks';
import { CallHandler, ExecutionContext, NestInterceptor } from '@nestjs/common';
import { Observable, Subscriber } from 'rxjs';
// RequestContext and RequestContextData are my own types (not shown here).

export class RequestContextInterceptor implements NestInterceptor {
  constructor(
    private readonly requestContext: RequestContext,
    private readonly localStorage: AsyncLocalStorage<RequestContextData>
  ) {}

  intercept(context: ExecutionContext, next: CallHandler<any>): Observable<any> {
    const contextData = this.requestContext.buildContextData(context);
    return new Observable((subscribe) => {
      void this.localStorage.run(contextData, () => this.runHandler(next, subscribe));
    });
  }

  private runHandler(next: CallHandler<any>, subscribe: Subscriber<any>): Promise<void> {
    return new Promise<void>((resolve) => {
      next.handle().subscribe(
        (result) => {
          subscribe.next(result);
          subscribe.complete();
          resolve();
        },
        (err) => {
          subscribe.error(err);
          resolve();
        }
      );
    });
  }
}
It's worth noting that the Promise that is created during the run call does not have a rejection path. This is intentional. The error is passed on to the observable that is wrapping the promise. This means that the outer observable will still succeed or error depending upon what the inner observable does; however, the promise that wraps the inner observable will always resolve regardless.
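As a rough illustration of that last point, here is a small sketch with no RxJS dependency. Handlers, LazySource, failing and outer are hypothetical stand-ins for the observer callbacks, the inner observable, a handler that errors, and the outer subscriber. The error reaches the outer consumer, while the bookkeeping promise still resolves.

```typescript
// Hypothetical stand-ins for an observer and for an Observable that errors on subscribe.
type Handlers<T> = { next(value: T): void; error(err: unknown): void };
type LazySource<T> = { subscribe(handlers: Handlers<T>): void };

const failing: LazySource<string> = {
  subscribe(handlers) {
    handlers.error(new Error("boom"));
  },
};

const received: string[] = [];

// The outer consumer: this is where errors are meant to surface.
const outer: Handlers<string> = {
  next: (value) => received.push(`next:${value}`),
  error: (err) => received.push(`error:${(err as Error).message}`),
};

// The promise only signals "the handler is finished"; it has no rejection path.
const finished = new Promise<void>((resolve) => {
  failing.subscribe({
    next: (value) => { outer.next(value); resolve(); },
    error: (err) => { outer.error(err); resolve(); },
  });
});

// Runs even though the inner source failed, because the promise resolved.
finished.then(() => console.log("handler finished:", received));
```

Keeping the promise free of rejections avoids reporting the same failure twice: once through the outer observable and once as an unhandled promise rejection.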
Upvotes: 2