wip: Add sync method

This commit is contained in:
Christian Cueni 2024-06-19 18:50:24 +02:00
parent 601cf7a12b
commit 13789a9619
6 changed files with 58 additions and 14 deletions

View File

@ -3,11 +3,18 @@ from django.contrib.auth import admin as auth_admin, get_user_model
from django.utils.translation import gettext_lazy as _
from vbv_lernwelt.core.models import Country, JobLog, Organisation
from vbv_lernwelt.core.signals import sync_sso_roles_signal
from vbv_lernwelt.core.utils import pretty_print_json
User = get_user_model()
@admin.action(description="KEYCLOAK: Sync SSO Roles")
def sync_sso_roles(modeladmin, request, queryset):
for user in queryset:
sync_sso_roles_signal.send(sender="core.admin", user=user)
class LogAdmin(admin.ModelAdmin):
def has_add_permission(self, request):
return False
@ -83,6 +90,7 @@ class UserAdmin(auth_admin.UserAdmin):
"sso_id",
]
search_fields = ["first_name", "last_name", "email", "username", "sso_id"]
actions = [sync_sso_roles]
@admin.register(JobLog)

View File

@ -0,0 +1,7 @@
from django.dispatch import Signal
sync_sso_roles_signal = Signal(
providing_args=[
"user",
]
)

View File

@ -16,6 +16,7 @@ from vbv_lernwelt.files.models import UploadFile
from vbv_lernwelt.sso.role_sync.services import (
add_roles_to_user,
remove_roles_from_user,
sync_roles_for_user,
update_roles_for_user,
)
@ -316,7 +317,7 @@ class CourseSessionUser(models.Model):
def update_sso_roles(cls, instance: "CourseSessionUser"):
if instance.created_at is None:
add_roles_to_user(
instance.user, [(instance.course_session.course.slug, [instance.role])]
instance.user, [(instance.course_session.course.slug, instance.role)]
)
else:
old_csu = CourseSessionUser.objects.get(pk=instance.pk)
@ -324,19 +325,27 @@ class CourseSessionUser(models.Model):
update_roles_for_user(
instance.user,
add_course_roles=[
(instance.course_session.course.slug, [instance.role])
(instance.course_session.course.slug, instance.role)
],
remove_course_roles=[
(instance.course_session.course.slug, [old_csu.role])
(instance.course_session.course.slug, old_csu.role)
],
)
@classmethod
def remove_sso_roles_from_user(cls, instance: "CourseSessionUser"):
remove_roles_from_user(
instance.user, [(instance.course_session.course.slug, [instance.role])]
instance.user, [(instance.course_session.course.slug, instance.role)]
)
@classmethod
def sync_sso_roles(cls, user: User):
course_roles = [
(csu.course_session.course.slug, csu.role)
for csu in CourseSessionUser.objects.filter(user=user)
]
sync_roles_for_user(user, course_roles)
class CircleDocument(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)

View File

@ -1,6 +1,7 @@
from django.db.models.signals import post_delete, post_save, pre_save
from django.dispatch import receiver
from vbv_lernwelt.core.signals import sync_sso_roles_signal
from vbv_lernwelt.course.models import Course, CourseConfiguration, CourseSessionUser
@ -18,3 +19,8 @@ def delete_sso_roles(sender, instance, **kwargs):
@receiver(pre_save, sender=CourseSessionUser, dispatch_uid="update_sso_roles")
def update_sso_roles(sender, instance: CourseSessionUser, **kwargs):
CourseSessionUser.update_sso_roles(instance)
@receiver(sync_sso_roles_signal, dispatch_uid="sync_sso_roles")
def sync_sso_roles(sender, user, **kwargs):
CourseSessionUser.sync_sso_roles(user)

View File

@ -7,7 +7,7 @@ from keycloak import KeycloakAdmin, KeycloakOpenIDConnection
from vbv_lernwelt.core.models import User
from vbv_lernwelt.sso.role_sync.roles import ROLE_IDS, SSO_ROLES
CourseRolesType = List[Tuple[str, List[str]]]
CourseRolesType = List[Tuple[str, str]]
if settings.OAUTH_SYNC_ROLES:
keycloak_connection = KeycloakOpenIDConnection(
@ -29,7 +29,7 @@ def add_roles_to_user(user: User, course_roles: CourseRolesType):
user_id = user.additional_json_data.get("intermediate_sso_id", "")
if settings.OAUTH_SYNC_ROLES and user_id:
request_roles = _get_role_request_data(course_roles)
some = keycloak_admin.assign_realm_roles(
keycloak_admin.assign_realm_roles(
user_id=user_id,
roles=request_roles,
)
@ -41,7 +41,7 @@ def remove_roles_from_user(user: User, course_roles: CourseRolesType):
user_id = user.additional_json_data.get("intermediate_sso_id", "")
if settings.OAUTH_SYNC_ROLES and user_id:
request_roles = _get_role_request_data(course_roles)
some = keycloak_admin.delete_realm_roles_of_user(
keycloak_admin.delete_realm_roles_of_user(
user_id=user_id,
roles=request_roles,
)
@ -59,6 +59,22 @@ def update_roles_for_user(
return False
def sync_roles_for_user(user: User, course_roles: CourseRolesType):
if settings.OAUTH_SYNC_ROLES:
user_id = user.additional_json_data.get("intermediate_sso_id", "")
if user_id:
assigned_roles = keycloak_admin.get_realm_roles_of_user(user_id=user_id)
if assigned_roles:
keycloak_admin.delete_realm_roles_of_user(
user_id=user_id,
roles=assigned_roles,
)
roles = _get_role_request_data(course_roles)
keycloak_admin.assign_realm_roles(user_id=user_id, roles=roles)
return True
return False
def create_user(user: User):
if settings.OAUTH_SYNC_ROLES:
user_data = {
@ -90,17 +106,15 @@ def get_roles_for_user(user_id: str):
def _get_role_request_data(course_roles: CourseRolesType) -> List[Dict[str, str]]:
request_roles = []
for item in course_roles:
course_slug, roles = item
course_slug, role = item
sanitized_course_slug = _remove_accents(course_slug)
oauth_roles = _create_role_names(sanitized_course_slug, roles)
return request_roles + [
{"id": ROLE_IDS[role], "name": role} for role in oauth_roles
]
oauth_role = _create_role_name(sanitized_course_slug, role)
request_roles.append({"id": ROLE_IDS[oauth_role], "name": oauth_role})
return request_roles
def _create_role_names(course_slug: str, roles: list) -> List[str]:
return [SSO_ROLES[course_slug][role] for role in roles]
def _create_role_name(course_slug: str, role: str) -> List[str]:
return SSO_ROLES[course_slug][role]
def _remove_accents(input_str) -> str: