Reputation: 43
I was researching how to make multiple HTTP calls in Angular 10 without writing a bunch of separate calls and subscribes that make the code unnecessarily long, and stumbled across RxJS's forkJoin. The code is simple: just four HTTP calls whose results I process once they have all returned.
const observableCall1$ = this._configService.getSingle( 'Param1', 'Param2' ).pipe( catchError( err => of( err ) ) );
const observableCall2$ = this._configService.getSingle( 'Param1', 'Param2' ).pipe( catchError( err => of( err ) ) );
const observableCall3$ = this._configService.getSingle( 'Param1', 'Param2' ).pipe( catchError( err => of( err ) ) );
const observableCall4$ = this._configService.getSingle( 'Param1', 'Param2' ).pipe( catchError( err => of( err ) ) );
forkJoin(
[
observableCall1$,
observableCall2$,
observableCall3$,
observableCall4$,
]
).subscribe( ( results: Config[] ) => {
//do stuff
} )
The problem I'm running into is with expired tokens. If the token has expired, the call to get a new token happens four times, which is obviously unnecessary.
The code for the auth interceptor is this:
intercept( request: HttpRequest<unknown>, next: HttpHandler ): Observable<HttpEvent<unknown>> {
const authToken = this._authentication.token;
return ( !!authToken && ( request.url.indexOf( environment.authBaseUrl ) === -1 )
? next.handle( request.clone( {
headers: request.headers
.set( 'Authorization', `${authToken.token_type} ${authToken.access_token}` )
} ) ) : next.handle( request ) ).pipe( catchError( ( error ) => {
if ( error instanceof HttpErrorResponse && error.status === 401 ) {
return this._authentication.exchangeRefreshToken()
.pipe( mergeMap( token => next.handle( request.clone( {
headers: request.headers.append( 'Authorization', `${token.token_type} ${token.access_token}` )
} ) ) ) )
}
throw error; // anything other than a 401 is rethrown unchanged
} ) )
}
Code for exchanging refresh token:
const headers = this._httpHeaders;
const body = new HttpParams()
.append( 'grant_type', 'refresh_token' )
.append( 'client_id', environment.clientId )
.append( 'client_secret', environment.clientSecret )
.append( 'refresh_token', this.token.refresh_token );
var token = this._httpClient.post( environment.authBaseUrl, body, { headers } )
.pipe( catchError( error => { return null; } ) ) as Observable<Token>;
if ( !( token instanceof Token ) ) return this.getToken();
else {
this.token = new Token( token );
return token;
}
I think I understand the problem: forkJoin fires all four calls at the same time, each one passes through the interceptor as its own request, and when the token has expired they all get a 401, so each one asks for a new token. Is there an elegant way to request a new token only once and then continue with the rest of the calls?
Edit: I also tried just putting those calls in the HTML with async pipes, i.e.:
<div *ngIf="observableCall$ | async as observableCall">{{ observableCall | json }}</div>
<div *ngIf="observableCall$ | async as observableCall">{{ observableCall | json }}</div>
and the same thing happened. If there's a better way to do that, I'm open to suggestions.
Thank you!
Edit(2): Code in auth interceptor thanks to Mark's help
import {
HttpErrorResponse,
HttpEvent,
HttpHandler,
HttpInterceptor,
HttpRequest
} from '@angular/common/http';
import { Injectable } from '@angular/core';
import { merge, Observable, of, Subject } from 'rxjs';
import { catchError, exhaustMap, filter, first, mergeMap, share, shareReplay, startWith, switchMap, tap } from 'rxjs/operators';
import { Token } from 'src/app/models/token/token.model';
import { environment } from 'src/environments/environment';
import { AuthorizationService } from '../services/authorization/authorization.service';
@Injectable()
export class AuthorizationInterceptor implements HttpInterceptor {
constructor( private _authentication: AuthorizationService ) { }
private _exchangeToken$ = new Subject<boolean>();
private _refreshToken$ = this._exchangeToken$.pipe(
filter( x => x ),
exhaustMap( () => this._authentication.exchangeRefreshToken() ),
share()
);
private _refreshTokenCache$ = this._refreshToken$.pipe(
startWith( null ),
shareReplay( 1 )
);
exchangeRefreshToken( expiredToken: Token ): Observable<Token> {
console.log( '2' );
const exchange = () => {
const startToken$ = of( true ).pipe(
tap( x => this._exchangeToken$.next( x ) ),
filter( _ => false )
);
return merge( startToken$, this._refreshToken$.pipe( first() ) );
}
return this._refreshTokenCache$.pipe(
first(),
switchMap( token =>
token == null || token === expiredToken ?
exchange() :
of( token )
)
);
}
intercept( request: HttpRequest<unknown>, next: HttpHandler ): Observable<HttpEvent<unknown>> {
const authToken = this._authentication.token;
return ( !!authToken && ( request.url.indexOf( environment.authBaseUrl ) === -1 )
? next.handle( request.clone( {
headers: request.headers
.set( 'Authorization', `${authToken.token_type} ${authToken.access_token}` )
} ) ) : next.handle( request ) ).pipe( catchError( ( error ) => {
if ( error instanceof HttpErrorResponse && error.status === 401 ) {
return this.exchangeRefreshToken( authToken )
.pipe( mergeMap( token => next.handle( request.clone( {
headers: request.headers.append( 'Authorization', `${token.token_type} ${token.access_token}` )
} ) ) ) );
}
throw error; // anything other than a 401 is rethrown unchanged
} ) );
}
}
Upvotes: 0
Views: 1001
Reputation: 8022
You'll want to create a wrapper for this._authentication.exchangeRefreshToken() that is multicast and doesn't repeat attempts to get the refresh token.
One solution is to create a subject that you hook into to trigger your refresh-token calls.
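private _exchangeToken$ = new Subject<boolean>();
private _refreshToken$ = this._exchangeToken$.pipe(
  filter(x => x),
  // exhaustMap ignores new triggers while an exchange is still in flight
  exhaustMap(_ => this._authentication.exchangeRefreshToken()),
  share()
);
exchangeRefreshToken(): Observable<Token> {
  // Emits nothing; its only job is to trigger the exchange on subscription
  const startToken$ = of(true).pipe(
    tap(x => this._exchangeToken$.next(x)),
    filter(_ => false)
  );
  // Subscribe to the result before triggering, so the trigger is never lost
  return merge(this._refreshToken$.pipe(first()), startToken$);
}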
In this pattern, all you've done is ensure that this._authentication.exchangeRefreshToken() is never called concurrently. If a second emission arrives on the _exchangeToken$ subject while an exchange is in flight, it is ignored and the caller waits for the result of the first call. What's nice here is that you can swap out this._authentication.exchangeRefreshToken() with this.exchangeRefreshToken() and make no further changes.
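To see the "never called concurrently" idea in isolation, here is a minimal sketch that swaps RxJS for plain Promises. It is not the interceptor code, just the same single-flight behavior that exhaustMap plus share provide. The names singleFlight, fetchToken, and the Token shape are hypothetical.

```typescript
// Hypothetical stand-in for the real token model.
interface Token {
  access_token: string;
}

// Wraps an async token fetch so that overlapping callers share one request.
function singleFlight(fetchToken: () => Promise<Token>): () => Promise<Token> {
  let inFlight: Promise<Token> | null = null;
  return () => {
    if (inFlight !== null) {
      return inFlight; // a request is already running: reuse it
    }
    const started = fetchToken();
    inFlight = started;
    // Once the request settles (success or failure), allow a new one.
    const clear = () => { inFlight = null; };
    started.then(clear, clear);
    return started;
  };
}

// Usage: four simultaneous callers, as with the four forkJoin requests.
let fetchCount = 0;
const refresh = singleFlight(() => {
  fetchCount++;
  return Promise.resolve({ access_token: 'new-token' });
});
const pending = [refresh(), refresh(), refresh(), refresh()];
// fetchCount is 1 here: the three later calls reused the first promise.
```

The RxJS version above behaves the same way, with the added benefit that it stays inside the observable chain the interceptor already uses.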
A better solution would be to cache the refreshed token and only get a new one if the one in the cache is expired. This is a bit more involved because you need a mechanism to know which tokens have expired. We can use shareReplay(1) to keep a cache of the most recent token. We will also need to pass in the expired token so that we have something to compare our cached token against.
private _exchangeToken$ = new Subject<boolean>();
private _refreshToken$ = this._exchangeToken$.pipe(
  filter(x => x),
  exhaustMap(_ => this._authentication.exchangeRefreshToken()),
  share()
);
private _refreshTokenCache$ = this._refreshToken$.pipe(
  startWith(null), // empty cache until the first refresh completes
  shareReplay(1)
);
exchangeRefreshToken(expiredToken: Token): Observable<Token> {
  const exchange = () => {
    const startToken$ = of(true).pipe(
      tap(x => this._exchangeToken$.next(x)),
      filter(_ => false)
    );
    // Subscribe to the result before triggering, so the trigger is never lost
    return merge(this._refreshToken$.pipe(first()), startToken$);
  };
  return this._refreshTokenCache$.pipe(
    first(),
    switchMap(token =>
      // Refresh only if the cache is empty or still holds the expired token
      token == null || token === expiredToken ?
        exchange() :
        of(token)
    )
  );
}
This isn't perfect, because the cached token might itself have expired even though it is newer than the expired token being passed in. What you really want is a function that can read the payload of a token and check whether it has expired.
The catch is that how you do this depends on the token. Is it a JWT? Custom? If it's encrypted, asking the server may be the only way. If you know how long a token lasts, you can store fresh tokens with your own timestamp and use that to decide when to refresh.
private _refreshTokenCache$ = this._refreshToken$.pipe(
  map(token => ({
    token,
    timeStamp: Date.now()
  })),
  startWith(null), // empty cache until the first refresh completes
  shareReplay(1)
);
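One way to use that timestamp is a small staleness check. This is a sketch under assumptions: CachedToken, isStale, TOKEN_LIFETIME_MS, and SAFETY_MARGIN_MS are hypothetical names, and the 30-minute lifetime is a placeholder for whatever your auth server actually issues.

```typescript
// Shape of a cache entry: a token plus the time it was cached.
interface CachedToken<T> {
  token: T;
  timeStamp: number; // milliseconds, as returned by Date.now()
}

// Placeholder lifetime (assumption): replace with your server's real value.
const TOKEN_LIFETIME_MS = 30 * 60 * 1000;
// Refresh a little early to allow for clock drift and request latency.
const SAFETY_MARGIN_MS = 30 * 1000;

// True when there is no cached entry or the entry is too old to trust.
function isStale<T>(cached: CachedToken<T> | null, nowMs: number = Date.now()): boolean {
  if (cached === null) {
    return true;
  }
  return nowMs - cached.timeStamp >= TOKEN_LIFETIME_MS - SAFETY_MARGIN_MS;
}
```

With a check like this, the `token == null || token === expiredToken` condition in exchangeRefreshToken could be replaced by `isStale(cached)`, returning `of(cached.token)` in the fresh case.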
Really, there isn't a one-size-fits-all solution to this.
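If the token does happen to be a JWT, its expiry can be read from the payload's exp claim without asking the server. A minimal sketch, assuming an unencrypted JWT and a runtime with a global atob (browsers, Node 16+). isJwtExpired is a hypothetical helper, and it does not verify the signature.

```typescript
// Hypothetical helper: true when the JWT's `exp` claim is in the past.
// It only reads the payload; it does NOT validate the signature.
function isJwtExpired(jwt: string, nowMs: number = Date.now()): boolean {
  const parts = jwt.split('.');
  if (parts.length !== 3) {
    return true; // not shaped like a JWT: treat as unusable
  }
  // JWT segments are base64url: map to standard base64 and restore padding.
  const base64 = parts[1].replace(/-/g, '+').replace(/_/g, '/');
  const padded = base64 + '='.repeat((4 - (base64.length % 4)) % 4);
  try {
    const payload = JSON.parse(atob(padded)) as { exp?: unknown };
    // `exp` is in seconds since the epoch; Date.now() is in milliseconds.
    // A token without a numeric `exp` is conservatively treated as expired.
    return typeof payload.exp !== 'number' || payload.exp * 1000 <= nowMs;
  } catch {
    return true; // payload could not be decoded or parsed
  }
}
```

Treating undecodable tokens as expired is a design choice: it errs toward one extra refresh rather than sending a token that will be rejected.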
This might look somewhat strange:
of(true).pipe(
tap(x => this._exchangeToken$.next(x)),
filter(_ => false)
);
This is a stream that emits nothing and then completes. As a side effect, though, it calls this._exchangeToken$.next(true).
So why not just call this._exchangeToken$.next(true) directly, like this?
exchangeRefreshToken(): Observable<Token> {
  // The trigger fires when this method is called, not when the result is subscribed to
  this._exchangeToken$.next(true);
  return this._refreshToken$.pipe(first());
}
We do it the other way so that the effect happens when the stream is subscribed to, rather than when it is created. That way, this should still work:
const exchange$ = this.exchangeRefreshToken();
setTimeout(() => {
exchange$.subscribe(token => console.log("I got a token: ", token))
}, 60 * 60 * 1000);
Here we get a stream, then wait an hour before subscribing for a refresh token. If the exchange were not triggered as part of the subscription, we would have subscribed too late to receive the token, and "I got a token" would never be printed to the console.
Upvotes: 1