import base64
import json

import structlog as structlog
from authlib.integrations.base_client import OAuthError
from django.conf import settings
from django.contrib.auth import login as dj_login
from django.shortcuts import redirect
from django.utils.http import url_has_allowed_host_and_scheme
from sentry_sdk import capture_exception

from vbv_lernwelt.core.models import User
from vbv_lernwelt.course.models import CourseSession
from vbv_lernwelt.course_session.utils import has_course_session_user_vv
from vbv_lernwelt.importer.services import create_or_update_user
from vbv_lernwelt.sso.client import oauth
from vbv_lernwelt.sso.jwt import decode_jwt

logger = structlog.get_logger(__name__)


def signup(request):
    """Start the OAuth signup flow.

    The optional ``course`` query parameter is forwarded as the OAuth
    ``state`` and the optional ``lang`` query parameter (default ``"de"``)
    is forwarded as ``lang``.

    Returns whatever ``oauth.signup.authorize_redirect`` returns.
    """
    course = request.GET.get("course")
    redirect_uri = settings.OAUTH_SIGNUP_REDIRECT_URI

    logger.debug(f"SSO Signup (course={course})", sso_signup_redirect_uri=redirect_uri)

    return oauth.signup.authorize_redirect(
        request, redirect_uri, state=course, lang=request.GET.get("lang", "de")
    )


def signin(request):
    """Start the OAuth signin flow.

    The ``course`` and ``next`` values are packed into a URL-safe base64
    encoded JSON object and passed as the OAuth ``state`` so that
    ``authorize_signin`` can read them back after the round trip.

    Returns whatever ``oauth.signin.authorize_redirect`` returns.
    """
    # course query OR state when coming from signup (oauth)
    course_param = request.GET.get("course", request.GET.get("state"))
    next_param = request.GET.get("next")

    state_json = json.dumps({"course": course_param, "next": next_param})
    state_encoded = base64.urlsafe_b64encode(state_json.encode()).decode()

    redirect_uri = settings.OAUTH_SIGNIN_REDIRECT_URI

    logger.info(
        f"SSO Login (course={course_param}, next={next_param})",
        sso_login_redirect_uri=redirect_uri,
    )

    return oauth.signin.authorize_redirect(
        request,
        redirect_uri,
        state=state_encoded,
        ui_locales=request.GET.get("lang", "de"),
    )


def get_redirect_uri(
    user: User,
    course: str | None,
    next_url: str | None,
    allowed_hosts: set[str] | None = None,
):
    """Return the redirect response a freshly signed-in user should follow.

    Args:
        user: The user that has just been logged in.
        course: Course identifier carried through the OAuth state, if any.
        next_url: Target requested by the client, if any. This value is
            user-controlled, so it is only followed when it is a relative
            URL or points at one of ``allowed_hosts``.
        allowed_hosts: Hosts that ``next_url`` may point to. ``None`` means
            only host-relative URLs are accepted.

    Returns:
        A redirect to the onboarding page when the course requires it, to
        ``next_url`` when it is present and safe, and to ``"/"`` otherwise.
    """
    if course and course.startswith("vv") and not has_course_session_user_vv(user):
        return redirect(f"/onboarding/{course}/account/create")

    if (
        course == "uk"
        and not CourseSession.objects.filter(coursesessionuser__user=user).exists()
    ):
        return redirect("/onboarding/uk/account/create")

    if next_url:
        # Guard against open redirects: never bounce the user to a foreign
        # host or a non-http(s) scheme taken from request data.
        if url_has_allowed_host_and_scheme(next_url, allowed_hosts=allowed_hosts):
            return redirect(next_url)
        logger.warning("SSO: ignoring unsafe next url", next_url=next_url, label="sso")

    return redirect("/")


def _decode_state(raw_state: str | None) -> dict:
    """Decode the base64/JSON OAuth state; return ``{}`` when it is unusable."""
    if not raw_state:
        return {}

    try:
        # binascii.Error, UnicodeError and json.JSONDecodeError are all
        # ValueError subclasses, so one clause covers every decoding step.
        decoded = json.loads(base64.urlsafe_b64decode(raw_state.encode()).decode())
    except ValueError:
        logger.warning("SSO Authorize: ignoring invalid state", label="sso")
        return {}

    # Valid JSON that is not an object (e.g. a list) has no ``.get``.
    return decoded if isinstance(decoded, dict) else {}


def authorize_signin(request):
    """Finish the OAuth signin flow, log the user in and redirect.

    On an ``OAuthError`` the error is logged (and reported via
    ``capture_exception`` unless ``settings.DEBUG`` is set) and the user is
    redirected to ``"/"``. A missing or malformed ``state`` query parameter
    no longer aborts the login; ``course`` and ``next`` simply default to
    ``None``.
    """
    try:
        jwt_token = oauth.signin.authorize_access_token(request)
    except OAuthError as e:
        logger.error(e, exc_info=True, label="sso")
        if not settings.DEBUG:
            capture_exception(e)
        return redirect("/")

    id_token = decode_jwt(jwt_token["id_token"])

    state = _decode_state(request.GET.get("state"))
    course = state.get("course")
    next_url = state.get("next")

    logger.debug(
        f"SSO Authorize (course={course}, next={next_url})",
        sso_authorize_id_token=id_token,
    )

    user = create_or_update_user(
        email=id_token.get("email", ""),
        sso_id=id_token.get("oid"),
        first_name=id_token.get("given_name", ""),
        last_name=id_token.get("family_name", ""),
    )

    dj_login(request, user)

    return get_redirect_uri(
        user=user,
        course=course,
        next_url=next_url,
        # Absolute URLs are accepted only when they target this very host.
        allowed_hosts={request.get_host()},
    )