Verdan Mahmood
Verdan Mahmood

Reputation: 76

Authlib Flask Client - How to validate and refresh the access token

I have been trying to integrate Authlib Flask Client to achieve the OIDC via Google OIDC. I am using Flask SQLAlchemy, and everything is working perfectly fine. The only problem is I am unable to refresh the token when expired.

Questions:

I thought this is being done automatically, and I don't need to validate the token myself. Below is my code, with DB model, and also the fetch_token and update_token.

Few notes:

Can someone please help me understand what am I doing wrong here. below is my code.

import time
from authlib.integrations.flask_client import OAuth
from authlib.oidc.core.errors import LoginRequiredError
from flask import Flask
from flask import current_app as app, redirect, url_for, session
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, Integer, String


class OAuth2Token(app.db.Model):
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, nullable=False)
    name = Column(String(20), nullable=False)

    access_token = Column(String(255), nullable=False)
    expires_in = Column(Integer, default=0)
    scope = Column(String, default=0)
    token_type = Column(String(20))
    refresh_token = Column(String(255))
    expires_at = Column(Integer, default=0)

    def to_token(self):
        return dict(
            access_token=self.access_token,
            expires_in=self.expires_in,
            scope=self.scope,
            token_type=self.token_type,
            refresh_token=self.refresh_token,
            expires_at=self.expires_at,
        )

    @property
    def is_active(self):
        return self.expires_at > round(time.time())

    @staticmethod
    def save(**kwargs):
        item = OAuth2Token(**kwargs)
        app.db.session.add(item)
        app.db.session.commit()

    @staticmethod
    def get(**kwargs):
        return OAuth2Token.query.filter_by(**kwargs).first()

    @staticmethod
    def delete(**kwargs):
        OAuth2Token.query.filter_by(**kwargs).delete()
        app.db.session.commit()

    @staticmethod
    def get_active(name, user_id, int_time):
        return OAuth2Token.query.filter(OAuth2Token.name == name,
                                        OAuth2Token.user_id == user_id,
                                        OAuth2Token.expires_at >= int_time
                                        ).first()

    @staticmethod
    def all():
        return OAuth2Token.query.all()

    @staticmethod
    def update_tokens(name, token, refresh_token=None, access_token=None):
        if refresh_token:
            item = OAuth2Token.get(name=name, refresh_token=refresh_token)
        elif access_token:
            item = OAuth2Token.get(name=name, access_token=access_token)
        else:
            return

        item.access_token = token['access_token']
        item.refresh_token = token.get('refresh_token')
        item.expires_at = token['expires_at']
        app.db.session.commit()


def _update_token(name, token, refresh_token=None, access_token=None):
    try:
        OAuth2Token.update_tokens(name, token, refresh_token=refresh_token, access_token=access_token)
    except Exception as ex:
        print("exception")


def _fetch_token(name):
    try:
        _current_time = round(time.time())
        token = OAuth2Token.get_active(name=name,
                                       user_id=session["user"]["id"],
                                       int_time=_current_time)
        if not token:
            return None
        return token.to_token()
    except Exception as ex:
        raise LoginRequiredError


class CustomApp(Flask):
    def __init__(self, *args, **kwargs):
        super(CustomApp, self).__init__(*args, **kwargs)

        self.db = SQLAlchemy(self)

        with self.app_context():
            self.db.create_all()

            oauth = OAuth(
                self,
                fetch_token=_fetch_token,
                update_token=_update_token
            )

            self.auth_client = oauth.register(
                name='google',
                server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
                client_kwargs={
                    'scope': 'openid email profile',
                },
                authorize_params={
                    'access_type': 'offline',
                    'prompt': 'consent'
                }
            )

        @self.route('/login')
        def login():
            redirect_uri = url_for('auth', _external=True)
            return self.auth_client.authorize_redirect(redirect_uri)

        @self.route('/auth')
        def auth():
            try:
                token = self.auth_client.authorize_access_token()
                user = self.auth_client.parse_id_token(token)
                user_id = user.user_id
                # Since we are not storing the id_token in the model
                token.pop("id_token")
                OAuth2Token.save(name='google', user_id=user_id, **token)
                session["user"] = user
                return redirect('/')
            except Exception as ex:
                raise ex

        @self.route('/logout')
        def logout():
            if session.get("user"):
                OAuth2Token.delete(name='google', user_id=session["user"]["id"])
            session.pop('user', None)
            return redirect(url_for('login'))

Upvotes: 3

Views: 1245

Answers (0)

Related Questions