vbv/server/vbv_lernwelt/course_session/services/export_attendance.py

203 lines
5.9 KiB
Python

import typing
from datetime import datetime
from io import BytesIO
from itertools import groupby
import structlog
from django.utils.translation import gettext_lazy as _
from openpyxl import Workbook
from vbv_lernwelt.course.models import CourseSessionUser
from vbv_lernwelt.course_session.models import CourseSessionAttendanceCourse
# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
# Lazily translated base name of the export file; make_export_filename()
# appends the current date and the ".xlsx" extension to it.
ATTENDANCE_EXPORT_FILENAME = _("export_anwesenheit")
def export_attendance(
    course_session_ids: list[str],
    save_as_file: bool = False,
    circle_ids: typing.Optional[list[int]] = None,
):
    """Build an Excel workbook with one attendance sheet per course session.

    Args:
        course_session_ids: Ids of the course sessions to export.
        save_as_file: When true, the workbook is saved under the name produced
            by ``make_export_filename(ATTENDANCE_EXPORT_FILENAME)`` and nothing
            is returned. Otherwise the workbook is serialized in memory.
        circle_ids: Optional filter; when given (and non-empty) only attendance
            courses whose circle id is in this list get a column.

    Returns:
        The workbook content as ``bytes`` when ``save_as_file`` is false,
        otherwise ``None``.
    """
    wb = Workbook()
    # Removing the default sheet is easier than keeping track of the active
    # sheet while the per-session sheets are created.
    wb.remove(wb.active)

    attendance_courses = (
        CourseSessionAttendanceCourse.objects.filter(
            course_session_id__in=course_session_ids
        )
        # course_session.title is read for every row below; fetch it in the
        # same query instead of issuing one query per attendance course.
        .select_related("course_session")
        .order_by("course_session", "due_date")
    )
    grouped_cs_users = get_ordered_csus_by_course_session(course_session_ids)

    def session_title(attendance_course):
        return attendance_course.course_session.title

    # Map course session title -> attendance courses of that session, so the
    # loop below can look them up directly. sorted() is stable, therefore the
    # database ordering is kept within each title.
    grouped_attendance_course = {
        key: list(group)
        for key, group in groupby(
            sorted(attendance_courses, key=session_title),
            key=session_title,
        )
    }

    # Create a sheet for each course session.
    for course_session, cs_users in grouped_cs_users.items():
        logger.debug(
            "export_attendance_for_course_session",
            data={
                "course_session": course_session,
            },
            label="attendance_export",
        )
        _create_sheet(
            wb,
            course_session,
            cs_users,
            # A session can have members but no attendance course yet; export
            # it without attendance columns instead of failing with KeyError.
            grouped_attendance_course.get(course_session, []),
            circle_ids,
        )

    if save_as_file:
        wb.save(make_export_filename(ATTENDANCE_EXPORT_FILENAME))
        return None

    output = BytesIO()
    wb.save(output)
    # getvalue() returns the whole buffer regardless of the stream position.
    return output.getvalue()
def _create_sheet(
    wb: Workbook,
    title: str,
    users: list[CourseSessionUser],
    attendance_courses: list[CourseSessionAttendanceCourse],
    circle_ids: typing.Optional[list[int]],
):
    """Add one sheet to ``wb`` with a header row and one row per user.

    Header layout: the common user columns, an "optional attendance" column,
    then one column per attendance course that passes the circle filter.

    Args:
        wb: Workbook that receives the new sheet.
        title: Sheet title; passed through ``sanitize_sheet_name`` first.
        users: Course session users to list. When empty, the sheet is created
            but stays empty (no header row either).
        attendance_courses: Attendance courses that each become one column.
        circle_ids: When given and non-empty, courses whose circle id is not
            in this list are skipped.

    Returns:
        The created worksheet.
    """
    sheet = wb.create_sheet(title=sanitize_sheet_name(title))

    if not users:
        return sheet

    # headers
    # common user headers..., <attendance_course> <date>, status <attendance_course>, ..
    col_idx = add_user_headers(sheet)

    sheet.cell(row=1, column=col_idx, value=str(_("Optionale Anwesenheit")))
    col_idx += 1

    # Translated once up front: f-strings are not picked up by gettext.
    presence_str = str(_("Anwesenheit"))

    # Keyed by the header column index rather than by circle title: two
    # attendance courses with the same circle title would otherwise overwrite
    # each other and shift the status cells away from their headers.
    # _add_rows only relies on the insertion order of the values.
    attendance_data = {}

    for course in attendance_courses:
        circle = course.get_circle()
        if circle_ids and circle.id not in circle_ids:
            continue

        sheet.cell(
            row=1,
            column=col_idx,
            value=f"{presence_str} {circle.title} {course.due_date.start.strftime('%d.%m.%Y')}",
        )

        # Index the attendance entries by their "user_id" for per-user lookup.
        attendance_data[col_idx] = {
            d["user_id"]: d for d in course.attendance_user_list
        }
        col_idx += 1

    # add rows with user data
    _add_rows(sheet, users, attendance_data)

    return sheet
def _add_rows(sheet, users: list[CourseSessionUser], attendance_data):
    """Write one data row per user, starting at row 2 (row 1 is the header).

    Each row holds the common user columns, the optional-attendance flag and
    then one status cell per entry of ``attendance_data``, in the mapping's
    iteration order. Only the values of ``attendance_data`` are used; each
    value maps a stringified user id to that user's attendance entry.
    """
    for row_idx, user in enumerate(users, start=2):
        column = add_user_export_data(sheet, user, row_idx)

        if user.optional_attendance:
            optional_label = str(_("Ja"))
        else:
            optional_label = str(_("Nein"))
        sheet.cell(row=row_idx, column=column, value=optional_label)
        column += 1

        for entries_by_user in attendance_data.values():
            # A missing or empty entry counts as "no status".
            entry = entries_by_user.get(str(user.user.id), {}) or {}
            if entry.get("status", "") == "PRESENT":
                status_label = str(_("Anwesend"))
            else:
                status_label = str(_("Nicht anwesend"))
            sheet.cell(row=row_idx, column=column, value=status_label)
            column += 1
def add_user_headers(sheet):
    """Write the common user header cells into row 1.

    Returns:
        The index of the first column after the written headers.
    """
    header_labels = (
        _("Vorname"),
        _("Nachname"),
        _("Email"),
        _("Lehrvertragsnummer"),
    )
    for column, label in enumerate(header_labels, start=1):
        sheet.cell(row=1, column=column, value=str(label))

    # return the next column index
    return len(header_labels) + 1
def add_user_export_data(sheet, user: CourseSessionUser, row_idx: int) -> int:
    """Fill the common user columns of row ``row_idx`` for ``user``.

    Writes first name, last name, email and the "Lehrvertragsnummer" entry of
    the user's ``additional_json_data`` (empty string when that key is
    missing) into columns 1 to 4.

    Returns:
        The index of the first column after the written cells.
    """
    account = user.user
    cell_values = (
        account.first_name,
        account.last_name,
        account.email,
        account.additional_json_data.get("Lehrvertragsnummer", ""),
    )
    for column, cell_value in enumerate(cell_values, start=1):
        sheet.cell(row=row_idx, column=column, value=cell_value)

    # return the next column index
    return len(cell_values) + 1
def get_ordered_csus_by_course_session(
    course_session_ids: list[str], user_ids: typing.Optional[list[str]] = None
):
    """Return the member course session users grouped by course session title.

    Args:
        course_session_ids: Ids of the course sessions to include.
        user_ids: Optional filter; when given (and non-empty) only course
            session users with one of these user ids are returned.

    Returns:
        A dict mapping course session title to the list of matching
        ``CourseSessionUser`` objects, as built by ``group_by_session_title``.
        Only users with role ``MEMBER`` are included.
    """
    csus = CourseSessionUser.objects.filter(
        course_session_id__in=course_session_ids, role=CourseSessionUser.Role.MEMBER
    ).select_related(
        # Both relations are read for every exported row (session title, user
        # name/email); join them here instead of one query per object.
        "course_session",
        "user",
    )
    if user_ids:
        csus = csus.filter(user_id__in=user_ids)

    csus = csus.order_by("course_session", "user__last_name", "user__first_name")

    # group_by_session_title sorts by title itself (stable, so the ordering
    # above is kept within a title); sorting here as well would be redundant.
    return group_by_session_title(csus)
def group_by_session_title(items):
    """Group ``items`` by ``item.course_session.title``.

    Returns:
        A dict whose keys are the titles in ascending order; each value is the
        list of items with that title, in their original relative order.
    """

    def title_of(item):
        return item.course_session.title

    grouped = {}
    # sorted() is stable, so items sharing a title keep their input order.
    for item in sorted(items, key=title_of):
        grouped.setdefault(title_of(item), []).append(item)
    return grouped
def make_export_filename(name: str):
    """Return ``<name>_<YYYY-MM-DD>.xlsx`` using today's date."""
    # datetime formats itself via strftime when given a format spec.
    return f"{name}_{datetime.today():%Y-%m-%d}.xlsx"
def sanitize_sheet_name(text, default_name="DefaultSheet"):
    """Turn ``text`` into a string usable as an Excel sheet name.

    The characters ``\\ / * ? : [ ]`` are removed, apostrophes at either end
    are stripped and the result is limited to 31 characters.

    Args:
        text: Proposed sheet name, or ``None``.
        default_name: Returned when ``text`` is ``None`` or nothing is left
            after sanitizing.

    Returns:
        The sanitized name, or ``default_name``.
    """
    if text is None:
        return default_name

    # One pass that deletes every prohibited character.
    prohibited_chars = str.maketrans("", "", "\\/*?:[]")
    text = text.translate(prohibited_chars)

    text = text.strip("'")
    # Truncating can expose an inner apostrophe as the new last character,
    # so strip the end again after cutting to the 31 character limit.
    text = text[:31].rstrip("'")

    if not text:
        return default_name
    return text