M.Chappell

Reputation: 43

Using RxJS forkJoin and an Angular 10 HTTP interceptor, how do I exchange the refresh token only once across multiple HTTP calls?

I was researching how to make multiple HTTP calls in Angular 10 without writing a long series of separate calls and subscribes, and came across RxJS forkJoin. The code is simple: just 4 HTTP calls whose results I process once they have all returned.

// Each call gets its own name; errors are mapped to values so that forkJoin still completes.
const observableCall1$ = this._configService.getSingle( 'Param1', 'Param2' ).pipe( catchError( err => of( err ) ) );
const observableCall2$ = this._configService.getSingle( 'Param1', 'Param2' ).pipe( catchError( err => of( err ) ) );
const observableCall3$ = this._configService.getSingle( 'Param1', 'Param2' ).pipe( catchError( err => of( err ) ) );
const observableCall4$ = this._configService.getSingle( 'Param1', 'Param2' ).pipe( catchError( err => of( err ) ) );
forkJoin(
  [
    observableCall1$,
    observableCall2$,
    observableCall3$,
    observableCall4$,
  ]
).subscribe( ( results: Config[] ) => {
  //do stuff
} );

The problem I'm running into is with expired tokens. If the token is expired, the call to get a new token happens 4 times, which is obviously unnecessary.

The code for the auth interceptor is this:

intercept( request: HttpRequest<unknown>, next: HttpHandler ): Observable<HttpEvent<unknown>> {
    const authToken = this._authentication.token;
    return ( !!authToken && ( request.url.indexOf( environment.authBaseUrl ) === -1 )
      ? next.handle( request.clone( {
        headers: request.headers
          .set( 'Authorization', `${authToken.token_type} ${authToken.access_token}` )
      } ) ) : next.handle( request ) ).pipe( catchError( ( error ) => {
        if ( error instanceof HttpErrorResponse && error.status === 401 ) {
          // Expired token: fetch a new one, then retry the original request with it
          return this._authentication.exchangeRefreshToken()
            .pipe( mergeMap( token => next.handle( request.clone( {
              headers: request.headers.append( 'Authorization', `${token.token_type} ${token.access_token}` )
            } ) ) ) );
        }
        // Rethrow anything else; returning nothing from catchError is itself a runtime error
        throw error;
      } ) );
  }

Code for exchanging refresh token:

const headers = this._httpHeaders;
    const body = new HttpParams()
      .append( 'grant_type', 'refresh_token' )
      .append( 'client_id', environment.clientId )
      .append( 'client_secret', environment.clientSecret )
      .append( 'refresh_token', this.token.refresh_token );

    // post() returns an Observable, so the response has to be handled inside the pipe;
    // an `instanceof Token` check on the Observable itself is always false.
    return this._httpClient.post<Token>( environment.authBaseUrl, body, { headers } ).pipe(
      map( response => new Token( response ) ),
      tap( token => this.token = token ),
      // Assumes getToken() returns an Observable<Token>, as the original fallback implies
      catchError( () => this.getToken() )
    );

Now I think I understand the problem: forkJoin fires all 4 calls at the same time, each one passes through the interceptor as its own call, and when the token is expired they all get a 401, so each one asks for a new token. Is there an elegant way to request a new token only once and then continue with the rest of the calls?

Edit: I also tried just having those calls in the HTML with async pipes, i.e.:

<div *ngIf="observableCall1$ | async as result1">{{ result1 | json }}</div>
<div *ngIf="observableCall2$ | async as result2">{{ result2 | json }}</div>

and the same thing happened. If there's a better way to do that, I'm open to suggestions.

Thank you!

Edit(2): Code in auth interceptor thanks to Mark's help

import {
  HttpErrorResponse,
  HttpEvent,
  HttpHandler,
  HttpInterceptor,
  HttpRequest
} from '@angular/common/http';
import { Injectable } from '@angular/core';
import { merge, Observable, of, Subject } from 'rxjs';
import { catchError, exhaustMap, filter, first, ignoreElements, mergeMap, share, shareReplay, startWith, switchMap, tap } from 'rxjs/operators';
import { Token } from 'src/app/models/token/token.model';
import { environment } from 'src/environments/environment';
import { AuthorizationService } from '../services/authorization/authorization.service';

@Injectable()
export class AuthorizationInterceptor implements HttpInterceptor {
  constructor( private _authentication: AuthorizationService ) { }
  private _exchangeToken$ = new Subject<boolean>();

  private _refreshToken$ = this._exchangeToken$.pipe(
    filter( x => x ),
    exhaustMap( () => this._authentication.exchangeRefreshToken() ),
    share()
  );

  private _refreshTokenCache$ = this._refreshToken$.pipe(
    startWith( null ),
    shareReplay( 1 )
  );

  exchangeRefreshToken( expiredToken: Token ): Observable<Token> {
    const exchange = () => {
      const startToken$ = of( true ).pipe(
        tap( x => this._exchangeToken$.next( x ) ),
        // same effect as filter( _ => false ), but typed as Observable<never>
        ignoreElements()
      );
      // merge must be the RxJS creation function, not lodash's object merge.
      // Subscribe to the shared result first, then fire the trigger; in the opposite
      // order the very first trigger is emitted before share() has subscribed to the
      // subject and is lost.
      return merge( this._refreshToken$.pipe( first() ), startToken$ );
    }

    return this._refreshTokenCache$.pipe(
      first(),
      switchMap( token =>
        token == null || token === expiredToken ?
          exchange() :
          of( token )
      )
    );
  }

  intercept( request: HttpRequest<unknown>, next: HttpHandler ): Observable<HttpEvent<unknown>> {
    const authToken = this._authentication.token;
    return ( !!authToken && ( request.url.indexOf( environment.authBaseUrl ) === -1 )
      ? next.handle( request.clone( {
        headers: request.headers
          .set( 'Authorization', `${authToken.token_type} ${authToken.access_token}` )
      } ) ) : next.handle( request ) ).pipe( catchError( ( error ) => {
        if ( error instanceof HttpErrorResponse && error.status === 401 ) {
          return this.exchangeRefreshToken( authToken )
            .pipe( mergeMap( token => next.handle( request.clone( {
              headers: request.headers.append( 'Authorization', `${token.token_type} ${token.access_token}` )
            } ) ) ) );
        }
        // Rethrow anything that is not a 401 so that callers still see the error
        throw error;
      } ) );
  }
}

Upvotes: 0

Views: 1001

Answers (1)

Mrk Sef

Reputation: 8022

You'll want to create a wrapper for this._authentication.exchangeRefreshToken() that is multicast and doesn't start another attempt to get the refresh token while one is already in flight.

One solution is to create a subject that acts as the single trigger for refresh-token calls.

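private _exchangeToken$ = new Subject<boolean>();

private _refreshToken$ = this._exchangeToken$.pipe(
  filter(x => x),
  // exhaustMap ignores new triggers while a refresh is still in flight
  exhaustMap(_ => this._authentication.exchangeRefreshToken()),
  share()
);

exchangeRefreshToken() : Observable<Token>{
  const startToken$ = of(true).pipe(
    tap(x => this._exchangeToken$.next(x)),
    // same effect as filter(_ => false), but typed as Observable<never>
    ignoreElements()
  );
  // Subscribe to the shared result first, then fire the trigger. In the opposite
  // order a lone caller's trigger is emitted before share() has subscribed to
  // the subject, so it is dropped and that caller waits forever.
  return merge(this._refreshToken$.pipe(first()), startToken$);
}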

In this pattern, all you've done is ensure that this._authentication.exchangeRefreshToken() is never called concurrently. If a second emission on the _exchangeToken$ subject arrives while a refresh is in flight, it gets ignored and the caller waits on the result of the first call. What's nice here is that you can swap out this._authentication.exchangeRefreshToken() for this.exchangeRefreshToken() in the interceptor and make no further changes.
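To see the effect, the pattern can be exercised against a fake refresh call. The sketch below is an illustration only: fakeExchange, refreshCalls, pending, and received are hypothetical names standing in for the real service, and tokens are plain strings. Four callers ask for a token while the first refresh is still pending, much as the four forkJoin requests would after their 401s. Here ignoreElements() plays the role of filter(_ => false), and the shared stream is subscribed to before the trigger fires so that a lone caller is served as well.

```typescript
import { merge, Observable, of, Subject } from 'rxjs';
import { exhaustMap, filter, first, ignoreElements, share, tap } from 'rxjs/operators';

let refreshCalls = 0;
const pending: Subject<string>[] = [];

// Hypothetical stand-in for this._authentication.exchangeRefreshToken():
// the "request" stays pending until something emits on the returned subject.
function fakeExchange(): Observable<string> {
  refreshCalls++;
  const response$ = new Subject<string>();
  pending.push(response$);
  return response$;
}

const exchangeToken$ = new Subject<boolean>();

const refreshToken$ = exchangeToken$.pipe(
  filter(x => x),
  exhaustMap(() => fakeExchange()), // triggers are ignored while a refresh is pending
  share()
);

function exchangeRefreshToken(): Observable<string> {
  const startToken$ = of(true).pipe(
    tap(x => exchangeToken$.next(x)),
    ignoreElements()
  );
  // Listen for the result first, then fire the trigger.
  return merge(refreshToken$.pipe(first()), startToken$);
}

// Four callers request a token before the first refresh has answered.
const received: string[] = [];
for (let i = 0; i < 4; i++) {
  exchangeRefreshToken().subscribe(token => received.push(token));
}

// The single pending "request" now answers.
pending[0].next('new-token');
pending[0].complete();
```

Once the single pending response arrives, refreshCalls is 1 and received holds the same token four times.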

A better solution would be to cache the refreshed token and only get a new one if the one in the cache is expired. This is a bit more involved because you need a mechanism to know which tokens have expired. We can use shareReplay(1) to keep a cache of the most recent token. We will also need to pass in the expired token so that we have something to compare our cached token against.

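private _exchangeToken$ = new Subject<boolean>();

private _refreshToken$ = this._exchangeToken$.pipe(
  filter(x => x),
  exhaustMap(_ => this._authentication.exchangeRefreshToken()),
  share()
);

// Replays the most recent token; starts out as null until one has been fetched
private _refreshTokenCache$ = this._refreshToken$.pipe(
  startWith(null),
  shareReplay(1)
);

exchangeRefreshToken(expiredToken: Token) : Observable<Token>{
  const exchange = () => {
    const startToken$ = of(true).pipe(
      tap(x => this._exchangeToken$.next(x)),
      // same effect as filter(_ => false), but typed as Observable<never>
      ignoreElements()
    );
    // Subscribe to the shared result first, then fire the trigger; in the opposite
    // order the very first trigger is emitted before share() has subscribed to
    // the subject and is lost.
    return merge(this._refreshToken$.pipe(first()), startToken$);
  }

  return this._refreshTokenCache$.pipe(
    first(),
    switchMap(token =>
      // Refresh if nothing is cached yet or the cache still holds the expired token
      token == null || token === expiredToken ?
      exchange() :
      of(token)
    )
  );
}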

This isn't perfect because the cached token might still be expired even though it happens to be newer than the expired token being passed in. What you really want is a function that can read the payload of a token and check if it's expired.

The issue is that how to do this depends on the token. Is it a JWT? Custom? If it's encrypted, asking the server may be the only way. If you know how long a token lasts, you can store fresh tokens with your own timestamp and use that to decide when to refresh.

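private _refreshTokenCache$ = this._refreshToken$.pipe(
  map(token => ({
    token,
    timeStamp: Date.now()
  })),
  // keep the initial null so that the first lookup doesn't wait for a token nobody has requested yet
  startWith(null),
  shareReplay(1)
);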
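The stored timestamp can then be compared against the token's lifetime. A minimal sketch, assuming a fixed 15-minute lifetime and a 30-second safety margin; CachedToken, TOKEN_LIFETIME_MS, SAFETY_MARGIN_MS, and needsRefresh are hypothetical names, and the real values depend on your auth server:

```typescript
// Shape of the cache entries produced by the map() above.
interface CachedToken<T> {
  token: T;
  timeStamp: number; // Date.now() at the moment the token was received
}

// ASSUMPTION: tokens live for a fixed, known duration.
const TOKEN_LIFETIME_MS = 15 * 60 * 1000;
// Refresh slightly early so a token doesn't expire while a request is in flight.
const SAFETY_MARGIN_MS = 30 * 1000;

function needsRefresh<T>(cached: CachedToken<T> | null, now: number = Date.now()): boolean {
  if (cached === null) {
    return true; // nothing cached yet
  }
  return now - cached.timeStamp >= TOKEN_LIFETIME_MS - SAFETY_MARGIN_MS;
}
```

Inside the switchMap, the condition would then become something like needsRefresh(entry) ? exchange() : of(entry.token).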

Really, there isn't a one-size-fits-all solution to this.
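For the JWT case specifically, the payload is only base64url-encoded, so the standard exp claim (seconds since the epoch) can be read on the client. A rough sketch, assuming an unencrypted JWT with an ASCII payload and an environment that provides atob (browsers do); jwtExpiresAtMs and isJwtExpired are hypothetical helper names, and nothing here verifies the signature:

```typescript
// Reads the `exp` claim of a JWT and returns it in milliseconds, or null if unreadable.
function jwtExpiresAtMs(jwt: string): number | null {
  const parts = jwt.split('.');
  if (parts.length !== 3) {
    return null; // not in header.payload.signature form
  }
  // base64url -> base64, then restore the stripped padding
  const base64 = parts[1].replace(/-/g, '+').replace(/_/g, '/');
  const padded = base64 + '='.repeat((4 - (base64.length % 4)) % 4);
  try {
    const payload = JSON.parse(atob(padded));
    return typeof payload.exp === 'number' ? payload.exp * 1000 : null;
  } catch {
    return null; // malformed base64 or JSON
  }
}

// skewMs lets the caller treat a token as expired slightly before it really is.
function isJwtExpired(jwt: string, nowMs: number = Date.now(), skewMs: number = 30000): boolean {
  const expiresAt = jwtExpiresAtMs(jwt);
  // Unreadable tokens count as expired so that the caller falls back to a refresh.
  return expiresAt === null || nowMs >= expiresAt - skewMs;
}
```

A check like this only saves a round trip; the server still has the final say, so the 401 handling in the interceptor remains necessary.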


An aside:

This might look somewhat strange:

of(true).pipe(
  tap(x => this._exchangeToken$.next(x)),
  filter(_ => false)
);

This is a stream that emits nothing and then completes. As a side effect though, it calls this._exchangeToken$.next(true).

So why not just call this._exchangeToken$.next(true) directly like this?

exchangeRefreshToken() : Observable<Token>{
  this._exchangeToken$.next(true);
  return this._refreshToken$.pipe(first());
}

We do it this way so that the effect happens when the stream is subscribed to, rather than when it is created. That way, this still works:

const exchange$ = this.exchangeRefreshToken();
setTimeout(() => { 
  exchange$.subscribe(token => console.log("I got a token: ", token))
}, 60 * 60 * 1000);

Here we get a stream, then wait an HOUR and subscribe for a refresh token. If we don't generate the token as part of the subscription, then we will have subscribed too late to get the token, and "I got a token" will never get printed to the console.
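The difference can be reproduced without waiting an hour. In the sketch below, createRefresher, eagerExchange, and lazyExchange are hypothetical names, the refresh call is replaced by a synchronous of('fresh-token'), and ignoreElements() plays the role of filter(_ => false). Both streams are created first and only subscribed to afterwards:

```typescript
import { merge, Observable, of, Subject } from 'rxjs';
import { exhaustMap, filter, first, ignoreElements, share, tap } from 'rxjs/operators';

// Builds an independent trigger/refresh pair so the two variants cannot affect each other.
function createRefresher() {
  const exchangeToken$ = new Subject<boolean>();
  const refreshToken$ = exchangeToken$.pipe(
    filter(x => x),
    exhaustMap(() => of('fresh-token')), // stand-in for the real refresh request
    share()
  );

  // Eager: the trigger fires when the function is called.
  const eagerExchange = (): Observable<string> => {
    exchangeToken$.next(true);
    return refreshToken$.pipe(first());
  };

  // Lazy: the trigger fires as part of each subscription.
  const lazyExchange = (): Observable<string> => {
    const startToken$ = of(true).pipe(
      tap(x => exchangeToken$.next(x)),
      ignoreElements()
    );
    return merge(refreshToken$.pipe(first()), startToken$);
  };

  return { eagerExchange, lazyExchange };
}

const eagerResults: string[] = [];
const lazyResults: string[] = [];

const eager$ = createRefresher().eagerExchange(); // trigger already fired, with nobody listening
const lazy$ = createRefresher().lazyExchange();   // nothing has happened yet

// Subscribing "later":
eager$.subscribe(token => eagerResults.push(token));
lazy$.subscribe(token => lazyResults.push(token));
```

Here eagerResults stays empty because its trigger was spent before anyone subscribed, while lazyResults receives the token.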


Related questions for more details/other approaches

  1. Angular 4 Interceptor retry requests after token refresh
  2. Angular: refresh token only once
  3. Handling refresh tokens using rxjs

Upvotes: 1
