Reputation: 76
I have been trying to integrate Authlib Flask Client to achieve the OIDC via Google OIDC. I am using Flask SQLAlchemy, and everything is working perfectly fine. The only problem is I am unable to refresh the token when expired.
Questions:
I thought this is being done automatically, and I don't need to validate the token myself. Below is my code, with DB model, and also the fetch_token and update_token.
Few notes:
expires_at
inside the fetch_token method.oauth.google.token
sender
in the update_token method, but it does not work as well.Can someone please help me understand what am I doing wrong here. below is my code.
import time
from authlib.integrations.flask_client import OAuth
from authlib.oidc.core.errors import LoginRequiredError
from flask import Flask
from flask import current_app as app, redirect, url_for, session
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, Integer, String
class OAuth2Token(app.db.Model):
id = Column(Integer, primary_key=True)
user_id = Column(Integer, nullable=False)
name = Column(String(20), nullable=False)
access_token = Column(String(255), nullable=False)
expires_in = Column(Integer, default=0)
scope = Column(String, default=0)
token_type = Column(String(20))
refresh_token = Column(String(255))
expires_at = Column(Integer, default=0)
def to_token(self):
return dict(
access_token=self.access_token,
expires_in=self.expires_in,
scope=self.scope,
token_type=self.token_type,
refresh_token=self.refresh_token,
expires_at=self.expires_at,
)
@property
def is_active(self):
return self.expires_at > round(time.time())
@staticmethod
def save(**kwargs):
item = OAuth2Token(**kwargs)
app.db.session.add(item)
app.db.session.commit()
@staticmethod
def get(**kwargs):
return OAuth2Token.query.filter_by(**kwargs).first()
@staticmethod
def delete(**kwargs):
OAuth2Token.query.filter_by(**kwargs).delete()
app.db.session.commit()
@staticmethod
def get_active(name, user_id, int_time):
return OAuth2Token.query.filter(OAuth2Token.name == name,
OAuth2Token.user_id == user_id,
OAuth2Token.expires_at >= int_time
).first()
@staticmethod
def all():
return OAuth2Token.query.all()
@staticmethod
def update_tokens(name, token, refresh_token=None, access_token=None):
if refresh_token:
item = OAuth2Token.get(name=name, refresh_token=refresh_token)
elif access_token:
item = OAuth2Token.get(name=name, access_token=access_token)
else:
return
item.access_token = token['access_token']
item.refresh_token = token.get('refresh_token')
item.expires_at = token['expires_at']
app.db.session.commit()
def _update_token(name, token, refresh_token=None, access_token=None):
try:
OAuth2Token.update_tokens(name, token, refresh_token=refresh_token, access_token=access_token)
except Exception as ex:
print("exception")
def _fetch_token(name):
try:
_current_time = round(time.time())
token = OAuth2Token.get_active(name=name,
user_id=session["user"]["id"],
int_time=_current_time)
if not token:
return None
return token.to_token()
except Exception as ex:
raise LoginRequiredError
class CustomApp(Flask):
def __init__(self, *args, **kwargs):
super(CustomApp, self).__init__(*args, **kwargs)
self.db = SQLAlchemy(self)
with self.app_context():
self.db.create_all()
oauth = OAuth(
self,
fetch_token=_fetch_token,
update_token=_update_token
)
self.auth_client = oauth.register(
name='google',
server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
client_kwargs={
'scope': 'openid email profile',
},
authorize_params={
'access_type': 'offline',
'prompt': 'consent'
}
)
@self.route('/login')
def login():
redirect_uri = url_for('auth', _external=True)
return self.auth_client.authorize_redirect(redirect_uri)
@self.route('/auth')
def auth():
try:
token = self.auth_client.authorize_access_token()
user = self.auth_client.parse_id_token(token)
user_id = user.user_id
# Since we are not storing the id_token in the model
token.pop("id_token")
OAuth2Token.save(name='google', user_id=user_id, **token)
session["user"] = user
return redirect('/')
except Exception as ex:
raise ex
@self.route('/logout')
def logout():
if session.get("user"):
OAuth2Token.delete(name='google', user_id=session["user"]["id"])
session.pop('user', None)
return redirect(url_for('login'))
Upvotes: 3
Views: 1245