125 lines
3.7 KiB
Python
125 lines
3.7 KiB
Python
import base64
|
|
import json
|
|
|
|
import structlog as structlog
|
|
from authlib.integrations.base_client import OAuthError
|
|
from django.conf import settings
|
|
from django.contrib.auth import login as dj_login
|
|
from django.shortcuts import redirect
|
|
from sentry_sdk import capture_exception
|
|
|
|
from vbv_lernwelt.core.models import User
|
|
from vbv_lernwelt.course.models import CourseSession
|
|
from vbv_lernwelt.course_session.utils import has_course_session_user_vv
|
|
from vbv_lernwelt.importer.services import create_or_update_user
|
|
from vbv_lernwelt.sso.client import oauth
|
|
from vbv_lernwelt.sso.jwt import decode_jwt
|
|
|
|
logger = structlog.get_logger(__name__)
|
|
|
|
|
|
def signup(request):
|
|
course_param = request.GET.get("course")
|
|
next_param = request.GET.get("next")
|
|
|
|
state_json = json.dumps({"course": course_param, "next": next_param})
|
|
state_encoded = base64.urlsafe_b64encode(state_json.encode()).decode()
|
|
|
|
redirect_uri = settings.OAUTH_SIGNUP_REDIRECT_URI
|
|
|
|
logger.debug(
|
|
f"SSO Signup (course={course_param}, next={next_param})",
|
|
sso_signup_redirect_uri=redirect_uri,
|
|
)
|
|
|
|
return oauth.signup.authorize_redirect(
|
|
request, redirect_uri, state=state_encoded, lang=request.GET.get("lang", "de")
|
|
)
|
|
|
|
|
|
def signin(request):
|
|
"""
|
|
Called directly from the frontend AND as a redirect from signup!
|
|
"""
|
|
|
|
# redirect from signup
|
|
if "state" in request.GET:
|
|
state_decoded = json.loads(
|
|
base64.urlsafe_b64decode(request.GET.get("state").encode()).decode()
|
|
)
|
|
state_course_param = state_decoded.get("course")
|
|
state_next_param = state_decoded.get("next")
|
|
else:
|
|
state_course_param = None
|
|
state_next_param = None
|
|
|
|
course_param = request.GET.get("course", state_course_param)
|
|
next_param = request.GET.get("next", state_next_param)
|
|
|
|
state_json = json.dumps({"course": course_param, "next": next_param})
|
|
state_encoded = base64.urlsafe_b64encode(state_json.encode()).decode()
|
|
|
|
redirect_uri = settings.OAUTH_SIGNIN_REDIRECT_URI
|
|
|
|
logger.info(
|
|
f"SSO Login (course={course_param}, next={next_param})",
|
|
sso_login_redirect_uri=redirect_uri,
|
|
)
|
|
|
|
return oauth.signin.authorize_redirect(
|
|
request,
|
|
redirect_uri,
|
|
state=state_encoded,
|
|
ui_locales=request.GET.get("lang", "de"),
|
|
)
|
|
|
|
|
|
def get_redirect_uri(user: User, course: str | None, next_url: str | None):
|
|
if course and course.startswith("vv") and not has_course_session_user_vv(user):
|
|
return redirect(f"/onboarding/{course}/account/create")
|
|
elif (
|
|
course == "uk"
|
|
and not CourseSession.objects.filter(coursesessionuser__user=user).exists()
|
|
):
|
|
return redirect("/onboarding/uk/account/create")
|
|
elif next_url:
|
|
return redirect(next_url)
|
|
|
|
return redirect("/")
|
|
|
|
|
|
def authorize_signin(request):
|
|
try:
|
|
jwt_token = oauth.signin.authorize_access_token(request)
|
|
except OAuthError as e:
|
|
logger.error(e, exc_info=True, label="sso")
|
|
if not settings.DEBUG:
|
|
capture_exception(e)
|
|
return redirect("/")
|
|
|
|
id_token = decode_jwt(jwt_token["id_token"])
|
|
|
|
state = json.loads(
|
|
base64.urlsafe_b64decode(request.GET.get("state").encode()).decode()
|
|
)
|
|
|
|
course = state.get("course")
|
|
next_url = state.get("next")
|
|
|
|
logger.debug(
|
|
f"SSO Authorize (course={course}, next={next_url}",
|
|
sso_authorize_id_token=id_token,
|
|
)
|
|
|
|
user = create_or_update_user(
|
|
email=id_token.get("email", ""),
|
|
sso_id=id_token.get("oid"),
|
|
first_name=id_token.get("given_name", ""),
|
|
last_name=id_token.get("family_name", ""),
|
|
intermediate_sso_id=id_token.get("sub"),
|
|
)
|
|
|
|
dj_login(request, user)
|
|
|
|
return get_redirect_uri(user=user, course=course, next_url=next_url)
|