Handle creation and removal of course session group objects

This commit is contained in:
Christian Cueni 2024-06-24 14:47:49 +02:00
parent 6f71fc2fd7
commit cb9d5de9a6
3 changed files with 92 additions and 30 deletions

View File

@ -4,3 +4,10 @@ from django.apps import AppConfig
class SsoConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "vbv_lernwelt.sso"
def ready(self):
try:
# pylint: disable=unused-import,import-outside-toplevel
import vbv_lernwelt.sso.signals # noqa F401
except ImportError:
pass

View File

@ -136,6 +136,7 @@ def _get_role_request_data(course_roles: CourseRolesType) -> List[Dict[str, str]
"Role or course not found in SSO_ROLES",
course_slug=course_slug,
role=role,
label="role_sync",
)
return request_roles

View File

@ -1,7 +1,10 @@
from django.db.models.signals import post_delete, pre_save
import structlog
from django.db.models.signals import m2m_changed, post_delete, pre_save
from django.dispatch import receiver
from vbv_lernwelt.core.models import User
from vbv_lernwelt.course.models import CourseSessionUser
from vbv_lernwelt.course_session_group.models import CourseSessionGroup
from vbv_lernwelt.sso.exceptions import MyVbvKeycloakDeleteError, MyVbvKeycloakPostError
from vbv_lernwelt.sso.models import SsoSyncError
from vbv_lernwelt.sso.role_sync.services import (
@ -10,30 +13,22 @@ from vbv_lernwelt.sso.role_sync.services import (
update_roles_for_user,
)
@receiver(post_delete, sender=CourseSessionUser, dispatch_uid="delete_sso_roles")
def remove_sso_roles(sender, instance, **kwargs):
try:
remove_roles_from_user(
instance.user, [(instance.course_session.course.slug, instance.role)]
)
except MyVbvKeycloakDeleteError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=instance.user, action=SsoSyncError.Action.REMOVE, data=additional_data
)
logger = structlog.get_logger(__name__)
@receiver(pre_save, sender=CourseSessionUser, dispatch_uid="update_sso_roles")
def update_sso_roles(sender, instance: CourseSessionUser, **kwargs):
try:
if instance.created_at is None:
add_roles_to_user(
instance.user, [(instance.course_session.course.slug, instance.role)]
)
else:
old_csu = CourseSessionUser.objects.get(pk=instance.pk)
if old_csu.role != instance.role:
@receiver(post_delete, sender=CourseSessionUser, dispatch_uid="delete_sso_roles_in_cs")
def remove_sso_roles_in_cs(sender, instance, **kwargs):
_remove_sso_role(instance.user, instance.course_session.course.slug, instance.role)
@receiver(pre_save, sender=CourseSessionUser, dispatch_uid="update_sso_roles_in_cs")
def update_sso_roles_in_cs(sender, instance: CourseSessionUser, **kwargs):
if not instance.created_at:
_add_sso_role(instance.user, instance.course_session.course.slug, instance.role)
else:
old_csu = CourseSessionUser.objects.get(pk=instance.pk)
if old_csu.role != instance.role:
try:
update_roles_for_user(
instance.user,
add_course_roles=[
@ -43,13 +38,72 @@ def update_sso_roles(sender, instance: CourseSessionUser, **kwargs):
(instance.course_session.course.slug, old_csu.role)
],
)
except MyVbvKeycloakDeleteError as e:
_handle_remove_exception(instance.user, e)
except MyVbvKeycloakPostError as e:
_handle_add_exception(instance.user, e)
@receiver(
post_delete, sender=CourseSessionGroup, dispatch_uid="delete_sso_roles_in_csg"
)
def remove_sso_roles_in_csg(sender, instance: CourseSessionGroup, **kwargs):
for user in instance.supervisor.all():
_remove_sso_role(user, instance.course.slug, "SUPERVISOR")
@receiver(
m2m_changed,
sender=CourseSessionGroup.supervisor.through,
dispatch_uid="update_sso_roles_in_csg",
)
def update_sso_roles_in_csg(sender, instance, action, reverse, model, pk_set, **kwargs):
if action == "pre_add":
added_supervisors = model.objects.filter(pk__in=pk_set)
for user in added_supervisors:
_add_sso_role(user, instance.course.slug, "SUPERVISOR")
elif action == "pre_remove":
removed_supervisors = model.objects.filter(pk__in=pk_set)
for user in removed_supervisors:
_remove_sso_role(user, instance.course.slug, "SUPERVISOR")
def _remove_sso_role(user: User, course_slug: str, role: str):
try:
logger.debug(
"Removing SUPERVISOR role from user",
user=user,
course=course_slug,
label="role_sync",
)
remove_roles_from_user(user, [(course_slug, role)])
except MyVbvKeycloakDeleteError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=instance.user, action=SsoSyncError.Action.REMOVE, data=additional_data
_handle_remove_exception(user, e)
def _add_sso_role(user: User, course_slug: str, role: str):
try:
logger.debug(
"Adding SUPERVISOR role to user",
user=user,
course=course_slug,
label="role_sync",
)
add_roles_to_user(user, [(course_slug, role)])
except MyVbvKeycloakPostError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=instance.user, action=SsoSyncError.Action.ADD, data=additional_data
)
_handle_add_exception(user, e)
def _handle_add_exception(user: User, e: MyVbvKeycloakPostError):
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=user, action=SsoSyncError.Action.ADD, data=additional_data
)
def _handle_remove_exception(user: User, e: MyVbvKeycloakDeleteError):
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=user, action=SsoSyncError.Action.REMOVE, data=additional_data
)