"""Signal handlers that mirror course role assignments to Keycloak (SSO) roles."""

import structlog
from django.db.models.signals import m2m_changed, post_delete, pre_delete, pre_save
from django.dispatch import receiver
from keycloak.exceptions import KeycloakDeleteError, KeycloakError, KeycloakPostError

from vbv_lernwelt.core.models import User
from vbv_lernwelt.course.models import CourseSessionUser
from vbv_lernwelt.course_session_group.models import CourseSessionGroup
from vbv_lernwelt.sso.role_sync.services import (
    add_roles_to_user,
    remove_roles_from_user,
    update_roles_for_user,
)

logger = structlog.get_logger(__name__)

# Role name synced for every supervisor of a CourseSessionGroup.
_SUPERVISOR_ROLE = "SUPERVISOR"


# CourseSessionUser


@receiver(post_delete, sender=CourseSessionUser, dispatch_uid="delete_sso_roles_in_cs")
def remove_sso_roles_in_cs(sender, instance, **kwargs):
    """Revoke the SSO role of a deleted ``CourseSessionUser``."""
    _remove_sso_role(instance.user, instance.course_session.course.slug, instance.role)


@receiver(pre_save, sender=CourseSessionUser, dispatch_uid="update_sso_roles_in_cs")
def update_sso_roles_in_cs(sender, instance: CourseSessionUser, **kwargs):
    """Grant or swap the SSO role when a ``CourseSessionUser`` is saved.

    An instance without ``created_at`` is treated as a new assignment and its
    role is granted. Otherwise the stored row is loaded and, only if the role
    differs, the old role is exchanged for the new one.
    """
    if not instance.created_at:
        _add_sso_role(instance.user, instance.course_session.course.slug, instance.role)
        return

    old_csu = CourseSessionUser.objects.get(pk=instance.pk)
    if old_csu.role == instance.role:
        return

    course_slug = instance.course_session.course.slug
    try:
        update_roles_for_user(
            instance.user,
            add_course_roles=[(course_slug, instance.role)],
            remove_course_roles=[(course_slug, old_csu.role)],
        )
    except KeycloakError:
        # fail silently, error object is being created in the service
        pass


# CourseSessionGroup


# NOTE: this must run on pre_delete. Django removes the rows of the
# auto-created m2m through table before post_delete is sent, so at that point
# ``instance.supervisor.all()`` is already empty and no role would be revoked.
@receiver(
    pre_delete, sender=CourseSessionGroup, dispatch_uid="delete_sso_roles_in_csg"
)
def remove_sso_roles_in_csg(sender, instance: CourseSessionGroup, **kwargs):
    """Revoke the supervisor SSO role from every supervisor of a deleted group."""
    for user in instance.supervisor.all():
        _remove_sso_role(user, instance.course.slug, _SUPERVISOR_ROLE)


@receiver(
    m2m_changed,
    sender=CourseSessionGroup.supervisor.through,
    dispatch_uid="update_sso_roles_in_csg",
)
def update_sso_roles_in_csg(sender, instance, action, reverse, model, pk_set, **kwargs):
    """Keep supervisor SSO roles in sync with ``CourseSessionGroup.supervisor``.

    Handles changes made from either side of the relation:

    * forward (``group.supervisor.add(user)``): ``instance`` is the group and
      ``model`` is ``User``;
    * reverse (changed through the user's related manager): ``instance`` is
      the user and ``model`` is ``CourseSessionGroup``.

    Roles are granted on ``pre_add`` and revoked on ``pre_remove`` and
    ``pre_clear``. All other actions are ignored.
    """
    if action == "pre_add":
        sync_role = _add_sso_role
    elif action in ("pre_remove", "pre_clear"):
        sync_role = _remove_sso_role
    else:
        return

    for user, group in _affected_supervisor_pairs(
        instance, action, reverse, model, pk_set
    ):
        sync_role(user, group.course.slug, _SUPERVISOR_ROLE)


def _affected_supervisor_pairs(instance, action, reverse, model, pk_set):
    """Return the ``(user, group)`` pairs touched by an ``m2m_changed`` signal."""
    if action == "pre_clear":
        # Django sends pk_set=None for clear actions; the relation is still
        # intact at pre_clear time, so read the current members instead.
        if reverse:
            related = model.objects.filter(supervisor=instance)
        else:
            related = instance.supervisor.all()
    else:
        related = model.objects.filter(pk__in=pk_set)

    if reverse:
        # instance is the user, related objects are the groups
        return [(instance, group) for group in related]
    # instance is the group, related objects are the users
    return [(user, instance) for user in related]


def _remove_sso_role(user: User, course_slug: str, role: str):
    """Revoke ``role`` for ``course_slug`` from ``user``; delete errors are swallowed."""
    logger.debug(
        "Removing SSO role from user",
        user=user,
        course=course_slug,
        role=role,
        label="role_sync",
    )
    try:
        remove_roles_from_user(user, [(course_slug, role)])
    except KeycloakDeleteError:
        # fail silently, error object is being created in the service
        pass


def _add_sso_role(user: User, course_slug: str, role: str):
    """Grant ``role`` for ``course_slug`` to ``user``; post errors are swallowed."""
    logger.debug(
        "Adding SSO role to user",
        user=user,
        course=course_slug,
        role=role,
        label="role_sync",
    )
    try:
        add_roles_to_user(user, [(course_slug, role)])
    except KeycloakPostError:
        # fail silently, error object is being created in the service
        pass