import structlog from django.db.models.signals import m2m_changed, post_delete, pre_delete, pre_save from django.dispatch import receiver from keycloak.exceptions import KeycloakDeleteError, KeycloakError, KeycloakPostError from vbv_lernwelt.core.models import User from vbv_lernwelt.course.models import CourseSessionUser from vbv_lernwelt.course_session_group.models import CourseSessionGroup from vbv_lernwelt.learning_mentor.models import LearningMentor from vbv_lernwelt.sso.role_sync.services import ( add_roles_to_user, remove_roles_from_user, update_roles_for_user, ) logger = structlog.get_logger(__name__) # CourseSessionUser @receiver(post_delete, sender=CourseSessionUser, dispatch_uid="delete_sso_roles_in_cs") def remove_sso_roles_in_cs(sender, instance, **kwargs): # check if the user has any other roles in the course if not CourseSessionUser.objects.filter( user=instance.user, course_session__course=instance.course_session.course ).exists(): _remove_sso_role( instance.user, instance.course_session.course.slug, instance.role ) @receiver(pre_save, sender=CourseSessionUser, dispatch_uid="update_sso_roles_in_cs") def update_sso_roles_in_cs(sender, instance: CourseSessionUser, **kwargs): if not instance.created_at: _add_sso_role(instance.user, instance.course_session.course.slug, instance.role) else: old_csu = CourseSessionUser.objects.get(pk=instance.pk) if ( old_csu.role != instance.role or old_csu.course_session.course != instance.course_session.course ): try: update_roles_for_user( instance.user, add_course_roles=[ (instance.course_session.course.slug, instance.role) ], remove_course_roles=[ (old_csu.course_session.course.slug, old_csu.role) ], ) except KeycloakError: # fail silently, error object is being created in the service pass # CourseSessionGroup @receiver(pre_delete, sender=CourseSessionGroup, dispatch_uid="delete_sso_roles_in_csg") def remove_sso_roles_in_csg(sender, instance: CourseSessionGroup, **kwargs): for user in instance.supervisor.all(): _remove_sso_role(user, instance.course.slug, "SUPERVISOR") @receiver( m2m_changed, sender=CourseSessionGroup.supervisor.through, dispatch_uid="update_sso_roles_in_csg", ) def update_sso_roles_in_csg(sender, instance, action, reverse, model, pk_set, **kwargs): if action == "pre_add": added_supervisors = model.objects.filter(pk__in=pk_set) for user in added_supervisors: _add_sso_role(user, instance.course.slug, "SUPERVISOR") elif action == "pre_remove": removed_supervisors = model.objects.filter(pk__in=pk_set) for user in removed_supervisors: _remove_sso_role(user, instance.course.slug, "SUPERVISOR") # LearningMentor @receiver(post_delete, sender=LearningMentor, dispatch_uid="delete_sso_roles_in_lm") def remove_sso_roles_in_lm(sender, instance: LearningMentor, **kwargs): if not LearningMentor.objects.filter( mentor=instance.mentor, course_session__course=instance.course_session.course ).exists(): _remove_sso_role( instance.mentor, instance.course_session.course.slug, "LEARNING_MENTOR" ) @receiver(pre_save, sender=LearningMentor, dispatch_uid="update_sso_roles_in_lm") def update_sso_roles_in_lm(sender, instance: LearningMentor, **kwargs): if not instance.pk: _add_sso_role( instance.mentor, instance.course_session.course.slug, "LEARNING_MENTOR" ) def _remove_sso_role(user: User, course_slug: str, role: str): try: logger.debug( f"Removing {role} role from user", user=user, course=course_slug, label="role_sync", ) remove_roles_from_user(user, [(course_slug, role)]) except KeycloakDeleteError: # fail silently, error object is being created in the service pass def _add_sso_role(user: User, course_slug: str, role: str): try: logger.debug( f"Adding {role} role to user", user=user, course=course_slug, label="role_sync", ) add_roles_to_user(user, [(course_slug, role)]) except KeycloakPostError: # fail silently, error object is being created in the service pass