import base64 import json import structlog as structlog from authlib.integrations.base_client import OAuthError from django.conf import settings from django.contrib.auth import login as dj_login from django.contrib.auth import logout as dj_logout from django.shortcuts import redirect from sentry_sdk import capture_exception from vbv_lernwelt.core.models import User from vbv_lernwelt.course.models import CourseSession from vbv_lernwelt.course_session.utils import has_course_session_user_vv from vbv_lernwelt.importer.services import create_or_update_user from vbv_lernwelt.sso.client import oauth from vbv_lernwelt.sso.jwt import decode_jwt logger = structlog.get_logger(__name__) def signup(request): course_param = request.GET.get("course") next_param = request.GET.get("next") state_json = json.dumps({"course": course_param, "next": next_param}) state_encoded = base64.urlsafe_b64encode(state_json.encode()).decode() redirect_uri = settings.OAUTH_SIGNUP_REDIRECT_URI logger.debug( f"SSO Signup (course={course_param}, next={next_param})", sso_signup_redirect_uri=redirect_uri, ) return oauth.signup.authorize_redirect( request, redirect_uri, state=state_encoded, lang=request.GET.get("lang", "de") ) def signin(request): """ Called directly from the frontend AND as a redirect from signup! """ # redirect from signup if "state" in request.GET: state_decoded = json.loads( base64.urlsafe_b64decode(request.GET.get("state").encode()).decode() ) state_course_param = state_decoded.get("course") state_next_param = state_decoded.get("next") else: state_course_param = None state_next_param = None course_param = request.GET.get("course", state_course_param) next_param = request.GET.get("next", state_next_param) state_json = json.dumps({"course": course_param, "next": next_param}) state_encoded = base64.urlsafe_b64encode(state_json.encode()).decode() redirect_uri = settings.OAUTH_SIGNIN_REDIRECT_URI logger.info( f"SSO Login (course={course_param}, next={next_param})", sso_login_redirect_uri=redirect_uri, ) return oauth.signin.authorize_redirect( request, redirect_uri, state=state_encoded, ui_locales=request.GET.get("lang", "de"), ) def get_redirect_uri(user: User, course: str | None, next_url: str | None): if course and course.startswith("vv") and not has_course_session_user_vv(user): return redirect(f"/onboarding/{course}/account/create") elif ( course == "uk" and not CourseSession.objects.filter(coursesessionuser__user=user).exists() ): return redirect("/onboarding/uk/account/create") elif next_url: return redirect(next_url) return redirect("/") def authorize_signin(request): try: jwt_token = oauth.signin.authorize_access_token(request) except OAuthError as e: logger.error(e, exc_info=True, label="sso") if not settings.DEBUG: capture_exception(e) return redirect("/") id_token_decoded = decode_jwt(jwt_token["id_token"]) state = json.loads( base64.urlsafe_b64decode(request.GET.get("state").encode()).decode() ) course = state.get("course") next_url = state.get("next") logger.debug( f"SSO Authorize (course={course}, next={next_url}", sso_authorize_id_token=id_token_decoded, ) user = create_or_update_user( email=id_token_decoded.get("email", ""), sso_id=id_token_decoded.get("oid"), first_name=id_token_decoded.get("given_name", ""), last_name=id_token_decoded.get("family_name", ""), intermediate_sso_id=id_token_decoded.get("sub"), id_token=jwt_token["id_token"], ) dj_login(request, user) return get_redirect_uri(user=user, course=course, next_url=next_url) def logout(request): user_data = ( request.user.additional_json_data if request.user.is_authenticated else {} ) dj_logout(request) redirect_uri = getattr(settings, "OAUTH_LOGOUT_REDIRECT_URI", "/") # Handle scenarios when SSO-related data is missing, assume local login if not user_data.get("intermediate_sso_id"): logger.debug("SSO Logout", extra={"mode": "intermediate_sso_id_not_set"}) return redirect("/") # Temporary solution if id_token is not set in the user data, can be removed after rollout + 2 weeks id_token = user_data.get("id_token", "") if not id_token: logger.debug("SSO Logout", extra={"mode": "id_token_not_set"}) url_param_symbol = "&" if "?" in redirect_uri else "?" return redirect(f"{redirect_uri}{url_param_symbol}client_id=iterativ") # Handle scenarios when SSO-related data is present or redirect_uri is not set if not redirect_uri: logger.debug("SSO Logout", extra={"mode": "redirect_uri_not_set"}) return redirect("/") return redirect(f"{redirect_uri}&id_token_hint={id_token}")