"""Synchronise course roles of local users with realm roles in Keycloak.

The public helpers translate ``(course_slug, role)`` pairs into Keycloak
realm-role payloads (via ``SSO_ROLES`` / ``ROLE_IDS``) and assign or remove
them for the Keycloak account referenced by the user's
``intermediate_sso_id``. Failed Keycloak calls made through the ``_kc_*``
wrappers are recorded as ``SsoSyncError`` rows before the exception is
re-raised.
"""

import unicodedata
from typing import Dict, List, Tuple

import structlog
from django.conf import settings
from keycloak import KeycloakAdmin, KeycloakOpenIDConnection
from keycloak.exceptions import KeycloakDeleteError, KeycloakPostError

from vbv_lernwelt.core.models import User
from vbv_lernwelt.sso.models import SsoSyncError
from vbv_lernwelt.sso.role_sync.roles import ROLE_IDS, SSO_ROLES

logger = structlog.get_logger(__name__)

# A list of (course_slug, role) pairs.
CourseRolesType = List[Tuple[str, str]]
# A list of role payloads as sent to Keycloak: {"id": ..., "name": ...}.
KeyCloakRolesType = List[Dict[str, str]]

keycloak_admin = None  # Needed for pytest

# The admin client is only created when role syncing is enabled; otherwise
# ``keycloak_admin`` stays ``None``.
if settings.OAUTH_SYNC_ROLES:
    keycloak_connection = KeycloakOpenIDConnection(
        server_url=settings.OAUTH_SIGNIN_URL,
        realm_name=settings.OAUTH_SIGNIN_REALM,
        user_realm_name=settings.OAUTH_SIGNIN_REALM,
        client_id=settings.OAUTH_SIGNIN_ADMIN_CLIENT_ID,
        client_secret_key=settings.OAUTH_SIGNIN_ADMIN_CLIENT_SECRET,
        verify=True,
    )
    keycloak_admin = KeycloakAdmin(connection=keycloak_connection)


def add_roles_to_user(user: User, course_roles: CourseRolesType):
    """Assign the Keycloak roles matching ``course_roles`` to ``user``.

    Returns the result of ``_handle_add_remove_action``: ``True`` when the
    request was dispatched, ``False`` when nothing was done.
    """
    return _handle_add_remove_action(
        user=user, course_roles=course_roles, action=SsoSyncError.Action.ADD
    )


def remove_roles_from_user(user: User, course_roles: CourseRolesType):
    """Remove the Keycloak roles matching ``course_roles`` from ``user``.

    Returns the result of ``_handle_add_remove_action``: ``True`` when the
    request was dispatched, ``False`` when nothing was done.
    """
    return _handle_add_remove_action(
        user=user, course_roles=course_roles, action=SsoSyncError.Action.REMOVE
    )


def _handle_add_remove_action(
    user: User,
    course_roles: CourseRolesType,
    action: SsoSyncError.Action,
):
    """Dispatch an ADD or REMOVE of roles for ``user`` to Keycloak.

    Returns ``False`` without contacting Keycloak when role syncing is
    disabled, the user has no ``intermediate_sso_id``, or none of the
    ``course_roles`` map to a known Keycloak role. Otherwise returns
    ``True`` (also for an ``action`` that is neither ADD nor REMOVE, in
    which case no request is made).

    Raises whatever the ``_kc_*`` wrapper re-raises on a failed request.
    """
    user_id = user.additional_json_data.get("intermediate_sso_id", "")
    if not (settings.OAUTH_SYNC_ROLES and user_id):
        return False

    request_roles = _get_role_request_data(course_roles)
    if not request_roles:
        return False

    if action == SsoSyncError.Action.ADD:
        _kc_assign_realm_roles(user, user_id, request_roles)
    elif action == SsoSyncError.Action.REMOVE:
        _kc_delete_realm_roles(user, user_id, request_roles)
    return True


def update_roles_for_user(
    user: User, add_course_roles: CourseRolesType, remove_course_roles: CourseRolesType
):
    """Remove ``remove_course_roles`` from and add ``add_course_roles`` to ``user``.

    Removal runs first, then addition. Returns ``False`` when role syncing
    is disabled. Otherwise returns ``remove_result and add_result``, so the
    result is ``False`` as soon as either step did nothing, e.g. because
    one of the two lists maps to no Keycloak role.
    """
    if settings.OAUTH_SYNC_ROLES:
        remove_ret_value = remove_roles_from_user(user, remove_course_roles)
        add_ret_value = add_roles_to_user(user, add_course_roles)
        return remove_ret_value and add_ret_value
    return False


def sync_roles_for_user(user: User, course_roles: CourseRolesType):
    """Replace the user's ``myvbv-`` realm roles with those for ``course_roles``.

    All currently assigned realm roles whose name starts with ``myvbv-`` are
    removed, then the roles derived from ``course_roles`` are assigned.

    Returns ``True`` when the sync was performed, ``False`` when role syncing
    is disabled or the user has no ``intermediate_sso_id``.

    Raises whatever the ``_kc_*`` wrappers re-raise on a failed request.
    """
    if not settings.OAUTH_SYNC_ROLES:
        return False

    user_id = user.additional_json_data.get("intermediate_sso_id", "")
    if not user_id:
        return False

    assigned_roles = _filter_non_myvbv_roles(
        keycloak_admin.get_realm_roles_of_user(user_id=user_id)
    )
    if assigned_roles:
        _kc_delete_realm_roles(user, user_id, assigned_roles)

    roles = _get_role_request_data(course_roles)
    # Go through the wrapper (like the delete above) so that a failed
    # assignment is recorded as an SsoSyncError before it propagates.
    _kc_assign_realm_roles(user, user_id, roles)
    return True


def create_user(user: User):
    """Create a Keycloak account for ``user`` and return its id.

    Returns an empty string when no Keycloak admin client is configured.
    """
    if keycloak_admin:
        return _kc_create_user(user)
    return ""


def create_and_update_user(user: User, save=False):
    """Create the Keycloak account and store its id on ``user``.

    The id returned by ``create_user`` is written to the user's additional
    JSON data under ``intermediate_sso_id``. The user is only persisted when
    ``save`` is true.
    """
    sso_data = {"intermediate_sso_id": create_user(user)}
    user.update_additional_json_data(sso_data)
    if save:
        user.save()


def get_roles_for_user(user_id: str):
    """Return the realm roles Keycloak reports for ``user_id``.

    Returns an empty list when no Keycloak admin client is configured.
    """
    if keycloak_admin:
        return keycloak_admin.get_realm_roles_of_user(
            user_id=user_id,
        )
    return []


# Keycloak wrappers


def _kc_assign_realm_roles(user: User, user_id: str, roles: KeyCloakRolesType):
    """Assign ``roles`` to the Keycloak user ``user_id``.

    On ``KeycloakPostError`` an ``SsoSyncError`` (action ADD, data = the
    roles) is stored for ``user`` and the exception is re-raised.
    """
    try:
        keycloak_admin.assign_realm_roles(user_id=user_id, roles=roles)
    except KeycloakPostError:
        SsoSyncError.objects.create(
            user=user, action=SsoSyncError.Action.ADD, data=roles
        )
        raise


def _kc_delete_realm_roles(user: User, user_id: str, roles: KeyCloakRolesType):
    """Remove ``roles`` from the Keycloak user ``user_id``.

    On ``KeycloakDeleteError`` an ``SsoSyncError`` (action REMOVE, data = the
    roles) is stored for ``user`` and the exception is re-raised.
    """
    try:
        keycloak_admin.delete_realm_roles_of_user(user_id=user_id, roles=roles)
    except KeycloakDeleteError:
        SsoSyncError.objects.create(
            user=user, action=SsoSyncError.Action.REMOVE, data=roles
        )
        raise


def _kc_create_user(user: User) -> str:
    """Create an enabled Keycloak user from ``user`` and return its id.

    The e-mail address is used as both username and e-mail. On
    ``KeycloakPostError`` an ``SsoSyncError`` (action ADD, empty data) is
    stored for ``user`` and the exception is re-raised.
    """
    user_data = {
        "username": user.email,
        "email": user.email,
        "enabled": True,
        "firstName": user.first_name,
        "lastName": user.last_name,
    }
    try:
        # NOTE(review): exist_ok=True presumably makes python-keycloak return
        # the id of an already existing account instead of failing - confirm
        # against the library version in use.
        return keycloak_admin.create_user(user_data, exist_ok=True)
    except KeycloakPostError:
        SsoSyncError.objects.create(user=user, action=SsoSyncError.Action.ADD, data={})
        raise


def _get_role_request_data(course_roles: CourseRolesType) -> KeyCloakRolesType:
    """Translate ``(course_slug, role)`` pairs into Keycloak role payloads.

    Accents are stripped from the course slug before the lookup. Pairs that
    cannot be resolved (``KeyError`` from ``SSO_ROLES`` or ``ROLE_IDS``) are
    logged as a warning and skipped, so the result may be shorter than the
    input or empty.
    """
    request_roles = []
    for course_slug, role in course_roles:
        sanitized_course_slug = _remove_accents(course_slug)
        try:
            oauth_role = _create_role_name(sanitized_course_slug, role)
            request_roles.append({"id": ROLE_IDS[oauth_role], "name": oauth_role})
        except KeyError:
            # Raised for an unknown course slug, an unknown role, or a role
            # name without an entry in ROLE_IDS.
            logger.warning(
                "Role or course not found in SSO_ROLES",
                course_slug=course_slug,
                role=role,
                label="role_sync",
            )
    return request_roles


def _create_role_name(course_slug: str, role: str) -> str:
    """Return the Keycloak role name for ``role`` in ``course_slug``.

    Raises ``KeyError`` when the slug or the role is not in ``SSO_ROLES``.
    """
    return SSO_ROLES[course_slug][role]


def _remove_accents(input_str) -> str:
    """Return ``input_str`` NFKD-normalised with combining marks removed."""
    nfkd_form = unicodedata.normalize("NFKD", input_str)
    return "".join(char for char in nfkd_form if not unicodedata.combining(char))


def _filter_non_myvbv_roles(roles: KeyCloakRolesType) -> KeyCloakRolesType:
    """Return only the roles whose ``name`` starts with ``myvbv-``."""
    return [role for role in roles if role["name"].startswith("myvbv-")]