vbv/server/vbv_lernwelt/sso/role_sync/services.py

178 lines
5.5 KiB
Python

import unicodedata
from typing import Dict, List, Tuple
import structlog
from django.conf import settings
from keycloak import KeycloakAdmin, KeycloakOpenIDConnection
from keycloak.exceptions import KeycloakDeleteError, KeycloakPostError
from vbv_lernwelt.core.models import User
from vbv_lernwelt.sso.models import SsoSyncError
from vbv_lernwelt.sso.role_sync.roles import ROLE_IDS, SSO_ROLES
logger = structlog.get_logger(__name__)
CourseRolesType = List[Tuple[str, str]]
KeyCloakRolesType = List[Dict[str, str]]
keycloak_admin = None # Needed for pytest
if settings.OAUTH_SYNC_ROLES:
keycloak_connection = KeycloakOpenIDConnection(
server_url=settings.OAUTH_SIGNIN_URL,
realm_name=settings.OAUTH_SIGNIN_REALM,
user_realm_name=settings.OAUTH_SIGNIN_REALM,
client_id=settings.OAUTH_SIGNIN_ADMIN_CLIENT_ID,
client_secret_key=settings.OAUTH_SIGNIN_ADMIN_CLIENT_SECRET,
verify=True,
)
keycloak_admin = KeycloakAdmin(connection=keycloak_connection)
def add_roles_to_user(user: User, course_roles: CourseRolesType):
return _handle_add_remove_action(
user=user, course_roles=course_roles, action=SsoSyncError.Action.ADD
)
def remove_roles_from_user(user: User, course_roles: CourseRolesType):
return _handle_add_remove_action(
user=user, course_roles=course_roles, action=SsoSyncError.Action.REMOVE
)
def _handle_add_remove_action(
user: User,
course_roles: CourseRolesType,
action: SsoSyncError.Action,
):
user_id = user.additional_json_data.get("intermediate_sso_id", "")
if keycloak_admin and user_id:
request_roles = _get_role_request_data(course_roles)
if not request_roles:
return False
if action == SsoSyncError.Action.ADD:
_kc_assign_realm_roles(user, user_id, request_roles)
elif action == SsoSyncError.Action.REMOVE:
_kc_delete_realm_roles(user, user_id, request_roles)
return True
return False
def update_roles_for_user(
user: User, add_course_roles: CourseRolesType, remove_course_roles: CourseRolesType
):
if keycloak_admin:
remove_ret_value = remove_roles_from_user(user, remove_course_roles)
add_ret_value = add_roles_to_user(user, add_course_roles)
return remove_ret_value and add_ret_value
return False
def sync_roles_for_user(user: User, course_roles: CourseRolesType):
if keycloak_admin:
user_id = user.additional_json_data.get("intermediate_sso_id", "")
if user_id:
assigned_roles = _filter_non_myvbv_roles(
keycloak_admin.get_realm_roles_of_user(user_id=user_id)
)
if assigned_roles:
_kc_delete_realm_roles(user, user_id, assigned_roles)
roles = _get_role_request_data(course_roles)
keycloak_admin.assign_realm_roles(user_id=user_id, roles=roles)
return True
return False
def create_user(user: User):
if keycloak_admin:
return _kc_create_user(user)
return ""
def create_and_update_user(user: User, save=False):
sso_data = {"intermediate_sso_id": create_user(user)}
user.update_additional_json_data(sso_data)
if save:
user.save()
def get_roles_for_user(user_id: str):
if keycloak_admin:
return keycloak_admin.get_realm_roles_of_user(
user_id=user_id,
)
return []
# Keycloak wrappers
def _kc_assign_realm_roles(user: User, user_id: str, roles: List[KeyCloakRolesType]):
try:
keycloak_admin.assign_realm_roles(user_id=user_id, roles=roles)
except KeycloakPostError as e:
SsoSyncError.objects.create(
user=user, action=SsoSyncError.Action.ADD, data=roles
)
raise e
def _kc_delete_realm_roles(user: User, user_id: str, roles: List[KeyCloakRolesType]):
try:
keycloak_admin.delete_realm_roles_of_user(user_id=user_id, roles=roles)
except KeycloakDeleteError as e:
SsoSyncError.objects.create(
user=user, action=SsoSyncError.Action.REMOVE, data=roles
)
raise e
def _kc_create_user(user: User) -> str:
user_data = {
"username": user.email,
"email": user.email,
"enabled": True,
"firstName": user.first_name,
"lastName": user.last_name,
}
try:
return keycloak_admin.create_user(user_data, exist_ok=True)
except KeycloakPostError as e:
SsoSyncError.objects.create(user=user, action=SsoSyncError.Action.ADD, data={})
raise e
def _get_role_request_data(course_roles: CourseRolesType) -> List[KeyCloakRolesType]:
request_roles = []
for item in course_roles:
course_slug, role = item
sanitized_course_slug = _remove_accents(course_slug)
try:
oauth_role = _create_role_name(sanitized_course_slug, role)
request_roles.append({"id": ROLE_IDS[oauth_role], "name": oauth_role})
except KeyError:
logger.warning(
"Role or course not found in SSO_ROLES",
course_slug=course_slug,
role=role,
label="role_sync",
)
return request_roles
def _create_role_name(course_slug: str, role: str) -> List[str]:
return SSO_ROLES[course_slug][role]
def _remove_accents(input_str) -> str:
nfkd_form = unicodedata.normalize("NFKD", input_str)
return "".join([char for char in nfkd_form if not unicodedata.combining(char)])
def _filter_non_myvbv_roles(roles: List[KeyCloakRolesType]) -> List[KeyCloakRolesType]:
return [role for role in roles if role["name"].startswith("myvbv-")]