vbv/server/vbv_lernwelt/sso/views.py

124 lines
3.7 KiB
Python

import base64
import json
import structlog as structlog
from authlib.integrations.base_client import OAuthError
from django.conf import settings
from django.contrib.auth import login as dj_login
from django.shortcuts import redirect
from sentry_sdk import capture_exception
from vbv_lernwelt.core.models import User
from vbv_lernwelt.course.models import CourseSession
from vbv_lernwelt.course_session.utils import has_course_session_user_vv
from vbv_lernwelt.importer.services import create_or_update_user
from vbv_lernwelt.sso.client import oauth
from vbv_lernwelt.sso.jwt import decode_jwt
# Module-level structlog logger used by the SSO views defined in this file.
logger = structlog.get_logger(__name__)
def signup(request):
    """
    Kick off the SSO signup flow.

    Reads the optional ``course`` and ``next`` query parameters, packs them
    into a JSON object that is URL-safe base64 encoded, and passes that value
    as ``state`` to ``oauth.signup.authorize_redirect`` together with
    ``settings.OAUTH_SIGNUP_REDIRECT_URI``. The ``lang`` query parameter
    (default ``"de"``) is forwarded as ``lang``.

    Returns whatever ``oauth.signup.authorize_redirect`` returns.
    """
    query = request.GET
    course_param = query.get("course")
    next_param = query.get("next")

    # Carry course/next through the OAuth round trip inside the state value.
    payload = json.dumps({"course": course_param, "next": next_param}).encode()
    state_token = base64.urlsafe_b64encode(payload).decode()

    callback_uri = settings.OAUTH_SIGNUP_REDIRECT_URI
    logger.debug(
        f"SSO Signup (course={course_param}, next={next_param})",
        sso_signup_redirect_uri=callback_uri,
    )

    language = query.get("lang", "de")
    return oauth.signup.authorize_redirect(
        request, callback_uri, state=state_token, lang=language
    )
def signin(request):
    """
    Start the SSO sign-in flow.

    Called directly from the frontend AND as a redirect from signup!

    ``course`` and ``next`` are taken from the query string. When one of them
    is absent, the value carried in the ``state`` query parameter (URL-safe
    base64 of a JSON object, the format ``signup`` produces) is used instead.
    Both values are then re-encoded into the ``state`` handed to
    ``oauth.signin.authorize_redirect``. The ``lang`` query parameter
    (default ``"de"``) is forwarded as ``ui_locales``.

    The incoming ``state`` is client-supplied: a value that cannot be decoded
    into a JSON object is ignored instead of failing the request.

    Returns whatever ``oauth.signin.authorize_redirect`` returns.
    """
    state_course_param = None
    state_next_param = None

    # redirect from signup
    if "state" in request.GET:
        try:
            state_decoded = json.loads(
                base64.urlsafe_b64decode(request.GET.get("state").encode()).decode()
            )
        except ValueError:
            # binascii.Error (bad base64), UnicodeDecodeError and
            # json.JSONDecodeError are all ValueError subclasses.
            logger.warning(
                "SSO Login: ignoring undecodable state parameter", label="sso"
            )
            state_decoded = None

        # Valid JSON that is not an object (e.g. a list) has no .get().
        if isinstance(state_decoded, dict):
            state_course_param = state_decoded.get("course")
            state_next_param = state_decoded.get("next")

    # Explicit query parameters win over values recovered from the state.
    course_param = request.GET.get("course", state_course_param)
    next_param = request.GET.get("next", state_next_param)

    state_json = json.dumps({"course": course_param, "next": next_param})
    state_encoded = base64.urlsafe_b64encode(state_json.encode()).decode()

    redirect_uri = settings.OAUTH_SIGNIN_REDIRECT_URI
    logger.info(
        f"SSO Login (course={course_param}, next={next_param})",
        sso_login_redirect_uri=redirect_uri,
    )

    return oauth.signin.authorize_redirect(
        request,
        redirect_uri,
        state=state_encoded,
        ui_locales=request.GET.get("lang", "de"),
    )
def get_redirect_uri(user: User, course: str | None, next_url: str | None):
    """
    Build the redirect response returned to the user after an SSO login.

    Targets are checked in this order:

    1. ``course`` starts with ``"vv"`` and ``has_course_session_user_vv(user)``
       is false -> ``/onboarding/<course>/account/create``.
    2. ``course`` is ``"uk"`` and no ``CourseSession`` is linked to ``user``
       -> ``/onboarding/uk/account/create``.
    3. ``next_url`` is set and is a safe target (see below) -> ``next_url``.
    4. Otherwise -> ``/``.

    ``next_url`` originates from the request's query string, so redirecting
    to it unchecked would be an open redirect. It is only honoured when it is
    a relative URL or points at a host listed exactly in
    ``settings.ALLOWED_HOSTS``; anything else falls back to ``/``.

    :param user: the user that was just logged in.
    :param course: course identifier carried through the SSO state, if any.
    :param next_url: post-login target carried through the SSO state, if any.
    :return: the response produced by ``django.shortcuts.redirect``.
    """
    # Local import keeps this block self-contained; Django is already a
    # dependency of this module.
    from django.utils.http import url_has_allowed_host_and_scheme

    if course and course.startswith("vv") and not has_course_session_user_vv(user):
        return redirect(f"/onboarding/{course}/account/create")

    if (
        course == "uk"
        and not CourseSession.objects.filter(coursesessionuser__user=user).exists()
    ):
        return redirect("/onboarding/uk/account/create")

    if next_url:
        # url_has_allowed_host_and_scheme compares hosts literally, so
        # wildcard / subdomain patterns from ALLOWED_HOSTS are left out.
        trusted_hosts = {
            host
            for host in settings.ALLOWED_HOSTS
            if host != "*" and not host.startswith(".")
        }
        if isinstance(next_url, str) and url_has_allowed_host_and_scheme(
            next_url, allowed_hosts=trusted_hosts
        ):
            return redirect(next_url)

        logger.warning(
            "SSO redirect target rejected", sso_next_url=next_url, label="sso"
        )

    return redirect("/")
def authorize_signin(request):
    """
    OAuth callback of the sign-in flow: log the user in and redirect.

    Steps:

    1. Exchange the authorization response for tokens via
       ``oauth.signin.authorize_access_token``. On ``OAuthError`` the error is
       logged (and sent to Sentry unless ``settings.DEBUG``) and the user is
       redirected to ``/``.
    2. Decode the ``id_token`` and read ``course`` / ``next`` from the
       ``state`` query parameter (URL-safe base64 of a JSON object).
    3. Create or update the user from the ``email``, ``oid``, ``given_name``
       and ``family_name`` claims, log them in, and return the response from
       ``get_redirect_uri``.

    A missing or undecodable ``state`` does not abort the login; ``course``
    and ``next`` simply default to ``None``.
    """
    try:
        jwt_token = oauth.signin.authorize_access_token(request)
    except OAuthError as e:
        logger.error(e, exc_info=True, label="sso")
        if not settings.DEBUG:
            capture_exception(e)
        return redirect("/")

    id_token = decode_jwt(jwt_token["id_token"])

    course = None
    next_url = None
    # The state travels through the browser; it may be absent from the query
    # string or malformed, and neither case should break a successful login.
    raw_state = request.GET.get("state")
    if raw_state:
        try:
            state = json.loads(base64.urlsafe_b64decode(raw_state.encode()).decode())
        except ValueError:
            # binascii.Error, UnicodeDecodeError and json.JSONDecodeError are
            # all ValueError subclasses.
            logger.warning(
                "SSO Authorize: ignoring undecodable state parameter", label="sso"
            )
            state = None

        # Valid JSON that is not an object (e.g. a list) has no .get().
        if isinstance(state, dict):
            course = state.get("course")
            next_url = state.get("next")

    # NOTE(review): this logs the full id token claims at debug level —
    # confirm that is acceptable for the environments where debug is enabled.
    logger.debug(
        f"SSO Authorize (course={course}, next={next_url})",
        sso_authorize_id_token=id_token,
    )

    user = create_or_update_user(
        email=id_token.get("email", ""),
        sso_id=id_token.get("oid"),
        first_name=id_token.get("given_name", ""),
        last_name=id_token.get("family_name", ""),
    )

    dj_login(request, user)

    return get_redirect_uri(user=user, course=course, next_url=next_url)