Reputation: 313
For an application, I followed the FastAPI documentation for the authentication process.
By default, OAuth2PasswordBearer raises an HTTPException with status code 401, so I can't check whether a user is actually connected without returning a 401 error to the client.
An example of what I want to do:
from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer

# get_settings, jwt, TokenData, UserNode and credentials_exception
# are defined elsewhere in the application.

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/users/token")

def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        settings = get_settings()
        payload = jwt.decode(token, settings.secret_key,
                             algorithms=[settings.algorithm_hash])
        email = payload.get("email")
        if email is None:
            raise credentials_exception
        token_data = TokenData(email=email)
    except jwt.JWTError:
        raise credentials_exception
    user = UserNode.get_node_with_email(token_data.email)
    if user is None:
        raise credentials_exception
    return user

@app.get('/')
def is_connected(user = Depends(get_current_user)):
    # here, I can't do anything if the user is not connected,
    # because an exception is raised in the OAuth2PasswordBearer __call__ method ...
    return
I see that the OAuth2PasswordBearer class has an "auto_error" attribute, which controls whether its __call__ method returns None or raises an error:
if not authorization or scheme.lower() != "bearer":
    if self.auto_error:
        raise HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer"},
        )
    else:
        return None
So I came up with this workaround:
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/users/token", auto_error=False)

def get_current_user(token: str = Depends(oauth2_scheme)):
    if not token:
        return None
    # [ ... same token decoding logic as before ... ]
    return user

@app.get('/')
def is_connected(user = Depends(get_current_user)):
    return user
It works, but I wonder what other ways there are to do this. Is there a more "official" method?
Upvotes: 2
Views: 5039
Reputation: 119
This is a good question and as far as I know, there isn't an "official" answer that is universally agreed upon.
The approach I've seen most often in the FastAPI applications that I've reviewed involves creating a separate dependency for each use case.
While the code works similarly to the example you've provided, the key difference is that it attempts to parse the JWT every time, so the credentials exception is not limited to the case where the token is missing. Make sure the dependency accounts for malformed JWTs, invalid JWTs, etc.
Here's an example adapted to the general structure you've specified:
# ...other code
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="api/users/token",
    auto_error=False
)
auth_service = AuthService()  # service responsible for JWT management

async def get_user_from_token(
    token: Optional[str] = Depends(oauth2_scheme),  # None when no bearer token is sent
    user_node: UserNode = Depends(get_user_node),
) -> Optional[User]:
    try:
        email = auth_service.get_email_from_token(
            token=token,
            secret_key=config.SECRET_KEY
        )
        user = await user_node.get_node_with_email(email)
        return user
    except Exception:
        # exceptions may include no token, expired JWT, malformed JWT,
        # or database errors - either way we ignore them and return None
        return None

def get_current_user_required(
    user: Optional[User] = Depends(get_user_from_token)
) -> User:
    # reject the request when no user could be resolved from the token
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="An authenticated user is required for that action.",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user

def get_current_user_optional(
    user: Optional[User] = Depends(get_user_from_token)
) -> Optional[User]:
    return user
Upvotes: 3