from django.contrib import admin, messages from django.contrib.auth import admin as auth_admin from django.contrib.auth import get_user_model from django.utils.translation import gettext_lazy as _ from keycloak.exceptions import KeycloakDeleteError, KeycloakPostError from vbv_lernwelt.course.models import CourseSessionUser from vbv_lernwelt.course_session_group.models import CourseSessionGroup from vbv_lernwelt.learning_mentor.models import ( AgentParticipantRelation, AgentParticipantRoleType, ) from vbv_lernwelt.sso.models import SsoSyncError, SsoUser from vbv_lernwelt.sso.role_sync.services import ( create_and_update_user, sync_roles_for_user, ) User = get_user_model() def create_sso_user_from_admin(user: User, request): try: create_and_update_user(user) # noqa user.save() if request: messages.add_message( request, messages.SUCCESS, "Der Bentuzer wurde in Keycloak erstellt." ) except KeycloakPostError as e: if request: messages.add_message( request, messages.WARNING, f"Der Benutzer {user} konnte nicht in Keycloak erstellt werden: {e}", ) def sync_sso_roles_from_admin(user: User, request): course_roles = { (csu.course_session.course.slug, csu.role) for csu in CourseSessionUser.objects.filter(user=user) } course_roles.update( (relation.participant.course_session.course.slug, "LEARNING_MENTOR") for relation in AgentParticipantRelation.objects.filter( agent=user, role=AgentParticipantRoleType.LEARNING_MENTOR.value ) ) for csg in CourseSessionGroup.objects.filter(supervisor=user): for course_session in csg.course_session.all(): course_roles.add((course_session.course.slug, "SUPERVISOR")) try: sync_roles_for_user(user, course_roles) if request: messages.add_message( request, messages.SUCCESS, "Die Daten wurden mit Keycloak synchronisiert.", ) except KeycloakDeleteError as e: if request: messages.add_message( request, messages.WARNING, f"Die bestehenden Rollen für Benutzer ({user}) konnten in Keycloak nicht gelöscht werden: {e}", ) except KeycloakPostError as e: if request: messages.add_message( request, messages.WARNING, f"Die neuen Rollen für Benutzer ({user}) konnten in Keycloak nicht erstellt werden: {e}", ) @admin.action(description="KEYCLOAK: Sync SSO Roles") def sync_sso_roles(modeladmin, request, queryset): for user in queryset: sync_sso_roles_from_admin(user, request) @admin.action(description="KEYCLOAK: Create User") def create_sso_user(modeladmin, request, queryset): for user in queryset: create_sso_user_from_admin(user, request) @admin.register(SsoUser) class SsoUserAdmin(auth_admin.UserAdmin): fieldsets = ( ( _("Personal info"), {"fields": ("first_name", "last_name", "email", "sso_id")}, ), (_("Additional data"), {"fields": ("additional_json_data",)}), ) list_display = [ "username", "first_name", "last_name", "sso_id", "intermedia_sso_id", ] search_fields = [ "first_name", "last_name", "email", "username", "sso_id", "additional_json_data__intermediate_sso_id", ] actions = [sync_sso_roles, create_sso_user] # Make fields read-only readonly_fields = ( "username", "password", "first_name", "last_name", "email", "additional_json_data", ) # Disable delete action def has_delete_permission(self, request, obj=None): return False def get_actions(self, request): actions = super().get_actions(request) if "delete_selected" in actions: del actions["delete_selected"] return actions def intermedia_sso_id(self, obj): return obj.additional_json_data.get("intermediate_sso_id", "") intermedia_sso_id.short_description = "Keycloak SSO ID" @admin.register(SsoSyncError) class SsoSyncErrorAdmin(admin.ModelAdmin): list_display = [ "created_at", "user", "action", "data", ] raw_id_fields = [ "user", ] search_fields = [ "user.email", "user.username", ] list_filter = ("action",)