Add services test
This commit is contained in:
parent
ade89c3c5b
commit
e436c5ddbd
|
|
@ -53,7 +53,7 @@ class CreateOrUpdateStudentTestCase(TestCase):
|
|||
"Tel. Privat": "079 593 83 43",
|
||||
"Geburtsdatum": "01.01.2000",
|
||||
"email_notification_categories": ["INFORMATION"],
|
||||
'intermediate_sso_id': ''
|
||||
"intermediate_sso_id": "",
|
||||
}
|
||||
|
||||
def test_create_student(self):
|
||||
|
|
|
|||
|
|
@ -1,14 +1,17 @@
|
|||
import unicodedata
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
import structlog
|
||||
from django.conf import settings
|
||||
from keycloak import KeycloakAdmin, KeycloakOpenIDConnection
|
||||
from keycloak.exceptions import KeycloakDeleteError, KeycloakPostError
|
||||
from keycloak.exceptions import KeycloakDeleteError, KeycloakError, KeycloakPostError
|
||||
|
||||
from vbv_lernwelt.core.models import User
|
||||
from vbv_lernwelt.sso.exceptions import MyVbvKeycloakDeleteError, MyVbvKeycloakPostError
|
||||
from vbv_lernwelt.sso.role_sync.roles import ROLE_IDS, SSO_ROLES
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
CourseRolesType = List[Tuple[str, str]]
|
||||
|
||||
if settings.OAUTH_SYNC_ROLES:
|
||||
|
|
@ -25,31 +28,44 @@ if settings.OAUTH_SYNC_ROLES:
|
|||
|
||||
|
||||
def add_roles_to_user(user: User, course_roles: CourseRolesType):
|
||||
user_id = user.additional_json_data.get("intermediate_sso_id", "")
|
||||
if settings.OAUTH_SYNC_ROLES and user_id:
|
||||
request_roles = _get_role_request_data(course_roles)
|
||||
try:
|
||||
keycloak_admin.assign_realm_roles(
|
||||
user_id=user_id,
|
||||
roles=request_roles,
|
||||
)
|
||||
except KeycloakPostError as e:
|
||||
raise MyVbvKeycloakPostError(e, request_roles)
|
||||
return True
|
||||
return False
|
||||
return _handle_add_remove_action(
|
||||
user=user,
|
||||
course_roles=course_roles,
|
||||
func=keycloak_admin.assign_realm_roles,
|
||||
kc_exception=KeycloakPostError,
|
||||
myvbv_exception=MyVbvKeycloakPostError,
|
||||
)
|
||||
|
||||
|
||||
def remove_roles_from_user(user: User, course_roles: CourseRolesType):
|
||||
return _handle_add_remove_action(
|
||||
user=user,
|
||||
course_roles=course_roles,
|
||||
func=keycloak_admin.delete_realm_roles_of_user,
|
||||
kc_exception=KeycloakDeleteError,
|
||||
myvbv_exception=MyVbvKeycloakDeleteError,
|
||||
)
|
||||
|
||||
|
||||
def _handle_add_remove_action(
|
||||
user: User,
|
||||
course_roles: CourseRolesType,
|
||||
func: callable,
|
||||
kc_exception: KeycloakError,
|
||||
myvbv_exception: KeycloakPostError or KeycloakDeleteError,
|
||||
):
|
||||
user_id = user.additional_json_data.get("intermediate_sso_id", "")
|
||||
if settings.OAUTH_SYNC_ROLES and user_id:
|
||||
request_roles = _get_role_request_data(course_roles)
|
||||
if not request_roles:
|
||||
return False
|
||||
try:
|
||||
keycloak_admin.delete_realm_roles_of_user(
|
||||
func(
|
||||
user_id=user_id,
|
||||
roles=request_roles,
|
||||
)
|
||||
except KeycloakDeleteError as e:
|
||||
raise MyVbvKeycloakDeleteError(e, request_roles)
|
||||
except kc_exception as e:
|
||||
raise myvbv_exception(e, request_roles)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
|
@ -58,9 +74,9 @@ def update_roles_for_user(
|
|||
user: User, add_course_roles: CourseRolesType, remove_course_roles: CourseRolesType
|
||||
):
|
||||
if settings.OAUTH_SYNC_ROLES:
|
||||
remove_roles_from_user(user, remove_course_roles)
|
||||
add_roles_to_user(user, add_course_roles)
|
||||
return True
|
||||
remove_ret_value = remove_roles_from_user(user, remove_course_roles)
|
||||
add_ret_value = add_roles_to_user(user, add_course_roles)
|
||||
return remove_ret_value and add_ret_value
|
||||
return False
|
||||
|
||||
|
||||
|
|
@ -110,8 +126,15 @@ def _get_role_request_data(course_roles: CourseRolesType) -> List[Dict[str, str]
|
|||
for item in course_roles:
|
||||
course_slug, role = item
|
||||
sanitized_course_slug = _remove_accents(course_slug)
|
||||
oauth_role = _create_role_name(sanitized_course_slug, role)
|
||||
request_roles.append({"id": ROLE_IDS[oauth_role], "name": oauth_role})
|
||||
try:
|
||||
oauth_role = _create_role_name(sanitized_course_slug, role)
|
||||
request_roles.append({"id": ROLE_IDS[oauth_role], "name": oauth_role})
|
||||
except KeyError:
|
||||
logger.warning(
|
||||
"Role or course not found in SSO_ROLES",
|
||||
course_slug=course_slug,
|
||||
role=role,
|
||||
)
|
||||
return request_roles
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ from vbv_lernwelt.sso.role_sync.services import (
|
|||
|
||||
|
||||
@receiver(post_delete, sender=CourseSessionUser, dispatch_uid="delete_sso_roles")
|
||||
def delete_sso_roles(sender, instance, **kwargs):
|
||||
def remove_sso_roles(sender, instance, **kwargs):
|
||||
try:
|
||||
remove_roles_from_user(
|
||||
instance.user, [(instance.course_session.course.slug, instance.role)]
|
||||
|
|
|
|||
|
|
@ -0,0 +1,157 @@
|
|||
from unittest.mock import patch
|
||||
|
||||
from django.test import override_settings, TestCase
|
||||
from keycloak.exceptions import KeycloakDeleteError, KeycloakPostError
|
||||
|
||||
from vbv_lernwelt.core.models import User
|
||||
from vbv_lernwelt.sso.exceptions import MyVbvKeycloakDeleteError, MyVbvKeycloakPostError
|
||||
from vbv_lernwelt.sso.role_sync.services import (
|
||||
_remove_accents,
|
||||
add_roles_to_user,
|
||||
create_user,
|
||||
remove_roles_from_user,
|
||||
sync_roles_for_user,
|
||||
update_roles_for_user,
|
||||
)
|
||||
|
||||
|
||||
class ApiTestCase(TestCase):
|
||||
def setUp(self):
|
||||
self.user = User(email="test@example.com", first_name="Test", last_name="User")
|
||||
self.user.additional_json_data = {"intermediate_sso_id": "1234"}
|
||||
self.course_roles = [
|
||||
("überbetriebliche-kurse", "EXPERT"),
|
||||
("versicherungsvermittler-in", "MEMBER"),
|
||||
]
|
||||
self.expected_roles = [
|
||||
{
|
||||
"name": "myvbv-uberbetriebliche-kurse-expert",
|
||||
"id": "c7e33cb6-d227-4764-9b8e-d42af79fb46d",
|
||||
},
|
||||
{
|
||||
"name": "myvbv-versicherungsvermittler-in-member",
|
||||
"id": "3ab4eab2-7d7c-43bb-a927-4cf54f24ccc2",
|
||||
},
|
||||
]
|
||||
|
||||
@override_settings(OAUTH_SYNC_ROLES=True)
|
||||
@patch("vbv_lernwelt.sso.role_sync.services.keycloak_admin")
|
||||
def test_add_roles_to_user_success(self, mock_keycloak_admin):
|
||||
mock_keycloak_admin.assign_realm_roles.return_value = None
|
||||
|
||||
result = add_roles_to_user(self.user, self.course_roles)
|
||||
|
||||
self.assertTrue(result)
|
||||
mock_keycloak_admin.assign_realm_roles.assert_called_once_with(
|
||||
user_id="1234", roles=self.expected_roles
|
||||
)
|
||||
|
||||
@override_settings(OAUTH_SYNC_ROLES=True)
|
||||
@patch("vbv_lernwelt.sso.role_sync.services.keycloak_admin")
|
||||
def test_add_roles_to_user_keycloak_post_error(self, mock_keycloak_admin):
|
||||
mock_keycloak_admin.assign_realm_roles.side_effect = KeycloakPostError
|
||||
|
||||
with self.assertRaises(MyVbvKeycloakPostError) as cm:
|
||||
add_roles_to_user(self.user, self.course_roles)
|
||||
|
||||
exception = cm.exception
|
||||
self.assertIsInstance(exception, MyVbvKeycloakPostError)
|
||||
self.assertEqual(exception.additional_data, self.expected_roles)
|
||||
|
||||
@override_settings(OAUTH_SYNC_ROLES=True)
|
||||
@patch("vbv_lernwelt.sso.role_sync.services.keycloak_admin")
|
||||
def test_remove_roles_to_user_success(self, mock_keycloak_admin):
|
||||
mock_keycloak_admin.delete_realm_roles_of_user.return_value = None
|
||||
|
||||
result = remove_roles_from_user(self.user, self.course_roles)
|
||||
|
||||
self.assertTrue(result)
|
||||
mock_keycloak_admin.delete_realm_roles_of_user.assert_called_once_with(
|
||||
user_id="1234", roles=self.expected_roles
|
||||
)
|
||||
|
||||
@override_settings(OAUTH_SYNC_ROLES=True)
|
||||
@patch("vbv_lernwelt.sso.role_sync.services.keycloak_admin")
|
||||
def test_remove_roles_to_user_keycloak_delete_error(self, mock_keycloak_admin):
|
||||
mock_keycloak_admin.delete_realm_roles_of_user.side_effect = KeycloakDeleteError
|
||||
|
||||
with self.assertRaises(MyVbvKeycloakDeleteError) as cm:
|
||||
remove_roles_from_user(self.user, self.course_roles)
|
||||
|
||||
exception = cm.exception
|
||||
self.assertIsInstance(exception, MyVbvKeycloakDeleteError)
|
||||
self.assertEqual(exception.additional_data, self.expected_roles)
|
||||
|
||||
@override_settings(OAUTH_SYNC_ROLES=True)
|
||||
@patch("vbv_lernwelt.sso.role_sync.services.remove_roles_from_user")
|
||||
@patch("vbv_lernwelt.sso.role_sync.services.add_roles_to_user")
|
||||
def test_update_roles_to_user(
|
||||
self, mock_add_roles_to_user, mock_remove_roles_from_user
|
||||
):
|
||||
mock_add_roles_to_user.return_value = True
|
||||
mock_remove_roles_from_user.return_value = True
|
||||
|
||||
update_roles_for_user(self.user, self.course_roles, self.course_roles)
|
||||
mock_add_roles_to_user.assert_called_once()
|
||||
mock_remove_roles_from_user.assert_called_once()
|
||||
|
||||
@override_settings(OAUTH_SYNC_ROLES=True)
|
||||
@patch("vbv_lernwelt.sso.role_sync.services.keycloak_admin")
|
||||
def test_sync_roles_to_user(self, mock_keycloak_admin):
|
||||
mock_keycloak_admin.get_realm_roles_of_user.return_value = (
|
||||
self.expected_roles
|
||||
) # just use them here as well
|
||||
mock_keycloak_admin.delete_realm_roles_of_user.return_value = True
|
||||
mock_keycloak_admin.assign_realm_roles.return_value = None
|
||||
|
||||
sync_roles_for_user(self.user, self.course_roles)
|
||||
mock_keycloak_admin.get_realm_roles_of_user.assert_called_once_with(
|
||||
user_id="1234"
|
||||
)
|
||||
mock_keycloak_admin.delete_realm_roles_of_user.assert_called_once_with(
|
||||
user_id="1234", roles=self.expected_roles
|
||||
)
|
||||
mock_keycloak_admin.assign_realm_roles.assert_called_once_with(
|
||||
user_id="1234", roles=self.expected_roles
|
||||
)
|
||||
|
||||
@override_settings(OAUTH_SYNC_ROLES=True)
|
||||
@patch("vbv_lernwelt.sso.role_sync.services.keycloak_admin")
|
||||
def test_create_user(self, mock_keycloak_admin):
|
||||
mock_keycloak_admin.create_user.return_value = "im-an-uuid-1234"
|
||||
|
||||
user_data = {
|
||||
"username": self.user.email,
|
||||
"email": self.user.email,
|
||||
"enabled": True,
|
||||
"firstName": self.user.first_name,
|
||||
"lastName": self.user.last_name,
|
||||
}
|
||||
|
||||
create_user(self.user)
|
||||
mock_keycloak_admin.create_user.assert_called_once_with(
|
||||
user_data, exist_ok=True
|
||||
)
|
||||
|
||||
@override_settings(OAUTH_SYNC_ROLES=True)
|
||||
@patch("vbv_lernwelt.sso.role_sync.services.keycloak_admin")
|
||||
def test_ignore_missing_course(self, mock_keycloak_admin):
|
||||
mock_keycloak_admin.assign_realm_roles.return_value = None
|
||||
|
||||
course_roles = [
|
||||
("blabla-kurse", "EXPERT"),
|
||||
]
|
||||
result = add_roles_to_user(self.user, course_roles)
|
||||
|
||||
self.assertFalse(result)
|
||||
mock_keycloak_admin.assign_realm_roles.assert_not_called()
|
||||
|
||||
|
||||
class HelpersTestCase(TestCase):
|
||||
def test_remove_accents(self):
|
||||
no_accents = _remove_accents("äüöéèà")
|
||||
self.assertEqual(no_accents, "auoeea")
|
||||
|
||||
|
||||
# test helpers
|
||||
# test wrong key
|
||||
Loading…
Reference in New Issue