vbv/server/vbv_lernwelt/sso/role_sync/client.py

93 lines
2.8 KiB
Python

import unicodedata
from typing import Dict, List, Tuple
from django.conf import settings
from keycloak import KeycloakAdmin, KeycloakOpenIDConnection
from vbv_lernwelt.core.models import User
from vbv_lernwelt.sso.role_sync.roles import ROLE_IDS, SSO_ROLES
CourseRolesType = List[Tuple[str, List[str]]]
if settings.OAUTH_SYNC_ROLES:
keycloak_connection = KeycloakOpenIDConnection(
server_url=settings.OAUTH_SIGNIN_URL,
realm_name=settings.OAUTH_SIGNIN_REALM,
user_realm_name=settings.OAUTH_SIGNIN_REALM,
client_id=settings.OAUTH_SIGNIN_ADMIN_CLIENT_ID,
client_secret_key=settings.OAUTH_SIGNIN_ADMIN_CLIENT_SECRET,
verify=True,
)
keycloak_admin = KeycloakAdmin(connection=keycloak_connection)
# todo: handle errors
def add_roles_to_user(user: User, course_roles: CourseRolesType):
user_id = user.additional_json_data.get("intermediate_sso_id", "")
if settings.OAUTH_SYNC_ROLES and user_id:
request_roles = _get_role_request_data(course_roles)
keycloak_admin.assign_realm_roles(
user_id=user_id,
roles=request_roles,
)
return True
return False
def remove_roles_from_user(user: User, course_roles: CourseRolesType):
user_id = user.additional_json_data.get("intermediate_sso_id", "")
if settings.OAUTH_SYNC_ROLES and user_id:
request_roles = _get_role_request_data(course_roles)
keycloak_admin.delete_realm_roles_of_user(
user_id=user_id,
roles=request_roles,
)
return True
return False
def update_roles_for_user(
user: User, add_course_roles: CourseRolesType, remove_course_roles: CourseRolesType
):
if settings.OAUTH_SYNC_ROLES:
add_roles_to_user(user, add_course_roles)
remove_roles_from_user(user, remove_course_roles)
return True
return False
def get_roles_for_user(user_id: str):
if settings.OAUTH_SYNC_ROLES:
return keycloak_admin.get_realm_roles_of_user(
user_id=user_id,
)
return []
# create sso-ID user and set roles
# sync
def _get_role_request_data(course_roles: CourseRolesType) -> List[Dict[str, str]]:
request_roles = []
for item in course_roles:
course_slug, roles = item
sanitized_course_slug = _remove_accents(course_slug)
oauth_roles = _create_role_names(sanitized_course_slug, roles)
return request_roles + [
{"id": ROLE_IDS[role], "name": role} for role in oauth_roles
]
return request_roles
def _create_role_names(course_slug: str, roles: list) -> List[str]:
return [SSO_ROLES[course_slug][role] for role in roles]
def _remove_accents(input_str) -> str:
nfkd_form = unicodedata.normalize("NFKD", input_str)
return "".join([char for char in nfkd_form if not unicodedata.combining(char)])