72 lines
2.4 KiB
Python
72 lines
2.4 KiB
Python
import structlog as structlog
|
|
from authlib.integrations.base_client import OAuthError
|
|
from django.conf import settings
|
|
from django.contrib.auth import login as dj_login
|
|
from django.shortcuts import redirect
|
|
from sentry_sdk import capture_exception
|
|
|
|
from vbv_lernwelt.course.models import CourseSession
|
|
from vbv_lernwelt.course_session.utils import has_course_session_user_vv
|
|
from vbv_lernwelt.importer.services import create_or_update_user
|
|
from vbv_lernwelt.sso.client import oauth
|
|
from vbv_lernwelt.sso.jwt import decode_jwt
|
|
|
|
logger = structlog.get_logger(__name__)
|
|
|
|
OAUTH_FAIL_REDIRECT = "login-error"
|
|
|
|
|
|
def signup(request):
|
|
course = request.GET.get("course")
|
|
redirect_uri = settings.OAUTH_SIGNUP_REDIRECT_URI
|
|
logger.debug(f"SSO Signup (course={course})", sso_signup_redirect_uri=redirect_uri)
|
|
return oauth.signup.authorize_redirect(request, redirect_uri, state=course)
|
|
|
|
|
|
def signin(request):
|
|
# course query OR state when coming from signup (oauth)
|
|
course = request.GET.get("course", request.GET.get("state"))
|
|
redirect_uri = settings.OAUTH_SIGNIN_REDIRECT_URI
|
|
logger.info(f"SSO Login (course={course})", sso_login_redirect_uri=redirect_uri)
|
|
return oauth.signin.authorize_redirect(
|
|
request, redirect_uri, state=course, lang=request.GET.get("lang", "de")
|
|
)
|
|
|
|
|
|
def authorize_signin(request):
|
|
try:
|
|
jwt_token = oauth.signin.authorize_access_token(request)
|
|
except OAuthError as e:
|
|
logger.error(e, exc_info=True, label="sso")
|
|
if not settings.DEBUG:
|
|
capture_exception(e)
|
|
return redirect(f"/{OAUTH_FAIL_REDIRECT}?state=oautherror")
|
|
|
|
id_token = decode_jwt(jwt_token["id_token"])
|
|
course = request.GET.get("state")
|
|
|
|
logger.debug(
|
|
f"SSO Authorize (course={course})",
|
|
sso_authorize_id_token=id_token,
|
|
)
|
|
|
|
user = create_or_update_user(
|
|
email=id_token.get("email", ""),
|
|
sso_id=id_token.get("oid"),
|
|
first_name=id_token.get("given_name", ""),
|
|
last_name=id_token.get("family_name", ""),
|
|
)
|
|
|
|
dj_login(request, user)
|
|
|
|
# figure out where to redirect to (onboarding or home)
|
|
if course == "vv" and not has_course_session_user_vv(user):
|
|
return redirect("/onboarding/vv/account/create")
|
|
elif (
|
|
course == "uk"
|
|
and not CourseSession.objects.filter(coursesessionuser__user=user).exists()
|
|
):
|
|
return redirect("/onboarding/uk/account/create")
|
|
else:
|
|
return redirect("/")
|