386 lines
11 KiB
Python
386 lines
11 KiB
Python
from datetime import date, datetime, time
|
|
from typing import Any, Dict, List
|
|
|
|
import structlog
|
|
from openpyxl.reader.excel import load_workbook
|
|
|
|
from vbv_lernwelt.core.models import User
|
|
from vbv_lernwelt.course.consts import COURSE_UK, COURSE_UK_FR, COURSE_UK_IT
|
|
from vbv_lernwelt.course.models import Course, CourseSession, CourseSessionUser
|
|
from vbv_lernwelt.course_session.models import CourseSessionAttendanceCourse
|
|
from vbv_lernwelt.importer.utils import (
|
|
calc_header_tuple_list_from_pyxl_sheet,
|
|
parse_circle_group_string,
|
|
try_parse_datetime,
|
|
)
|
|
from vbv_lernwelt.learnpath.models import Circle, LearningContentAttendanceCourse
|
|
|
|
logger = structlog.get_logger(__name__)
|
|
|
|
# Circle titles per language, keyed by the German circle name that is also used
# as the column prefix / circle key in the import sheets.
# NOTE(review): the "it" titles of both "Haushalt" circles are empty and the
# "fr" title of "Haushalt Teil 2" is still the German text -- confirm whether
# translations are missing here.
CIRCLE_NAMES = {
    "Kickoff": {
        "de": "Kickoff",
        "fr": "Lancement",
        "it": "Introduzione",
    },
    "Basis": {
        "de": "Basis",
        "fr": "Base",
        "it": "Base",
    },
    "Fahrzeug": {
        "de": "Fahrzeug",
        "fr": "Véhicule",
        "it": "Veicolo",
    },
    "Haushalt Teil 1": {
        "de": "Haushalt Teil 1",
        "fr": "Habitat partie 1",
        "it": "",
    },
    "Haushalt Teil 2": {
        "de": "Haushalt Teil 2",
        "fr": "Haushalt Teil 2",
        "it": "",
    },
}

# Row keys that sync_students_from_t2l drops before merging a row into
# ``User.additional_json_data``.
T2L_IGNORE_FIELDS = [
    "Vorname",
    "Name",
    "Email",
    "Sprache",
    "Durchführungen",
]
|
|
|
|
|
|
class DataImportError(Exception):
    """Raised when an imported row is missing a required value."""
|
|
|
|
|
|
def create_or_update_user(
    email: str,
    first_name: str = "",
    last_name: str = "",
    sso_id: str = None,
    contract_number: str = "",
):
    """
    Find an existing user or create a new one, then refresh its core fields.

    Lookup order (the first match wins): ``sso_id``, then the
    ``Lehrvertragsnummer`` key inside ``additional_json_data``, then ``email``.
    If nothing matches, a new ``User`` is built from ``email`` and ``sso_id``.

    :param email: e-mail address; it is also written to ``username``
    :param first_name: new first name; an empty value keeps the stored one
    :param last_name: new last name; an empty value keeps the stored one
    :param sso_id: optional SSO id; an sso_id already stored on the user is kept
    :param contract_number: optional contract number, only used for the lookup
    :return: the saved ``User`` instance
    """
    logger.debug(
        "create_or_update_user",
        email=email,
        first_name=first_name,
        last_name=last_name,
        sso_id=sso_id,
        label="import",
    )

    # ``first()`` already returns None when nothing matches, so a preceding
    # ``exists()`` call would only add a second query per lookup.
    user = None
    if sso_id:
        user = User.objects.filter(sso_id=sso_id).first()

    if not user and contract_number:
        user = User.objects.filter(
            additional_json_data__Lehrvertragsnummer=contract_number
        ).first()

    if not user:
        user = User.objects.filter(email=email).first()

    if not user:
        # no match on any identifier: create user
        user = User(sso_id=sso_id, email=email, username=email)

    user.email = email
    user.sso_id = user.sso_id or sso_id
    user.first_name = first_name or user.first_name
    user.last_name = last_name or user.last_name
    user.username = email
    # NOTE(review): this runs for already existing users as well -- confirm
    # that replacing their password with an unusable one is intended.
    user.set_unusable_password()
    user.save()

    return user
|
|
|
|
|
|
def import_course_sessions_from_excel(
    filename: str, course: Course = None, restrict_language=None
):
    """
    Create or update one course session per row of the sheet
    "Schulungen Durchführung" in the given workbook.

    :param filename: path of the Excel workbook to load
    :param course: course every session is attached to; when omitted, the
        course is resolved per row from the row's "Sprache" value
    :param restrict_language: when set, rows in any other language are skipped
    :raises DataImportError: if a row lacks one of the required values
    """
    workbook = load_workbook(filename=filename)
    sheet = workbook["Schulungen Durchführung"]

    # Courses resolved from the row language, cached so that each language is
    # looked up only once per import.
    courses_by_language = {}

    for row in calc_header_tuple_list_from_pyxl_sheet(sheet):
        data = dict(row)
        validate_row_data(data, ["Klasse", "ID", "Generation", "Region", "Sprache"])
        language = data["Sprache"].strip()

        # this can be removed when the training import (create_course_training_xx) is no longer used
        if restrict_language and language != restrict_language:
            continue

        # Resolve the course per row. Writing the resolved course back into
        # ``course`` would make every later row reuse the course of the first
        # row, even when that row is in a different language.
        if course:
            row_course = course
        else:
            if language not in courses_by_language:
                courses_by_language[language] = get_uk_course(language)
            row_course = courses_by_language[language]

        create_or_update_course_session(
            row_course, data, language, circle_keys=["Kickoff", "Basis", "Fahrzeug"]
        )
|
|
|
|
|
|
def create_or_update_course_session(
    course: Course,
    data: Dict[str, Any],
    language: str,
    circle_keys=None,
):
    """
    Create or update the course session described by one import row and, for
    each requested circle, the attendance course data of that session.

    :param course: course the session belongs to
    :param data: row data; the keys Klasse, ID, Generation and Region are
        required. For every entry of ``circle_keys`` whose attendance course
        learning content exists, the keys "<circle> Raum", "<circle> Standort",
        "<circle> Adresse", "<circle> Start" and "<circle> Ende" are read too.
    :param language: key into ``CIRCLE_NAMES`` ("de", "fr" or "it") used to
        build the learning content slug
    :param circle_keys: keys of ``CIRCLE_NAMES`` to process; defaults to none
    :return: the saved ``CourseSession``
    """
    logger.debug(
        "create_or_update_course_session",
        course=course.title,
        data=data,
        label="import",
    )

    if not circle_keys:
        circle_keys = []

    group = data["Klasse"].strip()
    import_id = data["ID"].strip()
    # str(): unlike the other values, the generation is not assumed to be text
    generation = str(data["Generation"]).strip()
    region = data["Region"].strip()

    title = f"{region} {generation} {group}"

    cs, _created = CourseSession.objects.get_or_create(
        title=title, course=course, import_id=import_id
    )

    # All field updates are written with a single save.
    cs.additional_json_data["import_data"] = data
    cs.title = title
    cs.generation = generation
    cs.region = region
    cs.group = group
    cs.import_id = import_id
    cs.save()

    for circle in circle_keys:
        circle_slug = CIRCLE_NAMES[circle][language].lower()

        attendance_course_lc = LearningContentAttendanceCourse.objects.filter(
            slug=f"{course.slug}-lp-circle-{circle_slug}-lc-präsenzkurs-{circle_slug}"
        ).first()
        if not attendance_course_lc:
            # no attendance course learning content for this circle
            continue

        # update existing data
        csa, _created = CourseSessionAttendanceCourse.objects.get_or_create(
            course_session=cs, learning_content=attendance_course_lc
        )

        room = data[f"{circle} Raum"]
        site = data[f"{circle} Standort"]
        address = data[f"{circle} Adresse"]
        csa.location = f"{room}, {site}, {address}"
        csa.trainer = ""
        # location and trainer are fields of ``csa`` itself; saving only
        # ``csa.due_date`` below would not persist them.
        csa.save()

        # try_parse_datetime returns a sequence; index 1 is used as the value
        # NOTE(review): assumes csa.due_date is always set -- confirm
        csa.due_date.start = try_parse_datetime(data[f"{circle} Start"])[1]
        csa.due_date.end = try_parse_datetime(data[f"{circle} Ende"])[1]
        csa.due_date.save()

    return cs
|
|
|
|
|
|
def validate_row_data(data: Dict[str, Any], required_headers: List[str]):
    """
    Ensure that every required header has a non-empty value in ``data``.

    :param data: row data keyed by header
    :param required_headers: headers that must be present and non-empty
    :raises DataImportError: for the first header that is missing or whose
        value is empty
    """
    for header in required_headers:
        value = str(data.get(header, "")).strip()
        # A value of None stringifies to "None", so it is treated as empty.
        if value in ("", "None"):
            logger.debug(
                "validate_row_data_missing_header",
                data={"data": data, "header": header},
                label="import",
            )
            raise DataImportError(f"Missing or empty value for header {header}")
|
|
|
|
|
|
def get_uk_course(language: str) -> Course:
    """
    Return the UK course for the given language.

    "fr" and "it" have their own course ids; every other value falls back to
    ``COURSE_UK``.
    """
    course_ids_by_language = {"fr": COURSE_UK_FR, "it": COURSE_UK_IT}
    selected_id = course_ids_by_language.get(language, COURSE_UK)
    return Course.objects.get(id=selected_id)
|
|
|
|
|
|
def import_trainers_from_excel_for_training(
    filename: str, language="de", course: Course = None
):
    """
    Create or update a trainer for every row of the sheet "Schulungen Trainer".

    :param filename: path of the Excel workbook to load
    :param language: passed through to ``create_or_update_trainer``
    :param course: passed through to ``create_or_update_trainer``
    :raises DataImportError: if a row lacks one of the required values
    """
    required_headers = ["Email", "Vorname", "Name", "Sprache", "Klasse", "Generation"]
    trainer_sheet = load_workbook(filename=filename)["Schulungen Trainer"]

    for header_value_pairs in calc_header_tuple_list_from_pyxl_sheet(trainer_sheet):
        row_data = dict(header_value_pairs)
        validate_row_data(row_data, required_headers)
        create_or_update_trainer(course, row_data, language=language)
|
|
|
|
|
|
def create_or_update_trainer(course: Course, data: Dict[str, Any], language="de"):
    """
    Create or update the trainer described by one import row and register the
    user as expert of the matching course session and its circles.

    The course session is looked up by ``import_id`` ("<Generation> <Klasse>")
    and ``group``. If none is found, a warning is logged and nothing beyond
    the user itself is changed.

    :param course: course whose circles are looked up; when omitted, the
        course of the found course session is used
    :param data: row data; reads Email, Vorname, Name, Sprache, Klasse,
        Generation and, once a course session was found, Circles
    :param language: key into ``CIRCLE_NAMES`` used to build the circle slugs
    :return: None
    """
    course_title = course.title if course else "None"

    logger.debug(
        "create_or_update_trainer",
        course=course_title,
        data=data,
        label="import",
    )

    user = create_or_update_user(
        email=data["Email"].lower(),
        first_name=data["Vorname"],
        last_name=data["Name"],
    )
    user.language = data["Sprache"]
    user.save()

    group = data["Klasse"].strip()
    # str() keeps a numeric "Generation" value from failing on .strip(), the
    # same way create_or_update_course_session reads it.
    generation = str(data["Generation"]).strip()
    import_id = f"{generation} {group}"

    # general expert handling
    course_session = CourseSession.objects.filter(
        import_id=import_id,
        group=group,
    ).first()

    if not course_session:
        logger.warning(
            "create_or_update_trainer: course_session not found",
            import_id=import_id,
            group=group,
            label="import",
        )
        if not course:
            logger.warning(
                "create_or_update_trainer: course_session and course are None",
                import_id=import_id,
                group=group,
                label="import",
            )
        # Without a course session there is nothing to attach experts to.
        return

    csu, _created = CourseSessionUser.objects.get_or_create(
        course_session_id=course_session.id, user_id=user.id
    )
    csu.role = CourseSessionUser.Role.EXPERT
    csu.save()

    if not course:
        course = course_session.course

    # circle expert handling: the session and its user entry are the same for
    # every circle, so they are resolved once above instead of per iteration.
    circle_data = parse_circle_group_string(data["Circles"])
    for circle_key in circle_data:
        circle_name = CIRCLE_NAMES[circle_key][language]
        circle = Circle.objects.filter(
            slug=f"{course.slug}-lp-circle-{circle_name.lower()}"
        ).first()
        if circle:
            # add() on a many-to-many manager writes the relation immediately,
            # so no extra save of ``csu`` is needed.
            # NOTE(review): assumes ``expert`` is a many-to-many field -- confirm
            csu.expert.add(circle)
|
|
|
|
|
|
def import_students_from_excel(filename: str):
    """
    Create or update a student for every row of the workbook's active sheet.

    :param filename: path of the Excel workbook to load
    :raises DataImportError: if a row lacks one of the required values
    """
    required_headers = ["Email", "Vorname", "Name", "Sprache", "Durchführungen"]
    active_sheet = load_workbook(filename=filename).active

    for header_value_pairs in calc_header_tuple_list_from_pyxl_sheet(active_sheet):
        row_data = dict(header_value_pairs)
        validate_row_data(row_data, required_headers)
        create_or_update_student(row_data)
|
|
|
|
|
|
def create_or_update_student(data: Dict[str, Any]):
    """
    Create or update the student described by one import row and enrol the
    user in the course session named by the row.

    :param data: row data; reads Email, Vorname, Name, Sprache and
        Durchführungen, optionally Lehrvertragsnummer. The whole row is also
        merged into the user's ``additional_json_data``.
    :return: None
    """
    logger.debug(
        "create_or_update_student",
        data=data,
        label="import",
    )

    user = create_or_update_user(
        email=data["Email"].lower(),
        first_name=data["Vorname"],
        last_name=data["Name"],
        contract_number=data.get("Lehrvertragsnummer", ""),
    )

    user.language = data["Sprache"]
    update_user_json_data(user, data)
    user.save()

    # course session enrolment: "Durchführungen" holds the session's import_id
    import_id = data["Durchführungen"]
    course_session = CourseSession.objects.filter(import_id=import_id).first()
    if not course_session:
        # Same handling as for trainers: report the unmatched session instead
        # of skipping the enrolment silently.
        logger.warning(
            "create_or_update_student: course_session not found",
            import_id=import_id,
            label="import",
        )
        return

    # get_or_create already saves a newly created entry
    CourseSessionUser.objects.get_or_create(
        course_session_id=course_session.id, user_id=user.id
    )
|
|
|
|
|
|
def sync_students_from_t2l_excel(filename: str):
    """
    Run ``sync_students_from_t2l`` for every row of the workbook's active sheet.

    :param filename: path of the Excel workbook to load
    """
    active_sheet = load_workbook(filename=filename).active

    for header_value_pairs in calc_header_tuple_list_from_pyxl_sheet(active_sheet):
        # every row is handed over as its own freshly built dict
        sync_students_from_t2l(dict(header_value_pairs))
|
|
|
|
|
|
def sync_students_from_t2l(data):
    """
    Merge one T2L row into the ``additional_json_data`` of the user whose
    ``Lehrvertragsnummer`` matches the row.

    Rows that cannot be matched to exactly one user are ignored: rows without
    a contract number, rows whose number is unknown, and rows whose number is
    shared by several users (the last case is logged as a warning).

    :param data: row data keyed by header; it is not modified
    :return: None
    """
    contract_number = data.get("Lehrvertragsnummer")
    if contract_number is None or contract_number == "":
        # An empty number cannot identify a user.
        return

    # ignore errors
    try:
        user = User.objects.get(
            additional_json_data__Lehrvertragsnummer=contract_number
        )
    except User.DoesNotExist:
        return
    except User.MultipleObjectsReturned:
        logger.warning(
            "sync_students_from_t2l: multiple users found",
            contract_number=contract_number,
            label="import",
        )
        return

    # only sync data that is not in our user model; build a filtered copy so
    # that the caller's dict is left untouched
    t2l_data = {
        key: value for key, value in data.items() if key not in T2L_IGNORE_FIELDS
    }

    update_user_json_data(user, t2l_data)
    user.save()
|
|
|
|
|
|
def update_user_json_data(user: User, data: Dict[str, Any]):
    """
    Merge the sanitized ``data`` into ``user.additional_json_data``.

    Keys from ``data`` replace existing keys of the same name. The user is
    not saved here.
    """
    current_json_data = user.additional_json_data
    sanitized = sanitize_json_data_input(data)
    user.additional_json_data = current_json_data | sanitized
|
|
|
|
|
|
def sanitize_json_data_input(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Saving additional_json_data fails if the data contains datetime objects.
    This is a quick and dirty fix to convert datetime objects to iso strings.

    Top-level ``datetime``, ``date`` and ``time`` values are replaced in place
    by their ``isoformat()`` string; all other values are left as they are.

    :param data: dict to sanitize; it is modified in place
    :return: the same dict object that was passed in
    """
    # datetime is a subclass of date, so (date, time) covers all three types
    temporal_keys = [
        key for key, value in data.items() if isinstance(value, (date, time))
    ]
    for key in temporal_keys:
        data[key] = data[key].isoformat()
    return data
|