vbv/server/vbv_lernwelt/sso/views.py

58 lines
1.9 KiB
Python

import structlog as structlog
from authlib.integrations.base_client import OAuthError
from django.conf import settings
from django.contrib.auth import login as dj_login
from django.shortcuts import redirect
from sentry_sdk import capture_exception
from vbv_lernwelt.importer.services import create_or_update_user
from vbv_lernwelt.sso.client import oauth
from vbv_lernwelt.sso.jwt import decode_jwt
# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
# Path segment (no leading slash) that `authorize` redirects to when the
# OAuth token exchange raises an OAuthError.
OAUTH_FAIL_REDIRECT = "login-error"
def login(request):
    """Start the SSO login by delegating to the configured OAuth client.

    The client is looked up by ``settings.OAUTH["client_name"]`` and asked to
    build the authorization redirect towards
    ``settings.OAUTH["local_redirect_uri"]``. The optional ``lang`` query
    parameter of the incoming request is forwarded as ``lang`` and defaults
    to ``"de"``.

    Args:
        request: The incoming Django request.

    Returns:
        Whatever the client's ``authorize_redirect`` returns (presumably an
        HTTP redirect to the identity provider -- confirm against authlib).
    """
    sso_client = oauth.create_client(settings.OAUTH["client_name"])
    callback_uri = settings.OAUTH["local_redirect_uri"]
    # Fall back to German when the caller does not pass ?lang=...
    ui_language = request.GET.get("lang", "de")
    return sso_client.authorize_redirect(request, callback_uri, lang=ui_language)
def authorize(request):
    """Handle the OAuth callback and log the resulting user into Django.

    The access token is fetched through the configured OAuth client's
    ``authorize_access_token`` and its ``id_token`` entry is decoded with
    ``decode_jwt``. If that raises ``OAuthError``, the error is logged,
    reported to Sentry unless ``settings.DEBUG`` is truthy, and the browser
    is redirected to ``/<OAUTH_FAIL_REDIRECT>?state=someerror``.

    Otherwise the user described by the decoded token is created or updated,
    logged into the session, and redirected to ``/``.

    Args:
        request: The incoming Django request of the OAuth callback.

    Returns:
        The result of Django's ``redirect`` shortcut, either to the error
        page or to the site root.
    """
    try:
        logger.debug(request, label="sso")
        sso_client = getattr(oauth, settings.OAUTH["client_name"])
        access_token = sso_client.authorize_access_token(request)
        claims = decode_jwt(access_token["id_token"])
    except OAuthError as oauth_error:
        logger.error(oauth_error, exc_info=True, label="sso")
        if not settings.DEBUG:
            capture_exception(oauth_error)
        # The value of the ``state`` query parameter is still to be defined.
        return redirect(f"/{OAUTH_FAIL_REDIRECT}?state=someerror")

    profile = _user_data_from_token_data(claims)
    # E-mail addresses are passed on lower-cased.
    normalized_email = profile.get("email").lower()
    sso_user = create_or_update_user(
        email=normalized_email,
        sso_id=profile.get("sso_id"),
        first_name=profile.get("first_name", ""),
        last_name=profile.get("last_name", ""),
    )
    dj_login(request, sso_user)
    return redirect("/")
def _user_data_from_token_data(token: dict) -> dict:
first_email = token.get("emails", [""])[0]
return {
"first_name": token.get("given_name", ""),
"last_name": token.get("family_name", ""),
"email": first_email,
"sso_id": token.get("oid"),
}