262 lines
8.2 KiB
Python
262 lines
8.2 KiB
Python
from typing import Any, Dict
|
|
|
|
import structlog
|
|
from openpyxl.reader.excel import load_workbook
|
|
|
|
from vbv_lernwelt.core.models import User
|
|
from vbv_lernwelt.course.models import Course, CourseSession, CourseSessionUser
|
|
from vbv_lernwelt.importer.utils import (
|
|
calc_header_tuple_list_from_pyxl_sheet,
|
|
parse_circle_group_string,
|
|
try_parse_datetime,
|
|
)
|
|
from vbv_lernwelt.learnpath.models import Circle, LearningContentAttendanceCourse
|
|
|
|
logger = structlog.get_logger(__name__)
|
|
|
|
|
|
def create_or_update_user(
|
|
email: str, first_name: str = "", last_name: str = "", sso_id: str = None
|
|
):
|
|
logger.debug(
|
|
"create_or_update_user",
|
|
email=email,
|
|
first_name=first_name,
|
|
last_name=last_name,
|
|
sso_id=sso_id,
|
|
label="import",
|
|
)
|
|
user = None
|
|
if sso_id:
|
|
user_qs = User.objects.filter(sso_id=sso_id)
|
|
if user_qs.exists():
|
|
user = user_qs.first()
|
|
|
|
if not user:
|
|
user_qs = User.objects.filter(email=email)
|
|
if user_qs.exists():
|
|
user = user_qs.first()
|
|
|
|
if not user:
|
|
# create user
|
|
user = User(sso_id=sso_id, email=email, username=email)
|
|
|
|
user.email = email
|
|
user.sso_id = user.sso_id or sso_id
|
|
user.first_name = first_name or user.first_name
|
|
user.last_name = last_name or user.last_name
|
|
user.username = email
|
|
user.set_unusable_password()
|
|
user.save()
|
|
|
|
return user
|
|
|
|
|
|
def import_course_sessions_from_excel(course: Course, filename: str, language="de"):
|
|
workbook = load_workbook(filename=filename)
|
|
sheet = workbook["Schulungen Durchführung"]
|
|
|
|
tuple_list = calc_header_tuple_list_from_pyxl_sheet(sheet)
|
|
for row in tuple_list:
|
|
create_or_update_course_session(
|
|
course, dict(row), language=language, circles=["Fahrzeug"]
|
|
)
|
|
|
|
|
|
def create_or_update_course_session(
|
|
course: Course, data: Dict[str, Any], language="de", circles=None
|
|
):
|
|
"""
|
|
:param data: the following keys are required to process the data: Generation, Region, Klasse
|
|
:return:
|
|
"""
|
|
|
|
logger.debug(
|
|
"create_or_update_course_session",
|
|
course=course.title,
|
|
data=data,
|
|
label="import",
|
|
)
|
|
|
|
if circles is None:
|
|
circles = []
|
|
|
|
# TODO: validation
|
|
group = data["Klasse"].strip()
|
|
import_id = data["ID"].strip()
|
|
|
|
generation = str(data["Generation"]).strip()
|
|
region = data["Region"].strip()
|
|
|
|
title = f"{region} {generation} {group}"
|
|
|
|
if not import_id.lower().startswith(language.lower()):
|
|
# FIXME: language check depends on import_id format for now...
|
|
return None
|
|
|
|
cs, _created = CourseSession.objects.get_or_create(
|
|
import_id=import_id, group=group, course=course
|
|
)
|
|
|
|
cs.additional_json_data["import_data"] = data
|
|
cs.save()
|
|
|
|
cs.title = title
|
|
cs.generation = generation
|
|
cs.region = region
|
|
cs.group = group
|
|
cs.import_id = import_id
|
|
|
|
cs.save()
|
|
for circle in circles:
|
|
if language == "de":
|
|
attendance_course_lp_qs = LearningContentAttendanceCourse.objects.filter(
|
|
slug=f"{course.slug}-lp-circle-{circle.lower()}-lc-präsenzkurs-{circle.lower()}"
|
|
)
|
|
add_attendance_course_date(cs, attendance_course_lp_qs, circle, data)
|
|
elif language == "fr":
|
|
# todo: this is a hack remove me
|
|
print(f"{course.slug}-lp-circle-véhicule-lc-cours-de-présence-véhicule")
|
|
attendance_course_lp_qs = LearningContentAttendanceCourse.objects.filter(
|
|
slug=f"{course.slug}-lp-circle-véhicule-lc-cours-de-présence-véhicule-à-moteur"
|
|
)
|
|
add_attendance_course_date(cs, attendance_course_lp_qs, circle, data)
|
|
elif language == "it":
|
|
print(course.slug, f"{course.slug}-lp-circle-veicolo-lc-classi-di-frequenza-veicolo")
|
|
# todo: this is a hack remove me
|
|
attendance_course_lp_qs = LearningContentAttendanceCourse.objects.filter(
|
|
slug=f"{course.slug}-lp-circle-veicolo-lc-corso-di-presenza-veicolo"
|
|
)
|
|
print(attendance_course_lp_qs)
|
|
add_attendance_course_date(cs, attendance_course_lp_qs, circle, data)
|
|
|
|
return cs
|
|
|
|
|
|
def add_attendance_course_date(course_session, attendance_course_lp_qs, circle, data):
|
|
if attendance_course_lp_qs.exists():
|
|
course_session.attendance_courses.append(
|
|
{
|
|
"learningContentId": attendance_course_lp_qs.first().id,
|
|
"start": try_parse_datetime(data[f"{circle} Start"])[1].isoformat(),
|
|
"end": try_parse_datetime(data[f"{circle} Ende"])[1].isoformat(),
|
|
"location": data[f"{circle} Raum"],
|
|
"trainer": "",
|
|
}
|
|
)
|
|
course_session.save()
|
|
|
|
|
|
def import_trainers_from_excel(course: Course, filename: str, language="de"):
|
|
workbook = load_workbook(filename=filename)
|
|
sheet = workbook["Schulungen Trainer"]
|
|
|
|
tuple_list = calc_header_tuple_list_from_pyxl_sheet(sheet)
|
|
for row in tuple_list:
|
|
create_or_update_trainer(course, dict(row), language=language)
|
|
|
|
|
|
def create_or_update_trainer(course: Course, data: Dict[str, Any], language="de"):
|
|
logger.debug(
|
|
"create_or_update_trainer",
|
|
data=data,
|
|
label="import",
|
|
)
|
|
|
|
user = create_or_update_user(
|
|
email=data["Email"].lower(),
|
|
first_name=data["Vorname"],
|
|
last_name=data["Name"],
|
|
)
|
|
|
|
groups = [g.strip() for g in data["Klasse"].strip().split(",")]
|
|
|
|
# general expert handling
|
|
for group in groups:
|
|
import_id = f"{data['Generation'].strip()} {group}"
|
|
course_session = CourseSession.objects.filter(
|
|
import_id=import_id,
|
|
group=group,
|
|
).first()
|
|
if course_session:
|
|
csu, _created = CourseSessionUser.objects.get_or_create(
|
|
course_session_id=course_session.id, user_id=user.id
|
|
)
|
|
csu.role = CourseSessionUser.Role.EXPERT
|
|
csu.save()
|
|
else:
|
|
logger.warning(
|
|
"create_or_update_trainer: course_session not found",
|
|
import_id=import_id,
|
|
group=group,
|
|
label="import",
|
|
)
|
|
|
|
if not course:
|
|
return
|
|
|
|
# circle expert handling
|
|
circle_data = parse_circle_group_string(data["Circles"])
|
|
for circle_string in circle_data:
|
|
parts = circle_string.split("(", 1)
|
|
circle_name = parts[0].strip()
|
|
|
|
groups = [g.strip() for g in parts[1].rstrip(")").strip().split(",")]
|
|
|
|
# FIXME: hardcoded translation
|
|
if language == "fr" and circle_name == "Fahrzeug":
|
|
circle_name = "Véhicule"
|
|
|
|
# print(circle_name, groups)
|
|
for group in groups:
|
|
import_id = f"{data['Generation'].strip()} {group}"
|
|
course_session = CourseSession.objects.filter(
|
|
import_id=import_id, group=group
|
|
).first()
|
|
circle = Circle.objects.filter(
|
|
slug=f"{course.slug}-lp-circle-{circle_name.lower()}"
|
|
).first()
|
|
|
|
if course_session and circle:
|
|
csu = CourseSessionUser.objects.filter(
|
|
course_session_id=course_session.id, user_id=user.id
|
|
).first()
|
|
if csu:
|
|
csu.expert.add(circle)
|
|
csu.save()
|
|
|
|
|
|
def import_students_from_excel(filename: str):
|
|
workbook = load_workbook(filename=filename)
|
|
sheet = workbook.active
|
|
|
|
tuple_list = calc_header_tuple_list_from_pyxl_sheet(sheet)
|
|
for row in tuple_list:
|
|
create_or_update_student(dict(row))
|
|
|
|
|
|
def create_or_update_student(data: Dict[str, Any]):
|
|
logger.debug(
|
|
"create_or_update_student",
|
|
data=data,
|
|
label="import",
|
|
)
|
|
|
|
user = create_or_update_user(
|
|
email=data["Email"].lower(),
|
|
first_name=data["Vorname"],
|
|
last_name=data["Name"],
|
|
)
|
|
|
|
# TODO: handle language
|
|
|
|
# general expert handling
|
|
import_ids = [i.strip() for i in data["Durchführungen"].split(",")]
|
|
for import_id in import_ids:
|
|
course_session = CourseSession.objects.filter(import_id=import_id).first()
|
|
if course_session:
|
|
csu, _created = CourseSessionUser.objects.get_or_create(
|
|
course_session_id=course_session.id, user_id=user.id
|
|
)
|
|
csu.save()
|