58 lines
1.9 KiB
Python
58 lines
1.9 KiB
Python
import structlog as structlog
|
|
from authlib.integrations.base_client import OAuthError
|
|
from django.conf import settings
|
|
from django.contrib.auth import login as dj_login
|
|
from django.shortcuts import redirect
|
|
from sentry_sdk import capture_exception
|
|
|
|
from vbv_lernwelt.importer.services import create_or_update_user
|
|
from vbv_lernwelt.sso.client import oauth
|
|
from vbv_lernwelt.sso.jwt import decode_jwt
|
|
|
|
# Module-wide structlog logger, obtained with this module's name.
logger = structlog.get_logger(__name__)
|
|
|
|
# Path segment (without leading slash) that `authorize` redirects to when the
# OAuth token exchange raises an OAuthError.
OAUTH_FAIL_REDIRECT = "login-error"
|
|
|
|
|
|
def login(request):
    """Begin the SSO flow by handing the request to the configured OAuth client.

    The client name and the local redirect URI are read from
    ``settings.OAUTH``. The ``lang`` query parameter (``"de"`` when absent)
    is forwarded to the client's ``authorize_redirect`` call as ``lang``.

    Returns whatever ``authorize_redirect`` returns.
    """
    oauth_settings = settings.OAUTH
    provider = oauth.create_client(oauth_settings["client_name"])
    return provider.authorize_redirect(
        request,
        oauth_settings["local_redirect_uri"],
        lang=request.GET.get("lang", "de"),
    )
|
|
|
|
|
|
def authorize(request):
    """Handle the OAuth callback: fetch the token, sign the user in, redirect.

    On success the ID token is decoded, the matching user is created or
    updated from its claims, a Django session is opened for that user and
    the browser is redirected to ``/``.

    If the token exchange (or decoding) raises ``OAuthError``, the error is
    logged, reported to Sentry unless ``settings.DEBUG`` is set, and the
    browser is redirected to the login-error page instead.
    """
    try:
        logger.debug(request, label="sso")
        provider = getattr(oauth, settings.OAUTH["client_name"])
        access_token = provider.authorize_access_token(request)
        claims = decode_jwt(access_token["id_token"])
    except OAuthError as exc:
        logger.error(exc, exc_info=True, label="sso")
        if not settings.DEBUG:
            capture_exception(exc)
        # The "state" value is a placeholder; the error contract is still
        # to be defined.
        return redirect(f"/{OAUTH_FAIL_REDIRECT}?state=someerror")

    profile = _user_data_from_token_data(claims)
    # NOTE(review): a token without an e-mail claim reaches this point with
    # email == "" -- confirm create_or_update_user copes with that.
    account = create_or_update_user(
        email=profile.get("email").lower(),
        sso_id=profile.get("sso_id"),
        first_name=profile.get("first_name", ""),
        last_name=profile.get("last_name", ""),
    )

    dj_login(request, account)
    return redirect("/")
|
|
|
|
|
|
def _user_data_from_token_data(token: dict) -> dict:
|
|
first_email = token.get("emails", [""])[0]
|
|
return {
|
|
"first_name": token.get("given_name", ""),
|
|
"last_name": token.get("family_name", ""),
|
|
"email": first_email,
|
|
"sso_id": token.get("oid"),
|
|
}
|