import base64 import json import uuid from unittest.mock import ANY, MagicMock, patch from authlib.integrations.base_client import OAuthError from django.shortcuts import redirect from django.test import override_settings, TestCase from django.urls import reverse from vbv_lernwelt.core.models import User from vbv_lernwelt.course.consts import COURSE_VERSICHERUNGSVERMITTLERIN_ID from vbv_lernwelt.course.creators.test_utils import ( add_course_session_user, create_course, create_course_session, ) from vbv_lernwelt.course.models import CourseSession, CourseSessionUser from vbv_lernwelt.importer.services import create_or_update_user def decoded_token(email, oid=None, given_name="Bobby", family_name="Table"): return { "email": email, "oid": oid or uuid.uuid4(), "given_name": given_name, "family_name": family_name, } class TestSignInAuthorizeSSO(TestCase): def setUp(self): CourseSession.objects.all().delete() User.objects.all().delete() @override_settings(OAUTH={"client_name": "mock"}) @patch("vbv_lernwelt.sso.views.oauth") @patch("vbv_lernwelt.sso.views.decode_jwt") def test_authorize_first_time_uk(self, mock_decode_jwt, mock_oauth): # GIVEN email = "user@example.com" token = decoded_token(email) mock_decode_jwt.return_value = token mock_oauth.signin.authorize_access_token.return_value = { "id_token": "" } state = base64.urlsafe_b64encode( json.dumps({"course": "uk", "next": "/shall-be-ignored"}).encode() ).decode() # WHEN response = self.client.get(reverse("sso:authorize"), {"state": state}) # THEN self.assertEqual(302, response.status_code) self.assertEqual("/onboarding/uk/account/create", response.url) # noqa user = User.objects.get(email=email) # noqa self.assertIsNotNone(user) self.assertEqual(user.email, email) self.assertEqual(user.first_name, token["given_name"]) self.assertEqual(user.last_name, token["family_name"]) self.assertEqual(user.sso_id, token["oid"]) @override_settings(OAUTH={"client_name": "mock"}) @patch("vbv_lernwelt.sso.views.oauth") @patch("vbv_lernwelt.sso.views.decode_jwt") def test_authorize_first_time_vv(self, mock_decode_jwt, mock_oauth): # GIVEN email = "user@example.com" token = decoded_token(email) mock_decode_jwt.return_value = token mock_oauth.signin.authorize_access_token.return_value = { "id_token": "" } state = base64.urlsafe_b64encode( json.dumps({"course": "vv-de", "next": "/shall-be-ignored"}).encode() ).decode() # WHEN response = self.client.get(reverse("sso:authorize"), {"state": state}) # THEN self.assertEqual(302, response.status_code) self.assertEqual("/onboarding/vv-de/account/create", response.url) # noqa user = User.objects.get(email=email) # noqa self.assertIsNotNone(user) self.assertEqual(user.email, email) self.assertEqual(user.first_name, token["given_name"]) self.assertEqual(user.last_name, token["family_name"]) self.assertEqual(user.sso_id, token["oid"]) @patch("vbv_lernwelt.sso.views.oauth") def test_authorize_on_tampered_token(self, mock_oauth_service): # GIVEN mock_oauth_service.signin.authorize_access_token.side_effect = OAuthError() # WHEN response = self.client.get(reverse("sso:authorize")) # THEN self.assertEqual(302, response.status_code) self.assertEqual("/", response.url) # noqa @override_settings(OAUTH={"client_name": "mock"}) @patch("vbv_lernwelt.sso.views.oauth") @patch("vbv_lernwelt.sso.views.decode_jwt") def test_authorize_onboarded_uk(self, mock_decode_jwt, mock_oauth): # GIVEN email = "some@email.com" token = decoded_token(email) mock_decode_jwt.return_value = token mock_oauth.signin.authorize_access_token.return_value = { "id_token": "" } # create a user that is already onboarded for UK user = create_or_update_user( email=email, sso_id=str(token["oid"]), first_name=token["given_name"], last_name=token["family_name"], ) course, _ = create_course("uk") course_session = create_course_session(course, "UK", "2023") add_course_session_user(course_session, user, CourseSessionUser.Role.MEMBER) self.assertIsNotNone(User.objects.get(email=email)) # WHEN state = base64.urlsafe_b64encode(json.dumps({"course": "uk"}).encode()).decode() response = self.client.get(reverse("sso:authorize"), {"state": state}) # THEN self.assertEqual(302, response.status_code) self.assertEqual("/", response.url) # noqa @override_settings(OAUTH={"client_name": "mock"}) @patch("vbv_lernwelt.sso.views.oauth") @patch("vbv_lernwelt.sso.views.decode_jwt") def test_authorize_onboarded_vv(self, mock_decode_jwt, mock_oauth): # GIVEN email = "some@email.com" token = decoded_token(email) mock_decode_jwt.return_value = token mock_oauth.signin.authorize_access_token.return_value = { "id_token": "" } # create a user that is already onboarded for UK user = create_or_update_user( email=email, sso_id=str(token["oid"]), first_name=token["given_name"], last_name=token["family_name"], ) course, _ = create_course(_id=COURSE_VERSICHERUNGSVERMITTLERIN_ID, title="VV") course_session = create_course_session(course, "VV", "VV") add_course_session_user(course_session, user, CourseSessionUser.Role.MEMBER) self.assertIsNotNone(User.objects.get(email=email)) # WHEN state = base64.urlsafe_b64encode(json.dumps({"course": "vv"}).encode()).decode() response = self.client.get(reverse("sso:authorize"), {"state": state}) # THEN self.assertEqual(302, response.status_code) self.assertEqual("/", response.url) # noqa @override_settings(OAUTH={"client_name": "mock"}) @patch("vbv_lernwelt.sso.views.oauth") @patch("vbv_lernwelt.sso.views.decode_jwt") def test_authorize_next_url(self, mock_decode_jwt, mock_oauth): # GIVEN next_url = "/some/next/url" mock_decode_jwt.return_value = decoded_token("whatever@example.com") mock_oauth.signin.authorize_access_token.return_value = { "id_token": "" } # WHEN state = base64.urlsafe_b64encode( json.dumps({"next": next_url}).encode() ).decode() response = self.client.get(reverse("sso:authorize"), {"state": state}) # THEN self.assertEqual(302, response.status_code) self.assertEqual(next_url, response.url) # noqa class TestSignIn(TestCase): @override_settings(OAUTH_SIGNIN_REDIRECT_URI="/sso/callback") @patch("vbv_lernwelt.sso.views.oauth") def test_signin_with_course_param(self, mock_oauth): # GIVEN course_param = "vv-de" expected_state = {"course": course_param, "next": None} expected_state_encoded = base64.urlsafe_b64encode( json.dumps(expected_state).encode() ).decode() mock_oauth.signin.authorize_redirect = MagicMock() mock_oauth.signin.authorize_redirect.return_value = redirect( "/just/here/to/return/a/redirect/object" ) # WHEN self.client.get( reverse("sso:login"), {"course": course_param}, ) # THEN mock_oauth.signin.authorize_redirect.assert_called_once_with( ANY, "/sso/callback", state=expected_state_encoded, ui_locales="de", ) @override_settings(OAUTH_SIGNIN_REDIRECT_URI="/sso/callback") @patch("vbv_lernwelt.sso.views.oauth") def test_signin_with_state_param(self, mock_oauth): # GIVEN course = "vv-de" state_param = base64.urlsafe_b64encode( json.dumps({"course": course, "next": None}).encode() ).decode() expected_state = {"course": course, "next": None} expected_state_encoded = base64.urlsafe_b64encode( json.dumps(expected_state).encode() ).decode() mock_oauth.signin.authorize_redirect = MagicMock() mock_oauth.signin.authorize_redirect.return_value = redirect( "/just/here/to/return/a/redirect/object" ) # WHEN self.client.get( reverse("sso:login"), {"state": state_param}, ) # THEN mock_oauth.signin.authorize_redirect.assert_called_once_with( ANY, "/sso/callback", state=expected_state_encoded, ui_locales="de", ) @override_settings(OAUTH_SIGNIN_REDIRECT_URI="/sso/callback") @patch("vbv_lernwelt.sso.views.oauth") def test_signin_next_url(self, mock_oauth): # GIVEN next_url = "/some/next/url" expected_state = {"course": None, "next": next_url} expected_state_encoded = base64.urlsafe_b64encode( json.dumps(expected_state).encode() ).decode() mock_oauth.signin.authorize_redirect = MagicMock() mock_oauth.signin.authorize_redirect.return_value = redirect( "/just/here/to/return/a/redirect/object" ) # WHEN self.client.get( reverse("sso:login"), {"next": next_url}, ) # THEN mock_oauth.signin.authorize_redirect.assert_called_once_with( ANY, "/sso/callback", state=expected_state_encoded, ui_locales=ANY, ) @override_settings(OAUTH_SIGNIN_REDIRECT_URI="/sso/callback") @patch("vbv_lernwelt.sso.views.oauth") def test_signin_language(self, mock_oauth): # GIVEN language = "fr" mock_oauth.signin.authorize_redirect = MagicMock() mock_oauth.signin.authorize_redirect.return_value = redirect( "/just/here/to/return/a/redirect/object" ) # WHEN self.client.get( reverse("sso:login"), {"lang": language}, ) # THEN mock_oauth.signin.authorize_redirect.assert_called_once_with( ANY, "/sso/callback", state=ANY, ui_locales=language, ) class TestSignUp(TestCase): @override_settings(OAUTH_SIGNUP_REDIRECT_URI="/sso/login") @patch("vbv_lernwelt.sso.views.oauth") def test_signup_with_course_param(self, mock_oauth): # GIVEN course_param = "vv-de" next_param = "/some-next-url" language = "fr" expected_state = {"course": course_param, "next": next_param} expected_state_encoded = base64.urlsafe_b64encode( json.dumps(expected_state).encode() ).decode() mock_oauth.signup.authorize_redirect = MagicMock() mock_oauth.signup.authorize_redirect.return_value = redirect( "/just/here/to/return/a/redirect/object" ) # WHEN self.client.get( reverse("sso:signup"), {"course": course_param, "next": next_param, "lang": language}, ) # THEN mock_oauth.signup.authorize_redirect.assert_called_once_with( ANY, "/sso/login", state=expected_state_encoded, lang=language, )