Add sso models, move all code to sso-app

This commit is contained in:
Christian Cueni 2024-06-24 13:04:58 +02:00
parent e436c5ddbd
commit 857c4a4742
6 changed files with 194 additions and 63 deletions

View File

@ -3,24 +3,11 @@ from django.contrib.auth import admin as auth_admin, get_user_model
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from vbv_lernwelt.core.models import Country, JobLog, Organisation from vbv_lernwelt.core.models import Country, JobLog, Organisation
from vbv_lernwelt.core.signals import create_sso_user_signal, sync_sso_roles_signal
from vbv_lernwelt.core.utils import pretty_print_json from vbv_lernwelt.core.utils import pretty_print_json
User = get_user_model() User = get_user_model()
@admin.action(description="KEYCLOAK: Sync SSO Roles")
def sync_sso_roles(modeladmin, request, queryset):
for user in queryset:
sync_sso_roles_signal.send(sender="core.admin", user=user)
@admin.action(description="KEYCLOAK: Create User")
def create_sso_user(modeladmin, request, queryset):
for user in queryset:
create_sso_user_signal.send(sender="core.admin", user=user)
class LogAdmin(admin.ModelAdmin): class LogAdmin(admin.ModelAdmin):
def has_add_permission(self, request): def has_add_permission(self, request):
return False return False
@ -96,7 +83,6 @@ class UserAdmin(auth_admin.UserAdmin):
"sso_id", "sso_id",
] ]
search_fields = ["first_name", "last_name", "email", "username", "sso_id"] search_fields = ["first_name", "last_name", "email", "username", "sso_id"]
actions = [sync_sso_roles, create_sso_user]
@admin.register(JobLog) @admin.register(JobLog)

View File

@ -1,13 +0,0 @@
from django.dispatch import Signal
sync_sso_roles_signal = Signal(
providing_args=[
"user",
]
)
create_sso_user_signal = Signal(
providing_args=[
"user",
]
)

View File

@ -1,3 +1,138 @@
from django.contrib import admin from django.contrib import admin, messages
from django.contrib.auth import admin as auth_admin, get_user_model
from django.utils.translation import gettext_lazy as _
# Register your models here. from vbv_lernwelt.course.models import CourseSessionUser
from vbv_lernwelt.importer.services import update_user_json_data
from vbv_lernwelt.sso.exceptions import MyVbvKeycloakDeleteError, MyVbvKeycloakPostError
from vbv_lernwelt.sso.models import SsoSyncError, SsoUser
from vbv_lernwelt.sso.role_sync.services import create_user, sync_roles_for_user
User = get_user_model()
def create_sso_user_from_admin(user: User, request):
try:
sso_data = {"intermediate_sso_id": create_user(user)}
update_user_json_data(user, sso_data)
user.save()
messages.add_message(
request, messages.SUCCESS, f"Der Bentuzer wurde in Keycloak erstellt."
)
except MyVbvKeycloakPostError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=user, action=SsoSyncError.Action.CREATE, data=additional_data
)
messages.add_message(
request,
messages.WARNING,
f"Der Benutzer ({e}) konnte nicht in Keycloak erstellt werden.",
)
def sync_sso_roles_from_admin(user: User, request):
course_roles = [
(csu.course_session.course.slug, csu.role)
for csu in CourseSessionUser.objects.filter(user=user)
]
try:
sync_roles_for_user(user, course_roles)
messages.add_message(
request, messages.SUCCESS, f"Die Daten wurden mit Keycloak synchronisiert."
)
except MyVbvKeycloakDeleteError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=user, action=SsoSyncError.Action.REMOVE, data=additional_data
)
messages.add_message(
request,
messages.WARNING,
f"Die bestehenden Rollen für Benutzer ({e}) konnten in Keycloak nicht gelöscht werden.",
)
except MyVbvKeycloakPostError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=user, action=SsoSyncError.Action.ADD, data=additional_data
)
messages.add_message(
request,
messages.WARNING,
f"Die neuen Rollen für Benutzer ({e}) konnten in Keycloak nicht erstellt werden.",
)
@admin.action(description="KEYCLOAK: Sync SSO Roles")
def sync_sso_roles(modeladmin, request, queryset):
for user in queryset:
sync_sso_roles_from_admin(user, request)
@admin.action(description="KEYCLOAK: Create User")
def create_sso_user(modeladmin, request, queryset):
for user in queryset:
create_sso_user_from_admin(user, request)
@admin.register(SsoUser)
class SsoUserAdmin(auth_admin.UserAdmin):
fieldsets = (
(
_("Personal info"),
{"fields": ("first_name", "last_name", "email", "sso_id")},
),
(_("Additional data"), {"fields": ("additional_json_data",)}),
)
list_display = [
"username",
"first_name",
"last_name",
"sso_id",
"intermedia_sso_id",
]
search_fields = ["first_name", "last_name", "email", "username", "sso_id"]
actions = [sync_sso_roles, create_sso_user]
# Make fields read-only
readonly_fields = (
"username",
"password",
"first_name",
"last_name",
"email",
"additional_json_data",
)
# Disable delete action
def has_delete_permission(self, request, obj=None):
return False
def get_actions(self, request):
actions = super().get_actions(request)
if "delete_selected" in actions:
del actions["delete_selected"]
return actions
def intermedia_sso_id(self, obj):
return obj.additional_json_data.get("intermediate_sso_id", "")
intermedia_sso_id.short_description = "Keycloak SSO ID"
@admin.register(SsoSyncError)
class SsoSyncErrorAdmin(admin.ModelAdmin):
list_display = [
"created_at",
"user",
"action",
"data",
]
raw_id_fields = [
"user",
]
search_fields = [
"user.email",
"user.username",
]
list_filter = ("action",)

View File

@ -0,0 +1,52 @@
# Generated by Django 3.2.25 on 2024-06-20 05:24
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name="SsoSyncError",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("created_at", models.DateTimeField(auto_now_add=True)),
("updated_at", models.DateTimeField(auto_now=True)),
(
"action",
models.CharField(
choices=[
("ADD", "Add"),
("REMOVE", "Remove"),
("CREATE", "Create"),
],
default="ADD",
max_length=255,
),
),
("data", models.JSONField(blank=True, default=dict)),
(
"user",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to=settings.AUTH_USER_MODEL,
),
),
],
),
]

View File

@ -3,6 +3,11 @@ from django.db import models
from vbv_lernwelt.core.models import User from vbv_lernwelt.core.models import User
class SsoUser(User):
class Meta:
proxy = True
class SsoSyncError(models.Model): class SsoSyncError(models.Model):
created_at = models.DateTimeField(auto_now_add=True) created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True) updated_at = models.DateTimeField(auto_now=True)

View File

@ -1,15 +1,12 @@
from django.db.models.signals import post_delete, pre_save from django.db.models.signals import post_delete, pre_save
from django.dispatch import receiver from django.dispatch import receiver
from vbv_lernwelt.core.signals import create_sso_user_signal, sync_sso_roles_signal
from vbv_lernwelt.course.models import CourseSessionUser from vbv_lernwelt.course.models import CourseSessionUser
from vbv_lernwelt.sso.exceptions import MyVbvKeycloakDeleteError, MyVbvKeycloakPostError from vbv_lernwelt.sso.exceptions import MyVbvKeycloakDeleteError, MyVbvKeycloakPostError
from vbv_lernwelt.sso.models import SsoSyncError from vbv_lernwelt.sso.models import SsoSyncError
from vbv_lernwelt.sso.role_sync.services import ( from vbv_lernwelt.sso.role_sync.services import (
add_roles_to_user, add_roles_to_user,
create_user,
remove_roles_from_user, remove_roles_from_user,
sync_roles_for_user,
update_roles_for_user, update_roles_for_user,
) )
@ -56,34 +53,3 @@ def update_sso_roles(sender, instance: CourseSessionUser, **kwargs):
SsoSyncError.objects.create( SsoSyncError.objects.create(
user=instance.user, action=SsoSyncError.Action.ADD, data=additional_data user=instance.user, action=SsoSyncError.Action.ADD, data=additional_data
) )
@receiver(sync_sso_roles_signal, dispatch_uid="sync_sso_roles")
def sync_sso_roles(sender, user, **kwargs):
course_roles = [
(csu.course_session.course.slug, csu.role)
for csu in CourseSessionUser.objects.filter(user=user)
]
try:
sync_roles_for_user(user, course_roles)
except MyVbvKeycloakDeleteError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=user, action=SsoSyncError.Action.REMOVE, data=additional_data
)
except MyVbvKeycloakPostError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=user, action=SsoSyncError.Action.ADD, data=additional_data
)
@receiver(create_sso_user_signal, dispatch_uid="create_sso_user")
def create_sso_user(sender, user, **kwargs):
try:
create_user(user)
except MyVbvKeycloakPostError as e:
additional_data = getattr(e, "additional_data", {})
SsoSyncError.objects.create(
user=user, action=SsoSyncError.Action.CREATE, data=additional_data
)