wip: Add signals, change black version

This commit is contained in:
Christian Cueni 2024-06-19 15:34:14 +02:00
parent aa3f222112
commit 601cf7a12b
9 changed files with 83 additions and 31 deletions

View File

@ -13,7 +13,7 @@ from vbv_lernwelt.core.model_utils import find_available_slug
from vbv_lernwelt.core.models import User from vbv_lernwelt.core.models import User
from vbv_lernwelt.course.serializer_helpers import get_course_serializer_class from vbv_lernwelt.course.serializer_helpers import get_course_serializer_class
from vbv_lernwelt.files.models import UploadFile from vbv_lernwelt.files.models import UploadFile
from vbv_lernwelt.sso.role_sync.client import ( from vbv_lernwelt.sso.role_sync.services import (
add_roles_to_user, add_roles_to_user,
remove_roles_from_user, remove_roles_from_user,
update_roles_for_user, update_roles_for_user,
@ -312,6 +312,25 @@ class CourseSessionUser(models.Model):
) )
super().save(*args, **kwargs) super().save(*args, **kwargs)
@classmethod
def update_sso_roles(cls, instance: "CourseSessionUser"):
if instance.created_at is None:
add_roles_to_user(
instance.user, [(instance.course_session.course.slug, [instance.role])]
)
else:
old_csu = CourseSessionUser.objects.get(pk=instance.pk)
if old_csu.role != instance.role:
update_roles_for_user(
instance.user,
add_course_roles=[
(instance.course_session.course.slug, [instance.role])
],
remove_course_roles=[
(instance.course_session.course.slug, [old_csu.role])
],
)
@classmethod @classmethod
def remove_sso_roles_from_user(cls, instance: "CourseSessionUser"): def remove_sso_roles_from_user(cls, instance: "CourseSessionUser"):
remove_roles_from_user( remove_roles_from_user(

View File

@ -1,4 +1,4 @@
from django.db.models.signals import post_delete, post_save from django.db.models.signals import post_delete, post_save, pre_save
from django.dispatch import receiver from django.dispatch import receiver
from vbv_lernwelt.course.models import Course, CourseConfiguration, CourseSessionUser from vbv_lernwelt.course.models import Course, CourseConfiguration, CourseSessionUser
@ -10,6 +10,11 @@ def create_course_configuration(sender, instance, created, **kwargs):
CourseConfiguration.objects.create(course=instance) CourseConfiguration.objects.create(course=instance)
@receiver(post_delete, sender=CourseSessionUser) @receiver(post_delete, sender=CourseSessionUser, dispatch_uid="delete_sso_roles")
def after_delete(sender, instance, **kwargs): def delete_sso_roles(sender, instance, **kwargs):
CourseSessionUser.remove_sso_roles_from_user(instance) CourseSessionUser.remove_sso_roles_from_user(instance)
@receiver(pre_save, sender=CourseSessionUser, dispatch_uid="update_sso_roles")
def update_sso_roles(sender, instance: CourseSessionUser, **kwargs):
CourseSessionUser.update_sso_roles(instance)

View File

@ -4,4 +4,5 @@ from vbv_lernwelt.course_session_group.models import CourseSessionGroup
@admin.register(CourseSessionGroup) @admin.register(CourseSessionGroup)
class CourseSessionAssignmentAdmin(admin.ModelAdmin): ... class CourseSessionAssignmentAdmin(admin.ModelAdmin):
...

View File

@ -153,9 +153,9 @@ def fetch_course_session_all_users(courses: List[int], excluded_domains=None):
def generate_export_response(cs_users: List[User]) -> HttpResponse: def generate_export_response(cs_users: List[User]) -> HttpResponse:
response = HttpResponse(content_type="text/csv; charset=utf-8") response = HttpResponse(content_type="text/csv; charset=utf-8")
response["Content-Disposition"] = ( response[
f"attachment; filename=edoniq_user_export_{date.today().strftime('%Y%m%d')}.csv" "Content-Disposition"
) ] = f"attachment; filename=edoniq_user_export_{date.today().strftime('%Y%m%d')}.csv"
response.write("\ufeff".encode("utf8")) # UTF-8 BOM response.write("\ufeff".encode("utf8")) # UTF-8 BOM

View File

@ -125,9 +125,9 @@ def _handle_feedback_export_action(course_seesions, file_name):
response = HttpResponse( response = HttpResponse(
content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
) )
response["Content-Disposition"] = ( response[
f"attachment; filename={make_export_filename(file_name)}" "Content-Disposition"
) ] = f"attachment; filename={make_export_filename(file_name)}"
response.write(excel_bytes) response.write(excel_bytes)
return response return response

View File

@ -25,6 +25,7 @@ from vbv_lernwelt.learnpath.models import (
LearningContentEdoniqTest, LearningContentEdoniqTest,
) )
from vbv_lernwelt.notify.models import NotificationCategory from vbv_lernwelt.notify.models import NotificationCategory
from vbv_lernwelt.sso.role_sync.services import create_user
logger = structlog.get_logger(__name__) logger = structlog.get_logger(__name__)
@ -538,9 +539,10 @@ def create_or_update_user(
user.first_name = first_name or user.first_name user.first_name = first_name or user.first_name
user.last_name = last_name or user.last_name user.last_name = last_name or user.last_name
user.username = email user.username = email
user.additional_json_data = user.additional_json_data | {
"intermediate_sso_id": intermediate_sso_id sso_data = {"intermediate_sso_id": intermediate_sso_id}
} update_user_json_data(user, sso_data)
user.set_unusable_password() user.set_unusable_password()
user.save() user.save()
@ -827,7 +829,7 @@ def create_or_update_trainer(course: Course, data: Dict[str, Any], language="de"
course_title = course.title if course else "None" course_title = course.title if course else "None"
logger.debug( logger.debug(
"create_or_update_trainer", "create_or_update_trainer2",
course=course_title, course=course_title,
data=data, data=data,
label="import", label="import",
@ -839,6 +841,11 @@ def create_or_update_trainer(course: Course, data: Dict[str, Any], language="de"
last_name=data["Name"], last_name=data["Name"],
) )
user.language = data["Sprache"] user.language = data["Sprache"]
# create user in intermediate sso i.e. Keycloak
sso_data = {"intermediate_sso_id": create_user(user)}
update_user_json_data(user, sso_data)
user.save() user.save()
group = data["Klasse"].strip() group = data["Klasse"].strip()
@ -942,6 +949,10 @@ def create_or_update_student(data: Dict[str, Any]):
update_user_json_data(user, data) update_user_json_data(user, data)
user.save() user.save()
# create user in intermediate sso i.e. Keycloak
sso_data = {"intermediate_sso_id": create_user(user)}
update_user_json_data(user, sso_data)
# general expert handling # general expert handling
import_id = data["Durchführungen"] import_id = data["Durchführungen"]
course_session = CourseSession.objects.filter(import_id=import_id).first() course_session = CourseSession.objects.filter(import_id=import_id).first()

View File

@ -65,9 +65,9 @@ class TestNotificationService(TestCase):
self.assertFalse(notification.emailed) self.assertFalse(notification.emailed)
def test_send_notification_with_email(self): def test_send_notification_with_email(self):
self.recipient.additional_json_data["email_notification_categories"] = ( self.recipient.additional_json_data[
json.dumps(["USER_INTERACTION"]) "email_notification_categories"
) ] = json.dumps(["USER_INTERACTION"])
self.recipient.save() self.recipient.save()
verb = "Anne hat deinen Auftrag bewertet" verb = "Anne hat deinen Auftrag bewertet"
@ -146,9 +146,9 @@ class TestNotificationService(TestCase):
self.assertFalse(notification.emailed) self.assertFalse(notification.emailed)
# when the email was not sent, yet it will still send it afterwards... # when the email was not sent, yet it will still send it afterwards...
self.recipient.additional_json_data["email_notification_categories"] = ( self.recipient.additional_json_data[
json.dumps(["USER_INTERACTION"]) "email_notification_categories"
) ] = json.dumps(["USER_INTERACTION"])
self.recipient.save() self.recipient.save()
result = self.notification_service._send_notification( result = self.notification_service._send_notification(
@ -188,9 +188,9 @@ class TestNotificationService(TestCase):
self.assertFalse(self._has_sent_emails()) self.assertFalse(self._has_sent_emails())
# Assert mail is sent if corresponding email notification type is enabled # Assert mail is sent if corresponding email notification type is enabled
self.recipient.additional_json_data["email_notification_categories"] = ( self.recipient.additional_json_data[
json.dumps(["USER_INTERACTION"]) "email_notification_categories"
) ] = json.dumps(["USER_INTERACTION"])
self.recipient.save() self.recipient.save()
self.notification_service._send_notification( self.notification_service._send_notification(
sender=self.sender, sender=self.sender,

View File

@ -39,9 +39,9 @@ class SelfEvaluationFeedbackSerializer(serializers.ModelSerializer):
return obj.learning_unit.get_circle().title return obj.learning_unit.get_circle().title
def get_criteria(self, obj): def get_criteria(self, obj):
performance_criteria: List[PerformanceCriteria] = ( performance_criteria: List[
obj.learning_unit.performancecriteria_set.all() PerformanceCriteria
) ] = obj.learning_unit.performancecriteria_set.all()
criteria = [] criteria = []

View File

@ -16,7 +16,7 @@ if settings.OAUTH_SYNC_ROLES:
user_realm_name=settings.OAUTH_SIGNIN_REALM, user_realm_name=settings.OAUTH_SIGNIN_REALM,
client_id=settings.OAUTH_SIGNIN_ADMIN_CLIENT_ID, client_id=settings.OAUTH_SIGNIN_ADMIN_CLIENT_ID,
client_secret_key=settings.OAUTH_SIGNIN_ADMIN_CLIENT_SECRET, client_secret_key=settings.OAUTH_SIGNIN_ADMIN_CLIENT_SECRET,
verify=True, verify=False,
) )
keycloak_admin = KeycloakAdmin(connection=keycloak_connection) keycloak_admin = KeycloakAdmin(connection=keycloak_connection)
@ -29,7 +29,7 @@ def add_roles_to_user(user: User, course_roles: CourseRolesType):
user_id = user.additional_json_data.get("intermediate_sso_id", "") user_id = user.additional_json_data.get("intermediate_sso_id", "")
if settings.OAUTH_SYNC_ROLES and user_id: if settings.OAUTH_SYNC_ROLES and user_id:
request_roles = _get_role_request_data(course_roles) request_roles = _get_role_request_data(course_roles)
keycloak_admin.assign_realm_roles( some = keycloak_admin.assign_realm_roles(
user_id=user_id, user_id=user_id,
roles=request_roles, roles=request_roles,
) )
@ -41,7 +41,7 @@ def remove_roles_from_user(user: User, course_roles: CourseRolesType):
user_id = user.additional_json_data.get("intermediate_sso_id", "") user_id = user.additional_json_data.get("intermediate_sso_id", "")
if settings.OAUTH_SYNC_ROLES and user_id: if settings.OAUTH_SYNC_ROLES and user_id:
request_roles = _get_role_request_data(course_roles) request_roles = _get_role_request_data(course_roles)
keycloak_admin.delete_realm_roles_of_user( some = keycloak_admin.delete_realm_roles_of_user(
user_id=user_id, user_id=user_id,
roles=request_roles, roles=request_roles,
) )
@ -53,12 +53,26 @@ def update_roles_for_user(
user: User, add_course_roles: CourseRolesType, remove_course_roles: CourseRolesType user: User, add_course_roles: CourseRolesType, remove_course_roles: CourseRolesType
): ):
if settings.OAUTH_SYNC_ROLES: if settings.OAUTH_SYNC_ROLES:
add_roles_to_user(user, add_course_roles)
remove_roles_from_user(user, remove_course_roles) remove_roles_from_user(user, remove_course_roles)
add_roles_to_user(user, add_course_roles)
return True return True
return False return False
def create_user(user: User):
if settings.OAUTH_SYNC_ROLES:
user_data = {
"username": user.email,
"email": user.email,
"enabled": True,
"firstName": user.first_name,
"lastName": user.last_name,
}
user_id = keycloak_admin.create_user(user_data, exist_ok=True)
return user_id
return ""
def get_roles_for_user(user_id: str): def get_roles_for_user(user_id: str):
if settings.OAUTH_SYNC_ROLES: if settings.OAUTH_SYNC_ROLES:
return keycloak_admin.get_realm_roles_of_user( return keycloak_admin.get_realm_roles_of_user(
@ -69,6 +83,8 @@ def get_roles_for_user(user_id: str):
# create sso-ID user and set roles # create sso-ID user and set roles
# sync # sync
# remove all, add all
# display
def _get_role_request_data(course_roles: CourseRolesType) -> List[Dict[str, str]]: def _get_role_request_data(course_roles: CourseRolesType) -> List[Dict[str, str]]: