vbv/server/vbv_lernwelt/sso/signals.py

90 lines
3.4 KiB
Python

from django.db.models.signals import post_delete, pre_save
from django.dispatch import receiver
from vbv_lernwelt.core.signals import create_sso_user_signal, sync_sso_roles_signal
from vbv_lernwelt.course.models import CourseSessionUser
from vbv_lernwelt.sso.exceptions import MyVbvKeycloakDeleteError, MyVbvKeycloakPostError
from vbv_lernwelt.sso.models import SsoSyncError
from vbv_lernwelt.sso.role_sync.services import (
add_roles_to_user,
create_user,
remove_roles_from_user,
sync_roles_for_user,
update_roles_for_user,
)
@receiver(post_delete, sender=CourseSessionUser, dispatch_uid="delete_sso_roles")
def delete_sso_roles(sender, instance, **kwargs):
try:
remove_roles_from_user(
instance.user, [(instance.course_session.course.slug, instance.role)]
)
except MyVbvKeycloakDeleteError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=instance.user, action=SsoSyncError.Action.REMOVE, data=additional_data
)
@receiver(pre_save, sender=CourseSessionUser, dispatch_uid="update_sso_roles")
def update_sso_roles(sender, instance: CourseSessionUser, **kwargs):
try:
if instance.created_at is None:
add_roles_to_user(
instance.user, [(instance.course_session.course.slug, instance.role)]
)
else:
old_csu = CourseSessionUser.objects.get(pk=instance.pk)
if old_csu.role != instance.role:
update_roles_for_user(
instance.user,
add_course_roles=[
(instance.course_session.course.slug, instance.role)
],
remove_course_roles=[
(instance.course_session.course.slug, old_csu.role)
],
)
except MyVbvKeycloakDeleteError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=instance.user, action=SsoSyncError.Action.REMOVE, data=additional_data
)
except MyVbvKeycloakPostError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=instance.user, action=SsoSyncError.Action.ADD, data=additional_data
)
@receiver(sync_sso_roles_signal, dispatch_uid="sync_sso_roles")
def sync_sso_roles(sender, user, **kwargs):
course_roles = [
(csu.course_session.course.slug, csu.role)
for csu in CourseSessionUser.objects.filter(user=user)
]
try:
sync_roles_for_user(user, course_roles)
except MyVbvKeycloakDeleteError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=user, action=SsoSyncError.Action.REMOVE, data=additional_data
)
except MyVbvKeycloakPostError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=user, action=SsoSyncError.Action.ADD, data=additional_data
)
@receiver(create_sso_user_signal, dispatch_uid="create_sso_user")
def create_sso_user(sender, user, **kwargs):
try:
create_user(user)
except MyVbvKeycloakPostError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=user, action=SsoSyncError.Action.CREATE, data=additional_data
)