# vbv/server/vbv_lernwelt/sso/tests/test_sso_flow.py
#
# 357 lines
# 12 KiB
# Python

import base64
import json
import uuid
from unittest.mock import ANY, MagicMock, patch
from authlib.integrations.base_client import OAuthError
from django.shortcuts import redirect
from django.test import TestCase, override_settings
from django.urls import reverse
from vbv_lernwelt.core.models import User
from vbv_lernwelt.course.consts import COURSE_VERSICHERUNGSVERMITTLERIN_ID
from vbv_lernwelt.course.creators.test_utils import (
add_course_session_user,
create_course,
create_course_session,
)
from vbv_lernwelt.course.models import CourseSession, CourseSessionUser
from vbv_lernwelt.importer.services import create_or_update_user
def decoded_token(email, oid=None, given_name="Bobby", family_name="Table"):
    """Build a fake set of decoded ID-token claims for the SSO tests.

    Args:
        email: Value for the ``email`` claim.
        oid: Value for the ``oid`` claim. When falsy, a random
            ``uuid.uuid4()`` is generated instead.
        given_name: Value for the ``given_name`` claim.
        family_name: Value for the ``family_name`` claim.

    Returns:
        A dict with the keys ``email``, ``oid``, ``given_name`` and
        ``family_name`` (in that order).
    """
    claims = {"email": email}
    # Fall back to a fresh random identifier when the caller supplied none.
    claims["oid"] = oid if oid else uuid.uuid4()
    claims["given_name"] = given_name
    claims["family_name"] = family_name
    return claims
class TestSignInAuthorizeSSO(TestCase):
    """Tests for the ``sso:authorize`` view.

    ``vbv_lernwelt.sso.views.oauth`` (and, where a token is decoded,
    ``vbv_lernwelt.sso.views.decode_jwt``) are patched, so the claims the
    view receives are exactly the ones built by ``decoded_token``.
    """

    def setUp(self):
        """Delete all course sessions and users so every test starts empty."""
        CourseSession.objects.all().delete()
        User.objects.all().delete()

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _encode_state(payload):
        """Return ``payload`` as URL-safe base64-encoded JSON text."""
        return base64.urlsafe_b64encode(json.dumps(payload).encode()).decode()

    @staticmethod
    def _stub_sso(mock_decode_jwt, mock_oauth, email):
        """Make the patched SSO layer yield claims for ``email``; return them."""
        token = decoded_token(email)
        mock_decode_jwt.return_value = token
        mock_oauth.signin.authorize_access_token.return_value = {
            "id_token": "<some-token-irrelevant-for-this-test>"
        }
        return token

    @staticmethod
    def _create_user_for(token):
        """Create (or update) the user described by the ``token`` claims."""
        return create_or_update_user(
            email=token["email"],
            sso_id=str(token["oid"]),
            first_name=token["given_name"],
            last_name=token["family_name"],
        )

    def _authorize(self, state_payload):
        """GET ``sso:authorize`` with ``state_payload`` encoded as ``state``."""
        return self.client.get(
            reverse("sso:authorize"),
            {"state": self._encode_state(state_payload)},
        )

    def _assert_redirects_to(self, response, expected_url):
        """Assert ``response`` is a 302 whose target is ``expected_url``."""
        self.assertEqual(302, response.status_code)
        self.assertEqual(expected_url, response.url)  # noqa

    def _assert_user_created_from(self, token):
        """Assert a user with the token's email exists and mirrors its claims."""
        # ``get`` raises ``User.DoesNotExist`` when the user is missing, so a
        # separate not-None assertion would add nothing.
        user = User.objects.get(email=token["email"])  # noqa
        self.assertEqual(user.email, token["email"])
        self.assertEqual(user.first_name, token["given_name"])
        self.assertEqual(user.last_name, token["family_name"])
        self.assertEqual(user.sso_id, token["oid"])

    # ------------------------------------------------------------------
    # first-time users
    # ------------------------------------------------------------------

    @override_settings(OAUTH={"client_name": "mock"})
    @patch("vbv_lernwelt.sso.views.oauth")
    @patch("vbv_lernwelt.sso.views.decode_jwt")
    def test_authorize_first_time_uk(self, mock_decode_jwt, mock_oauth):
        """A new user with course ``uk`` is sent to the UK onboarding page."""
        # GIVEN
        token = self._stub_sso(mock_decode_jwt, mock_oauth, "user@example.com")

        # WHEN
        response = self._authorize({"course": "uk", "next": "/shall-be-ignored"})

        # THEN: the ``next`` value is ignored in favour of onboarding
        self._assert_redirects_to(response, "/onboarding/uk/account/create")
        self._assert_user_created_from(token)

    @override_settings(OAUTH={"client_name": "mock"})
    @patch("vbv_lernwelt.sso.views.oauth")
    @patch("vbv_lernwelt.sso.views.decode_jwt")
    def test_authorize_first_time_vv(self, mock_decode_jwt, mock_oauth):
        """A new user with course ``vv-de`` is sent to the VV onboarding page."""
        # GIVEN
        token = self._stub_sso(mock_decode_jwt, mock_oauth, "user@example.com")

        # WHEN
        response = self._authorize(
            {"course": "vv-de", "next": "/shall-be-ignored"}
        )

        # THEN: the ``next`` value is ignored in favour of onboarding
        self._assert_redirects_to(response, "/onboarding/vv-de/account/create")
        self._assert_user_created_from(token)

    # ------------------------------------------------------------------
    # failure path
    # ------------------------------------------------------------------

    @patch("vbv_lernwelt.sso.views.oauth")
    def test_authorize_on_tampered_token(self, mock_oauth_service):
        """An ``OAuthError`` from the token exchange redirects to ``/``."""
        # GIVEN
        mock_oauth_service.signin.authorize_access_token.side_effect = OAuthError()

        # WHEN
        response = self.client.get(reverse("sso:authorize"))

        # THEN
        self._assert_redirects_to(response, "/")

    # ------------------------------------------------------------------
    # already onboarded users
    # ------------------------------------------------------------------

    @override_settings(OAUTH={"client_name": "mock"})
    @patch("vbv_lernwelt.sso.views.oauth")
    @patch("vbv_lernwelt.sso.views.decode_jwt")
    def test_authorize_onboarded_uk(self, mock_decode_jwt, mock_oauth):
        """A user who is already a UK course-session member lands on ``/``."""
        # GIVEN
        email = "some@email.com"
        token = self._stub_sso(mock_decode_jwt, mock_oauth, email)

        # create a user that is already onboarded for UK
        user = self._create_user_for(token)
        course, _ = create_course("uk")
        course_session = create_course_session(course, "UK", "2023")
        add_course_session_user(course_session, user, CourseSessionUser.Role.MEMBER)
        # precondition: the user exists before the view is hit
        self.assertTrue(User.objects.filter(email=email).exists())

        # WHEN
        response = self._authorize({"course": "uk"})

        # THEN
        self._assert_redirects_to(response, "/")

    @override_settings(OAUTH={"client_name": "mock"})
    @patch("vbv_lernwelt.sso.views.oauth")
    @patch("vbv_lernwelt.sso.views.decode_jwt")
    def test_authorize_onboarded_vv(self, mock_decode_jwt, mock_oauth):
        """A user who is already a VV course-session member lands on ``/``."""
        # GIVEN
        email = "some@email.com"
        token = self._stub_sso(mock_decode_jwt, mock_oauth, email)

        # create a user that is already onboarded for VV
        user = self._create_user_for(token)
        course, _ = create_course(_id=COURSE_VERSICHERUNGSVERMITTLERIN_ID, title="VV")
        course_session = create_course_session(course, "VV", "VV")
        add_course_session_user(course_session, user, CourseSessionUser.Role.MEMBER)
        # precondition: the user exists before the view is hit
        self.assertTrue(User.objects.filter(email=email).exists())

        # WHEN
        response = self._authorize({"course": "vv"})

        # THEN
        self._assert_redirects_to(response, "/")

    # ------------------------------------------------------------------
    # explicit next url
    # ------------------------------------------------------------------

    @override_settings(OAUTH={"client_name": "mock"})
    @patch("vbv_lernwelt.sso.views.oauth")
    @patch("vbv_lernwelt.sso.views.decode_jwt")
    def test_authorize_next_url(self, mock_decode_jwt, mock_oauth):
        """With only ``next`` in the state, the redirect goes to that URL."""
        # GIVEN
        next_url = "/some/next/url"
        self._stub_sso(mock_decode_jwt, mock_oauth, "whatever@example.com")

        # WHEN
        response = self._authorize({"next": next_url})

        # THEN
        self._assert_redirects_to(response, next_url)
class TestSignIn(TestCase):
    """Checks how ``sso:login`` invokes ``oauth.signin.authorize_redirect``.

    ``vbv_lernwelt.sso.views.oauth`` is patched in every test; only the
    arguments handed to ``authorize_redirect`` are inspected.
    """

    @staticmethod
    def _as_state(payload):
        """Return ``payload`` as URL-safe base64-encoded JSON text."""
        raw_json = json.dumps(payload).encode()
        return base64.urlsafe_b64encode(raw_json).decode()

    @staticmethod
    def _install_redirect_stub(oauth_mock):
        """Replace ``signin.authorize_redirect`` with a stub returning a redirect."""
        stub = MagicMock()
        stub.return_value = redirect("/just/here/to/return/a/redirect/object")
        oauth_mock.signin.authorize_redirect = stub
        return stub

    @override_settings(OAUTH_SIGNIN_REDIRECT_URI="/sso/callback")
    @patch("vbv_lernwelt.sso.views.oauth")
    def test_signin_with_course_param(self, oauth_mock):
        """A ``course`` query parameter ends up in the encoded state."""
        # GIVEN
        course_slug = "vv-de"
        wanted_state = self._as_state({"course": course_slug, "next": None})
        redirect_stub = self._install_redirect_stub(oauth_mock)

        # WHEN
        self.client.get(reverse("sso:login"), {"course": course_slug})

        # THEN
        redirect_stub.assert_called_once_with(
            ANY,
            "/sso/callback",
            state=wanted_state,
            ui_locales="de",
        )

    @override_settings(OAUTH_SIGNIN_REDIRECT_URI="/sso/callback")
    @patch("vbv_lernwelt.sso.views.oauth")
    def test_signin_with_state_param(self, oauth_mock):
        """A pre-encoded ``state`` query parameter is passed on unchanged."""
        # GIVEN
        course_slug = "vv-de"
        incoming_state = self._as_state({"course": course_slug, "next": None})
        wanted_state = self._as_state({"course": course_slug, "next": None})
        redirect_stub = self._install_redirect_stub(oauth_mock)

        # WHEN
        self.client.get(reverse("sso:login"), {"state": incoming_state})

        # THEN
        redirect_stub.assert_called_once_with(
            ANY,
            "/sso/callback",
            state=wanted_state,
            ui_locales="de",
        )

    @override_settings(OAUTH_SIGNIN_REDIRECT_URI="/sso/callback")
    @patch("vbv_lernwelt.sso.views.oauth")
    def test_signin_next_url(self, oauth_mock):
        """A ``next`` query parameter ends up in the encoded state."""
        # GIVEN
        target = "/some/next/url"
        wanted_state = self._as_state({"course": None, "next": target})
        redirect_stub = self._install_redirect_stub(oauth_mock)

        # WHEN
        self.client.get(reverse("sso:login"), {"next": target})

        # THEN: the locale is not under test here
        redirect_stub.assert_called_once_with(
            ANY,
            "/sso/callback",
            state=wanted_state,
            ui_locales=ANY,
        )

    @override_settings(OAUTH_SIGNIN_REDIRECT_URI="/sso/callback")
    @patch("vbv_lernwelt.sso.views.oauth")
    def test_signin_language(self, oauth_mock):
        """A ``lang`` query parameter is forwarded as ``ui_locales``."""
        # GIVEN
        locale = "fr"
        redirect_stub = self._install_redirect_stub(oauth_mock)

        # WHEN
        self.client.get(reverse("sso:login"), {"lang": locale})

        # THEN: the state is not under test here
        redirect_stub.assert_called_once_with(
            ANY,
            "/sso/callback",
            state=ANY,
            ui_locales=locale,
        )
class TestSignUp(TestCase):
    """Checks how ``sso:signup`` invokes ``oauth.signup.authorize_redirect``."""

    @override_settings(OAUTH_SIGNUP_REDIRECT_URI="/sso/login")
    @patch("vbv_lernwelt.sso.views.oauth")
    def test_signup_with_course_param(self, oauth_mock):
        """``course``/``next`` go into the state; ``lang`` is passed as ``lang``."""
        # GIVEN
        query = {"course": "vv-de", "next": "/some-next-url", "lang": "fr"}
        state_json = json.dumps({"course": query["course"], "next": query["next"]})
        wanted_state = base64.urlsafe_b64encode(state_json.encode()).decode()

        redirect_stub = MagicMock()
        redirect_stub.return_value = redirect("/just/here/to/return/a/redirect/object")
        oauth_mock.signup.authorize_redirect = redirect_stub

        # WHEN
        self.client.get(reverse("sso:signup"), query)

        # THEN
        redirect_stub.assert_called_once_with(
            ANY,
            "/sso/login",
            state=wanted_state,
            lang=query["lang"],
        )