wip: Add error model, move code, add exception

This commit is contained in:
Christian Cueni 2024-06-19 19:38:12 +02:00
parent 13789a9619
commit 9437dafb76
8 changed files with 179 additions and 93 deletions

View File

@ -3,7 +3,7 @@ from django.contrib.auth import admin as auth_admin, get_user_model
from django.utils.translation import gettext_lazy as _
from vbv_lernwelt.core.models import Country, JobLog, Organisation
from vbv_lernwelt.core.signals import sync_sso_roles_signal
from vbv_lernwelt.core.signals import create_sso_user_signal, sync_sso_roles_signal
from vbv_lernwelt.core.utils import pretty_print_json
User = get_user_model()
@ -15,6 +15,12 @@ def sync_sso_roles(modeladmin, request, queryset):
sync_sso_roles_signal.send(sender="core.admin", user=user)
@admin.action(description="KEYCLOAK: Create User")
def create_sso_user(modeladmin, request, queryset):
for user in queryset:
create_sso_user_signal.send(sender="core.admin", user=user)
class LogAdmin(admin.ModelAdmin):
def has_add_permission(self, request):
return False
@ -90,7 +96,7 @@ class UserAdmin(auth_admin.UserAdmin):
"sso_id",
]
search_fields = ["first_name", "last_name", "email", "username", "sso_id"]
actions = [sync_sso_roles]
actions = [sync_sso_roles, create_sso_user]
@admin.register(JobLog)

View File

@ -5,3 +5,9 @@ sync_sso_roles_signal = Signal(
"user",
]
)
create_sso_user_signal = Signal(
providing_args=[
"user",
]
)

View File

@ -13,12 +13,6 @@ from vbv_lernwelt.core.model_utils import find_available_slug
from vbv_lernwelt.core.models import User
from vbv_lernwelt.course.serializer_helpers import get_course_serializer_class
from vbv_lernwelt.files.models import UploadFile
from vbv_lernwelt.sso.role_sync.services import (
add_roles_to_user,
remove_roles_from_user,
sync_roles_for_user,
update_roles_for_user,
)
class CircleContactType(Enum):
@ -198,6 +192,12 @@ class CourseCompletionStatus(Enum):
UNKNOWN = "UNKNOWN"
class CourseCompletionStatusChoices(models.TextChoices):
SUCCESS = CourseCompletionStatus.SUCCESS.value, "Success"
FAIL = CourseCompletionStatus.FAIL.value, "Fail"
UNKNOWN = CourseCompletionStatus.UNKNOWN.value, "Unknown"
class CourseCompletion(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
@ -216,8 +216,8 @@ class CourseCompletion(models.Model):
completion_status = models.CharField(
max_length=255,
choices=[(status, status.value) for status in CourseCompletionStatus],
default=CourseCompletionStatus.UNKNOWN.value,
choices=CourseCompletionStatusChoices.choices,
default=CourseCompletionStatus.UNKNOWN,
)
additional_json_data = models.JSONField(default=dict, blank=True)
@ -299,53 +299,6 @@ class CourseSessionUser(models.Model):
def __str__(self):
return f"{self.user} ({self.course_session.title})"
def save(self, *args, **kwargs):
if self.created_at is None:
add_roles_to_user(
self.user, [(self.course_session.course.slug, [self.role])]
)
else:
old_csu = CourseSessionUser.objects.get(pk=self.pk)
update_roles_for_user(
self.user,
add_course_roles=[(self.course_session.course.slug, [self.role])],
remove_course_roles=[(self.course_session.course.slug, [old_csu.role])],
)
super().save(*args, **kwargs)
@classmethod
def update_sso_roles(cls, instance: "CourseSessionUser"):
if instance.created_at is None:
add_roles_to_user(
instance.user, [(instance.course_session.course.slug, instance.role)]
)
else:
old_csu = CourseSessionUser.objects.get(pk=instance.pk)
if old_csu.role != instance.role:
update_roles_for_user(
instance.user,
add_course_roles=[
(instance.course_session.course.slug, instance.role)
],
remove_course_roles=[
(instance.course_session.course.slug, old_csu.role)
],
)
@classmethod
def remove_sso_roles_from_user(cls, instance: "CourseSessionUser"):
remove_roles_from_user(
instance.user, [(instance.course_session.course.slug, instance.role)]
)
@classmethod
def sync_sso_roles(cls, user: User):
course_roles = [
(csu.course_session.course.slug, csu.role)
for csu in CourseSessionUser.objects.filter(user=user)
]
sync_roles_for_user(user, course_roles)
class CircleDocument(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)

View File

@ -1,26 +1,10 @@
from django.db.models.signals import post_delete, post_save, pre_save
from django.db.models.signals import post_save
from django.dispatch import receiver
from vbv_lernwelt.core.signals import sync_sso_roles_signal
from vbv_lernwelt.course.models import Course, CourseConfiguration, CourseSessionUser
from vbv_lernwelt.course.models import Course, CourseConfiguration
@receiver(post_save, sender=Course)
def create_course_configuration(sender, instance, created, **kwargs):
if created:
CourseConfiguration.objects.create(course=instance)
@receiver(post_delete, sender=CourseSessionUser, dispatch_uid="delete_sso_roles")
def delete_sso_roles(sender, instance, **kwargs):
CourseSessionUser.remove_sso_roles_from_user(instance)
@receiver(pre_save, sender=CourseSessionUser, dispatch_uid="update_sso_roles")
def update_sso_roles(sender, instance: CourseSessionUser, **kwargs):
CourseSessionUser.update_sso_roles(instance)
@receiver(sync_sso_roles_signal, dispatch_uid="sync_sso_roles")
def sync_sso_roles(sender, user, **kwargs):
CourseSessionUser.sync_sso_roles(user)

View File

@ -0,0 +1,23 @@
from keycloak.exceptions import KeycloakDeleteError, KeycloakPostError
class MyVbvKeycloakDeleteError(KeycloakDeleteError):
def __init__(
self, keycloak_error: KeycloakDeleteError, additional_data: list | dict
):
super().__init__(
keycloak_error.error_message,
keycloak_error.response_code,
keycloak_error.response_body,
)
self.additional_data = additional_data
class MyVbvKeycloakPostError(KeycloakPostError):
def __init__(self, keycloak_error: KeycloakPostError, additional_data: list | dict):
super().__init__(
keycloak_error.error_message,
keycloak_error.response_code,
keycloak_error.response_body,
)
self.additional_data = additional_data

View File

@ -0,0 +1,23 @@
from django.db import models
from vbv_lernwelt.core.models import User
class SsoSyncError(models.Model):
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
user = models.ForeignKey(User, on_delete=models.CASCADE)
class Action(models.TextChoices):
ADD = "ADD", "Add"
REMOVE = "REMOVE", "Remove"
CREATE = "CREATE", "Create"
action = models.CharField(
choices=Action.choices, max_length=255, default=Action.ADD
)
data = models.JSONField(default=dict, blank=True)
def __str__(self):
return f"{self.user} ({self.action})"

View File

@ -3,8 +3,10 @@ from typing import Dict, List, Tuple
from django.conf import settings
from keycloak import KeycloakAdmin, KeycloakOpenIDConnection
from keycloak.exceptions import KeycloakDeleteError, KeycloakPostError
from vbv_lernwelt.core.models import User
from vbv_lernwelt.sso.exceptions import MyVbvKeycloakDeleteError, MyVbvKeycloakPostError
from vbv_lernwelt.sso.role_sync.roles import ROLE_IDS, SSO_ROLES
CourseRolesType = List[Tuple[str, str]]
@ -22,17 +24,17 @@ if settings.OAUTH_SYNC_ROLES:
keycloak_admin = KeycloakAdmin(connection=keycloak_connection)
# todo: handle errors
def add_roles_to_user(user: User, course_roles: CourseRolesType):
user_id = user.additional_json_data.get("intermediate_sso_id", "")
if settings.OAUTH_SYNC_ROLES and user_id:
request_roles = _get_role_request_data(course_roles)
keycloak_admin.assign_realm_roles(
user_id=user_id,
roles=request_roles,
)
try:
keycloak_admin.assign_realm_roles(
user_id=user_id,
roles=request_roles,
)
except KeycloakPostError as e:
raise MyVbvKeycloakPostError(e, request_roles)
return True
return False
@ -41,10 +43,13 @@ def remove_roles_from_user(user: User, course_roles: CourseRolesType):
user_id = user.additional_json_data.get("intermediate_sso_id", "")
if settings.OAUTH_SYNC_ROLES and user_id:
request_roles = _get_role_request_data(course_roles)
keycloak_admin.delete_realm_roles_of_user(
user_id=user_id,
roles=request_roles,
)
try:
keycloak_admin.delete_realm_roles_of_user(
user_id=user_id,
roles=request_roles,
)
except KeycloakDeleteError as e:
raise MyVbvKeycloakDeleteError(e, request_roles)
return True
return False
@ -84,7 +89,10 @@ def create_user(user: User):
"firstName": user.first_name,
"lastName": user.last_name,
}
user_id = keycloak_admin.create_user(user_data, exist_ok=True)
try:
user_id = keycloak_admin.create_user(user_data, exist_ok=True)
except KeycloakPostError as e:
raise MyVbvKeycloakPostError(e, user_data)
return user_id
return ""
@ -97,12 +105,6 @@ def get_roles_for_user(user_id: str):
return []
# create sso-ID user and set roles
# sync
# remove all, add all
# display
def _get_role_request_data(course_roles: CourseRolesType) -> List[Dict[str, str]]:
request_roles = []
for item in course_roles:

View File

@ -0,0 +1,89 @@
from django.db.models.signals import post_delete, pre_save
from django.dispatch import receiver
from vbv_lernwelt.core.signals import create_sso_user_signal, sync_sso_roles_signal
from vbv_lernwelt.course.models import CourseSessionUser
from vbv_lernwelt.sso.exceptions import MyVbvKeycloakDeleteError, MyVbvKeycloakPostError
from vbv_lernwelt.sso.models import SsoSyncError
from vbv_lernwelt.sso.role_sync.services import (
add_roles_to_user,
create_user,
remove_roles_from_user,
sync_roles_for_user,
update_roles_for_user,
)
@receiver(post_delete, sender=CourseSessionUser, dispatch_uid="delete_sso_roles")
def delete_sso_roles(sender, instance, **kwargs):
try:
remove_roles_from_user(
instance.user, [(instance.course_session.course.slug, instance.role)]
)
except MyVbvKeycloakDeleteError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=instance.user, action=SsoSyncError.Action.REMOVE, data=additional_data
)
@receiver(pre_save, sender=CourseSessionUser, dispatch_uid="update_sso_roles")
def update_sso_roles(sender, instance: CourseSessionUser, **kwargs):
try:
if instance.created_at is None:
add_roles_to_user(
instance.user, [(instance.course_session.course.slug, instance.role)]
)
else:
old_csu = CourseSessionUser.objects.get(pk=instance.pk)
if old_csu.role != instance.role:
update_roles_for_user(
instance.user,
add_course_roles=[
(instance.course_session.course.slug, instance.role)
],
remove_course_roles=[
(instance.course_session.course.slug, old_csu.role)
],
)
except MyVbvKeycloakDeleteError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=instance.user, action=SsoSyncError.Action.REMOVE, data=additional_data
)
except MyVbvKeycloakPostError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=instance.user, action=SsoSyncError.Action.ADD, data=additional_data
)
@receiver(sync_sso_roles_signal, dispatch_uid="sync_sso_roles")
def sync_sso_roles(sender, user, **kwargs):
course_roles = [
(csu.course_session.course.slug, csu.role)
for csu in CourseSessionUser.objects.filter(user=user)
]
try:
sync_roles_for_user(user, course_roles)
except MyVbvKeycloakDeleteError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=user, action=SsoSyncError.Action.REMOVE, data=additional_data
)
except MyVbvKeycloakPostError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=user, action=SsoSyncError.Action.ADD, data=additional_data
)
@receiver(create_sso_user_signal, dispatch_uid="create_sso_user")
def create_sso_user(sender, user, **kwargs):
try:
create_user(user)
except MyVbvKeycloakPostError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=user, action=SsoSyncError.Action.CREATE, data=additional_data
)