vbv/server/vbv_lernwelt/sso/role_sync/services.py

154 lines
4.9 KiB
Python

import unicodedata
from typing import Callable, Dict, List, Tuple, Type

import structlog
from django.conf import settings
from keycloak import KeycloakAdmin, KeycloakOpenIDConnection
from keycloak.exceptions import KeycloakDeleteError, KeycloakError, KeycloakPostError

from vbv_lernwelt.core.models import User
from vbv_lernwelt.sso.exceptions import MyVbvKeycloakDeleteError, MyVbvKeycloakPostError
from vbv_lernwelt.sso.role_sync.roles import ROLE_IDS, SSO_ROLES
# Module-level structured logger.
logger = structlog.get_logger(__name__)

# A list of (course_slug, role) pairs; _get_role_request_data unpacks each
# item in exactly that order.
CourseRolesType = List[Tuple[str, str]]

# The Keycloak admin client is created at import time, and only when role
# sync is enabled. When settings.OAUTH_SYNC_ROLES is falsy the names
# `keycloak_connection` and `keycloak_admin` are never bound in this module.
if settings.OAUTH_SYNC_ROLES:
    keycloak_connection = KeycloakOpenIDConnection(
        server_url=settings.OAUTH_SIGNIN_URL,
        realm_name=settings.OAUTH_SIGNIN_REALM,
        user_realm_name=settings.OAUTH_SIGNIN_REALM,
        client_id=settings.OAUTH_SIGNIN_ADMIN_CLIENT_ID,
        client_secret_key=settings.OAUTH_SIGNIN_ADMIN_CLIENT_SECRET,
        # NOTE(review): verify=False presumably disables TLS certificate
        # verification for a connection that carries the admin client
        # secret -- confirm this is intentional outside local development.
        verify=False,
    )
    keycloak_admin = KeycloakAdmin(connection=keycloak_connection)
def add_roles_to_user(user: User, course_roles: CourseRolesType):
    """Assign the realm roles derived from *course_roles* to *user*.

    Args:
        user: The user whose ``intermediate_sso_id`` identifies the SSO account.
        course_roles: ``(course_slug, role)`` pairs to translate into realm roles.

    Returns:
        ``True`` if a role assignment request was sent, ``False`` if role sync
        is disabled, the user has no SSO id, or no role could be resolved.

    Raises:
        MyVbvKeycloakPostError: If Keycloak rejects the assignment request.
    """
    # `keycloak_admin` is only bound when role sync is enabled; touching it
    # otherwise would raise NameError before the helper can return False.
    if not settings.OAUTH_SYNC_ROLES:
        return False

    return _handle_add_remove_action(
        user=user,
        course_roles=course_roles,
        func=keycloak_admin.assign_realm_roles,
        kc_exception=KeycloakPostError,
        myvbv_exception=MyVbvKeycloakPostError,
    )
def remove_roles_from_user(user: User, course_roles: CourseRolesType):
    """Remove the realm roles derived from *course_roles* from *user*.

    Args:
        user: The user whose ``intermediate_sso_id`` identifies the SSO account.
        course_roles: ``(course_slug, role)`` pairs to translate into realm roles.

    Returns:
        ``True`` if a role removal request was sent, ``False`` if role sync
        is disabled, the user has no SSO id, or no role could be resolved.

    Raises:
        MyVbvKeycloakDeleteError: If Keycloak rejects the removal request.
    """
    # `keycloak_admin` is only bound when role sync is enabled; touching it
    # otherwise would raise NameError before the helper can return False.
    if not settings.OAUTH_SYNC_ROLES:
        return False

    return _handle_add_remove_action(
        user=user,
        course_roles=course_roles,
        func=keycloak_admin.delete_realm_roles_of_user,
        kc_exception=KeycloakDeleteError,
        myvbv_exception=MyVbvKeycloakDeleteError,
    )
def _handle_add_remove_action(
    user: User,
    course_roles: CourseRolesType,
    func: Callable[..., object],
    kc_exception: Type[KeycloakError],
    myvbv_exception: Type[Exception],
):
    """Run a Keycloak role mutation for *user* and translate its failure.

    Args:
        user: The user whose ``intermediate_sso_id`` identifies the SSO account.
        course_roles: ``(course_slug, role)`` pairs to translate into realm roles.
        func: Callable invoked as ``func(user_id=..., roles=...)``.
        kc_exception: Exception class raised by *func* that should be translated.
        myvbv_exception: Exception class to raise instead; it is constructed
            with the caught exception and the role payload.

    Returns:
        ``True`` if *func* was called successfully, ``False`` if role sync is
        disabled, the user has no SSO id, or no role could be resolved.

    Raises:
        myvbv_exception: If *func* raises *kc_exception*.
    """
    user_id = user.additional_json_data.get("intermediate_sso_id", "")
    if not (settings.OAUTH_SYNC_ROLES and user_id):
        return False

    request_roles = _get_role_request_data(course_roles)
    if not request_roles:
        # Nothing resolvable to send; skip the request entirely.
        return False

    try:
        func(user_id=user_id, roles=request_roles)
    except kc_exception as e:
        # Chain explicitly so the original Keycloak error stays the cause.
        raise myvbv_exception(e, request_roles) from e
    return True
def update_roles_for_user(
    user: User, add_course_roles: CourseRolesType, remove_course_roles: CourseRolesType
):
    """Remove one set of course roles from *user*, then add another.

    Removal always runs before addition, and both calls are made regardless
    of the first call's result.

    Returns:
        ``False`` when role sync is disabled; otherwise the removal result if
        it is falsy, else the addition result.
    """
    if not settings.OAUTH_SYNC_ROLES:
        return False

    removed = remove_roles_from_user(user, remove_course_roles)
    added = add_roles_to_user(user, add_course_roles)
    return removed and added
def sync_roles_for_user(user: User, course_roles: CourseRolesType):
    """Replace the user's ``myvbv-`` realm roles with those from *course_roles*.

    The user's current realm roles are fetched, the ones whose name starts
    with ``myvbv-`` are deleted, and the roles derived from *course_roles*
    are assigned.

    Returns:
        ``True`` if the sync ran, ``False`` if role sync is disabled or the
        user has no ``intermediate_sso_id``.
    """
    if not settings.OAUTH_SYNC_ROLES:
        return False

    sso_id = user.additional_json_data.get("intermediate_sso_id", "")
    if not sso_id:
        return False

    current_roles = keycloak_admin.get_realm_roles_of_user(user_id=sso_id)
    stale_roles = _filter_non_myvbv_roles(current_roles)
    if stale_roles:
        keycloak_admin.delete_realm_roles_of_user(
            user_id=sso_id,
            roles=stale_roles,
        )

    # NOTE(review): unlike _handle_add_remove_action, an empty role list is
    # still sent to Keycloak here -- confirm that is intended.
    new_roles = _get_role_request_data(course_roles)
    keycloak_admin.assign_realm_roles(user_id=sso_id, roles=new_roles)
    return True
def create_user(user: User) -> str:
    """Create a Keycloak account for *user*, using the e-mail as username.

    The request is sent with ``exist_ok=True``.

    Returns:
        The value returned by ``keycloak_admin.create_user`` (presumably the
        Keycloak user id -- confirm against python-keycloak), or ``""`` when
        role sync is disabled.

    Raises:
        MyVbvKeycloakPostError: If Keycloak rejects the creation request.
    """
    if not settings.OAUTH_SYNC_ROLES:
        return ""

    user_data = {
        "username": user.email,
        "email": user.email,
        "enabled": True,
        "firstName": user.first_name,
        "lastName": user.last_name,
    }
    try:
        return keycloak_admin.create_user(user_data, exist_ok=True)
    except KeycloakPostError as e:
        # Chain explicitly so the original Keycloak error stays the cause.
        raise MyVbvKeycloakPostError(e, user_data) from e
def get_roles_for_user(user_id: str):
    """Return the realm roles Keycloak reports for *user_id*.

    Returns an empty list when role sync is disabled.
    """
    if not settings.OAUTH_SYNC_ROLES:
        return []
    return keycloak_admin.get_realm_roles_of_user(user_id=user_id)
def _get_role_request_data(course_roles: CourseRolesType) -> List[Dict[str, str]]:
    """Translate ``(course_slug, role)`` pairs into Keycloak role payloads.

    Each course slug has its accents stripped before the lookup. Pairs whose
    role name or role id cannot be resolved are logged and skipped.

    Returns:
        A list of ``{"id": ..., "name": ...}`` dicts, one per resolvable pair.
    """
    payload = []
    for course_slug, role in course_roles:
        slug_key = _remove_accents(course_slug)
        try:
            role_name = _create_role_name(slug_key, role)
            role_id = ROLE_IDS[role_name]
        except KeyError:
            logger.warning(
                "Role or course not found in SSO_ROLES",
                course_slug=course_slug,
                role=role,
            )
            continue
        payload.append({"id": role_id, "name": role_name})
    return payload
def _create_role_name(course_slug: str, role: str) -> str:
    """Look up the SSO role name for *role* within *course_slug*.

    The result is used as a ``ROLE_IDS`` key and as the role's ``"name"``,
    so it is a single name rather than a list.

    Raises:
        KeyError: If the course slug or the role is missing from ``SSO_ROLES``.
    """
    roles_for_course = SSO_ROLES[course_slug]
    return roles_for_course[role]
def _remove_accents(input_str) -> str:
    """Return *input_str* NFKD-normalized with combining characters dropped."""
    decomposed = unicodedata.normalize("NFKD", input_str)
    # combining() returns 0 for characters that are not combining marks.
    return "".join(ch for ch in decomposed if unicodedata.combining(ch) == 0)
def _filter_non_myvbv_roles(roles: List[dict]) -> List[dict]:
    """Return only the roles whose ``"name"`` starts with ``myvbv-``.

    Despite the name, this drops every role that is not a ``myvbv-`` role
    and keeps the ``myvbv-`` ones.

    Args:
        roles: Role mappings, each with at least a ``"name"`` key.

    Returns:
        The matching role mappings in their original order.
    """
    return [role for role in roles if role["name"].startswith("myvbv-")]