Refactor code
This commit is contained in:
parent
cb9d5de9a6
commit
fbd40de918
|
|
@ -1,10 +1,10 @@
|
|||
from django.contrib import admin, messages
|
||||
from django.contrib.auth import admin as auth_admin, get_user_model
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from keycloak.exceptions import KeycloakDeleteError, KeycloakPostError
|
||||
|
||||
from vbv_lernwelt.course.models import CourseSessionUser
|
||||
from vbv_lernwelt.importer.services import update_user_json_data
|
||||
from vbv_lernwelt.sso.exceptions import MyVbvKeycloakDeleteError, MyVbvKeycloakPostError
|
||||
from vbv_lernwelt.sso.models import SsoSyncError, SsoUser
|
||||
from vbv_lernwelt.sso.role_sync.services import create_user, sync_roles_for_user
|
||||
|
||||
|
|
@ -19,15 +19,11 @@ def create_sso_user_from_admin(user: User, request):
|
|||
messages.add_message(
|
||||
request, messages.SUCCESS, f"Der Bentuzer wurde in Keycloak erstellt."
|
||||
)
|
||||
except MyVbvKeycloakPostError as e:
|
||||
additional_data = getattr(e, "additional_data", {})
|
||||
SsoSyncError.objects.create(
|
||||
user=user, action=SsoSyncError.Action.CREATE, data=additional_data
|
||||
)
|
||||
except KeycloakPostError as e:
|
||||
messages.add_message(
|
||||
request,
|
||||
messages.WARNING,
|
||||
f"Der Benutzer ({e}) konnte nicht in Keycloak erstellt werden.",
|
||||
f"Der Benutzer {user} konnte nicht in Keycloak erstellt werden: {e}",
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -41,25 +37,17 @@ def sync_sso_roles_from_admin(user: User, request):
|
|||
messages.add_message(
|
||||
request, messages.SUCCESS, f"Die Daten wurden mit Keycloak synchronisiert."
|
||||
)
|
||||
except MyVbvKeycloakDeleteError as e:
|
||||
additional_data = getattr(e, "additional_data", {})
|
||||
SsoSyncError.objects.create(
|
||||
user=user, action=SsoSyncError.Action.REMOVE, data=additional_data
|
||||
)
|
||||
except KeycloakDeleteError as e:
|
||||
messages.add_message(
|
||||
request,
|
||||
messages.WARNING,
|
||||
f"Die bestehenden Rollen für Benutzer ({e}) konnten in Keycloak nicht gelöscht werden.",
|
||||
)
|
||||
except MyVbvKeycloakPostError as e:
|
||||
additional_data = getattr(e, "additional_data", {})
|
||||
SsoSyncError.objects.create(
|
||||
user=user, action=SsoSyncError.Action.ADD, data=additional_data
|
||||
f"Die bestehenden Rollen für Benutzer ({user}) konnten in Keycloak nicht gelöscht werden: {e}",
|
||||
)
|
||||
except KeycloakPostError as e:
|
||||
messages.add_message(
|
||||
request,
|
||||
messages.WARNING,
|
||||
f"Die neuen Rollen für Benutzer ({e}) konnten in Keycloak nicht erstellt werden.",
|
||||
f"Die neuen Rollen für Benutzer ({user}) konnten in Keycloak nicht erstellt werden: {e}",
|
||||
)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,23 +0,0 @@
|
|||
from keycloak.exceptions import KeycloakDeleteError, KeycloakPostError
|
||||
|
||||
|
||||
class MyVbvKeycloakDeleteError(KeycloakDeleteError):
|
||||
def __init__(
|
||||
self, keycloak_error: KeycloakDeleteError, additional_data: list | dict
|
||||
):
|
||||
super().__init__(
|
||||
keycloak_error.error_message,
|
||||
keycloak_error.response_code,
|
||||
keycloak_error.response_body,
|
||||
)
|
||||
self.additional_data = additional_data
|
||||
|
||||
|
||||
class MyVbvKeycloakPostError(KeycloakPostError):
|
||||
def __init__(self, keycloak_error: KeycloakPostError, additional_data: list | dict):
|
||||
super().__init__(
|
||||
keycloak_error.error_message,
|
||||
keycloak_error.response_code,
|
||||
keycloak_error.response_body,
|
||||
)
|
||||
self.additional_data = additional_data
|
||||
|
|
@ -4,15 +4,16 @@ from typing import Dict, List, Tuple
|
|||
import structlog
|
||||
from django.conf import settings
|
||||
from keycloak import KeycloakAdmin, KeycloakOpenIDConnection
|
||||
from keycloak.exceptions import KeycloakDeleteError, KeycloakError, KeycloakPostError
|
||||
from keycloak.exceptions import KeycloakDeleteError, KeycloakPostError
|
||||
|
||||
from vbv_lernwelt.core.models import User
|
||||
from vbv_lernwelt.sso.exceptions import MyVbvKeycloakDeleteError, MyVbvKeycloakPostError
|
||||
from vbv_lernwelt.sso.models import SsoSyncError
|
||||
from vbv_lernwelt.sso.role_sync.roles import ROLE_IDS, SSO_ROLES
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
CourseRolesType = List[Tuple[str, str]]
|
||||
KeyCloakRolesType = List[Dict[str, str]]
|
||||
|
||||
if settings.OAUTH_SYNC_ROLES:
|
||||
keycloak_connection = KeycloakOpenIDConnection(
|
||||
|
|
@ -29,43 +30,32 @@ if settings.OAUTH_SYNC_ROLES:
|
|||
|
||||
def add_roles_to_user(user: User, course_roles: CourseRolesType):
|
||||
return _handle_add_remove_action(
|
||||
user=user,
|
||||
course_roles=course_roles,
|
||||
func=keycloak_admin.assign_realm_roles,
|
||||
kc_exception=KeycloakPostError,
|
||||
myvbv_exception=MyVbvKeycloakPostError,
|
||||
user=user, course_roles=course_roles, action=SsoSyncError.Action.ADD
|
||||
)
|
||||
|
||||
|
||||
def remove_roles_from_user(user: User, course_roles: CourseRolesType):
|
||||
return _handle_add_remove_action(
|
||||
user=user,
|
||||
course_roles=course_roles,
|
||||
func=keycloak_admin.delete_realm_roles_of_user,
|
||||
kc_exception=KeycloakDeleteError,
|
||||
myvbv_exception=MyVbvKeycloakDeleteError,
|
||||
user=user, course_roles=course_roles, action=SsoSyncError.Action.REMOVE
|
||||
)
|
||||
|
||||
|
||||
def _handle_add_remove_action(
|
||||
user: User,
|
||||
course_roles: CourseRolesType,
|
||||
func: callable,
|
||||
kc_exception: KeycloakError,
|
||||
myvbv_exception: KeycloakPostError or KeycloakDeleteError,
|
||||
action: SsoSyncError.Action,
|
||||
):
|
||||
user_id = user.additional_json_data.get("intermediate_sso_id", "")
|
||||
if settings.OAUTH_SYNC_ROLES and user_id:
|
||||
request_roles = _get_role_request_data(course_roles)
|
||||
if not request_roles:
|
||||
return False
|
||||
try:
|
||||
func(
|
||||
user_id=user_id,
|
||||
roles=request_roles,
|
||||
)
|
||||
except kc_exception as e:
|
||||
raise myvbv_exception(e, request_roles)
|
||||
|
||||
if action == SsoSyncError.Action.ADD:
|
||||
_kc_assign_realm_roles(user, user_id, request_roles)
|
||||
elif action == SsoSyncError.Action.REMOVE:
|
||||
_kc_delete_realm_roles(user, user_id, request_roles)
|
||||
|
||||
return True
|
||||
return False
|
||||
|
||||
|
|
@ -84,14 +74,14 @@ def sync_roles_for_user(user: User, course_roles: CourseRolesType):
|
|||
if settings.OAUTH_SYNC_ROLES:
|
||||
user_id = user.additional_json_data.get("intermediate_sso_id", "")
|
||||
if user_id:
|
||||
# ignore for the moment, just in the admin
|
||||
assigned_roles = _filter_non_myvbv_roles(
|
||||
keycloak_admin.get_realm_roles_of_user(user_id=user_id)
|
||||
)
|
||||
|
||||
if assigned_roles:
|
||||
keycloak_admin.delete_realm_roles_of_user(
|
||||
user_id=user_id,
|
||||
roles=assigned_roles,
|
||||
)
|
||||
_kc_delete_realm_roles(user, user_id, assigned_roles)
|
||||
|
||||
roles = _get_role_request_data(course_roles)
|
||||
keycloak_admin.assign_realm_roles(user_id=user_id, roles=roles)
|
||||
return True
|
||||
|
|
@ -100,18 +90,7 @@ def sync_roles_for_user(user: User, course_roles: CourseRolesType):
|
|||
|
||||
def create_user(user: User):
|
||||
if settings.OAUTH_SYNC_ROLES:
|
||||
user_data = {
|
||||
"username": user.email,
|
||||
"email": user.email,
|
||||
"enabled": True,
|
||||
"firstName": user.first_name,
|
||||
"lastName": user.last_name,
|
||||
}
|
||||
try:
|
||||
user_id = keycloak_admin.create_user(user_data, exist_ok=True)
|
||||
except KeycloakPostError as e:
|
||||
raise MyVbvKeycloakPostError(e, user_data)
|
||||
return user_id
|
||||
return _kc_create_user(user)
|
||||
return ""
|
||||
|
||||
|
||||
|
|
@ -123,7 +102,43 @@ def get_roles_for_user(user_id: str):
|
|||
return []
|
||||
|
||||
|
||||
def _get_role_request_data(course_roles: CourseRolesType) -> List[Dict[str, str]]:
|
||||
# Keycloak wrappers
|
||||
def _kc_assign_realm_roles(user: User, user_id: str, roles: List[KeyCloakRolesType]):
|
||||
try:
|
||||
keycloak_admin.assign_realm_roles(user_id=user_id, roles=roles)
|
||||
except KeycloakPostError as e:
|
||||
SsoSyncError.objects.create(
|
||||
user=user, action=SsoSyncError.Action.ADD, data=roles
|
||||
)
|
||||
raise e
|
||||
|
||||
|
||||
def _kc_delete_realm_roles(user: User, user_id: str, roles: List[KeyCloakRolesType]):
|
||||
try:
|
||||
keycloak_admin.delete_realm_roles_of_user(user_id=user_id, roles=roles)
|
||||
except KeycloakDeleteError as e:
|
||||
SsoSyncError.objects.create(
|
||||
user=user, action=SsoSyncError.Action.REMOVE, data=roles
|
||||
)
|
||||
raise e
|
||||
|
||||
|
||||
def _kc_create_user(user: User) -> str:
|
||||
user_data = {
|
||||
"username": user.email,
|
||||
"email": user.email,
|
||||
"enabled": True,
|
||||
"firstName": user.first_name,
|
||||
"lastName": user.last_name,
|
||||
}
|
||||
try:
|
||||
return keycloak_admin.create_user(user_data, exist_ok=True)
|
||||
except KeycloakPostError as e:
|
||||
SsoSyncError.objects.create(user=user, action=SsoSyncError.Action.ADD, data={})
|
||||
raise e
|
||||
|
||||
|
||||
def _get_role_request_data(course_roles: CourseRolesType) -> List[KeyCloakRolesType]:
|
||||
request_roles = []
|
||||
for item in course_roles:
|
||||
course_slug, role = item
|
||||
|
|
@ -150,5 +165,5 @@ def _remove_accents(input_str) -> str:
|
|||
return "".join([char for char in nfkd_form if not unicodedata.combining(char)])
|
||||
|
||||
|
||||
def _filter_non_myvbv_roles(roles: List[str]) -> List[str]:
|
||||
def _filter_non_myvbv_roles(roles: List[KeyCloakRolesType]) -> List[KeyCloakRolesType]:
|
||||
return [role for role in roles if role["name"].startswith("myvbv-")]
|
||||
|
|
|
|||
|
|
@ -1,12 +1,11 @@
|
|||
import structlog
|
||||
from django.db.models.signals import m2m_changed, post_delete, pre_save
|
||||
from django.dispatch import receiver
|
||||
from keycloak.exceptions import KeycloakDeleteError, KeycloakError, KeycloakPostError
|
||||
|
||||
from vbv_lernwelt.core.models import User
|
||||
from vbv_lernwelt.course.models import CourseSessionUser
|
||||
from vbv_lernwelt.course_session_group.models import CourseSessionGroup
|
||||
from vbv_lernwelt.sso.exceptions import MyVbvKeycloakDeleteError, MyVbvKeycloakPostError
|
||||
from vbv_lernwelt.sso.models import SsoSyncError
|
||||
from vbv_lernwelt.sso.role_sync.services import (
|
||||
add_roles_to_user,
|
||||
remove_roles_from_user,
|
||||
|
|
@ -38,10 +37,9 @@ def update_sso_roles_in_cs(sender, instance: CourseSessionUser, **kwargs):
|
|||
(instance.course_session.course.slug, old_csu.role)
|
||||
],
|
||||
)
|
||||
except MyVbvKeycloakDeleteError as e:
|
||||
_handle_remove_exception(instance.user, e)
|
||||
except MyVbvKeycloakPostError as e:
|
||||
_handle_add_exception(instance.user, e)
|
||||
except KeycloakError:
|
||||
# fail silently, error object is being created in the service
|
||||
pass
|
||||
|
||||
|
||||
@receiver(
|
||||
|
|
@ -78,8 +76,9 @@ def _remove_sso_role(user: User, course_slug: str, role: str):
|
|||
label="role_sync",
|
||||
)
|
||||
remove_roles_from_user(user, [(course_slug, role)])
|
||||
except MyVbvKeycloakDeleteError as e:
|
||||
_handle_remove_exception(user, e)
|
||||
except KeycloakDeleteError:
|
||||
# fail silently, error object is being created in the service
|
||||
pass
|
||||
|
||||
|
||||
def _add_sso_role(user: User, course_slug: str, role: str):
|
||||
|
|
@ -91,19 +90,6 @@ def _add_sso_role(user: User, course_slug: str, role: str):
|
|||
label="role_sync",
|
||||
)
|
||||
add_roles_to_user(user, [(course_slug, role)])
|
||||
except MyVbvKeycloakPostError as e:
|
||||
_handle_add_exception(user, e)
|
||||
|
||||
|
||||
def _handle_add_exception(user: User, e: MyVbvKeycloakPostError):
|
||||
additional_data = getattr(e, "additional_data", {})
|
||||
SsoSyncError.objects.create(
|
||||
user=user, action=SsoSyncError.Action.ADD, data=additional_data
|
||||
)
|
||||
|
||||
|
||||
def _handle_remove_exception(user: User, e: MyVbvKeycloakDeleteError):
|
||||
additional_data = getattr(e, "additional_data", {})
|
||||
SsoSyncError.objects.create(
|
||||
user=user, action=SsoSyncError.Action.REMOVE, data=additional_data
|
||||
)
|
||||
except KeycloakPostError:
|
||||
# fail silently, error object is being created in the service
|
||||
pass
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ from django.test import override_settings, TestCase
|
|||
from keycloak.exceptions import KeycloakDeleteError, KeycloakPostError
|
||||
|
||||
from vbv_lernwelt.core.models import User
|
||||
from vbv_lernwelt.sso.exceptions import MyVbvKeycloakDeleteError, MyVbvKeycloakPostError
|
||||
from vbv_lernwelt.sso.models import SsoSyncError
|
||||
from vbv_lernwelt.sso.role_sync.services import (
|
||||
_filter_non_myvbv_roles,
|
||||
_remove_accents,
|
||||
|
|
@ -20,6 +20,7 @@ class ApiTestCase(TestCase):
|
|||
def setUp(self):
|
||||
self.user = User(email="test@example.com", first_name="Test", last_name="User")
|
||||
self.user.additional_json_data = {"intermediate_sso_id": "1234"}
|
||||
self.user.save()
|
||||
self.course_roles = [
|
||||
("überbetriebliche-kurse", "EXPERT"),
|
||||
("versicherungsvermittler-in", "MEMBER"),
|
||||
|
|
@ -52,12 +53,14 @@ class ApiTestCase(TestCase):
|
|||
def test_add_roles_to_user_keycloak_post_error(self, mock_keycloak_admin):
|
||||
mock_keycloak_admin.assign_realm_roles.side_effect = KeycloakPostError
|
||||
|
||||
with self.assertRaises(MyVbvKeycloakPostError) as cm:
|
||||
with self.assertRaises(KeycloakPostError) as cm:
|
||||
add_roles_to_user(self.user, self.course_roles)
|
||||
|
||||
exception = cm.exception
|
||||
self.assertIsInstance(exception, MyVbvKeycloakPostError)
|
||||
self.assertEqual(exception.additional_data, self.expected_roles)
|
||||
self.assertIsInstance(exception, KeycloakPostError)
|
||||
error_obj = SsoSyncError.objects.get(user=self.user)
|
||||
self.assertEqual(error_obj.data, self.expected_roles)
|
||||
self.assertEqual(error_obj.action, SsoSyncError.Action.ADD)
|
||||
|
||||
@override_settings(OAUTH_SYNC_ROLES=True)
|
||||
@patch("vbv_lernwelt.sso.role_sync.services.keycloak_admin")
|
||||
|
|
@ -76,12 +79,14 @@ class ApiTestCase(TestCase):
|
|||
def test_remove_roles_to_user_keycloak_delete_error(self, mock_keycloak_admin):
|
||||
mock_keycloak_admin.delete_realm_roles_of_user.side_effect = KeycloakDeleteError
|
||||
|
||||
with self.assertRaises(MyVbvKeycloakDeleteError) as cm:
|
||||
with self.assertRaises(KeycloakDeleteError) as cm:
|
||||
remove_roles_from_user(self.user, self.course_roles)
|
||||
|
||||
exception = cm.exception
|
||||
self.assertIsInstance(exception, MyVbvKeycloakDeleteError)
|
||||
self.assertEqual(exception.additional_data, self.expected_roles)
|
||||
self.assertIsInstance(exception, KeycloakDeleteError)
|
||||
error_obj = SsoSyncError.objects.get(user=self.user)
|
||||
self.assertEqual(error_obj.data, self.expected_roles)
|
||||
self.assertEqual(error_obj.action, SsoSyncError.Action.REMOVE)
|
||||
|
||||
@override_settings(OAUTH_SYNC_ROLES=True)
|
||||
@patch("vbv_lernwelt.sso.role_sync.services.remove_roles_from_user")
|
||||
|
|
|
|||
Loading…
Reference in New Issue