wip: Export attendance data (no circle data)
This commit is contained in:
parent
93a5cba262
commit
296644ff22
|
|
@ -0,0 +1,18 @@
|
|||
import djclick as click
|
||||
import structlog
|
||||
|
||||
from vbv_lernwelt.course_session.services.export import export_attendance
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
|
||||
@click.command()
|
||||
@click.argument("course_session_id")
|
||||
@click.option(
|
||||
"--save-as-file/--no-save-as-file",
|
||||
default=True,
|
||||
help="`save-as-file` to save the file, `no-save-as-file` returns bytes. Default is `save-as-file`.",
|
||||
)
|
||||
def command(course_session_id, save_as_file):
|
||||
# using the output from call_command was a bit cumbersome, so this is just a wrapper for the actual function
|
||||
export_attendance([course_session_id, 14], save_as_file)
|
||||
|
|
@ -0,0 +1,126 @@
|
|||
from datetime import datetime
|
||||
from io import BytesIO
|
||||
from itertools import groupby
|
||||
|
||||
import structlog
|
||||
from openpyxl import Workbook
|
||||
|
||||
from vbv_lernwelt.course.models import CourseSessionUser
|
||||
from vbv_lernwelt.course_session.models import CourseSessionAttendanceCourse
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
|
||||
def export_attendance(course_session_ids: list[str], save_as_file: bool, circles=None):
|
||||
wb = Workbook()
|
||||
|
||||
# remove the first sheet is just easier than keeping track of the active sheet
|
||||
wb.remove(wb.active)
|
||||
|
||||
# get attencdance courses for course sessions and circles
|
||||
# get users for each course session
|
||||
# set headers
|
||||
# sheets group, cs or generation?
|
||||
|
||||
cs_users = CourseSessionUser.objects.filter(course_session_id__in=course_session_ids,
|
||||
role=CourseSessionUser.Role.MEMBER).order_by("course_session",
|
||||
"user__last_name",
|
||||
"user__first_name")
|
||||
attendance_courses = CourseSessionAttendanceCourse.objects.filter(
|
||||
course_session_id__in=course_session_ids).order_by("course_session", "due_date")
|
||||
|
||||
grouped_cs_users = {key: list(group) for key, group in
|
||||
groupby(sorted(cs_users, key=lambda x: x.course_session.title),
|
||||
key=lambda x: x.course_session.title)}
|
||||
grouped_attendance_course = {key: list(group) for key, group in
|
||||
groupby(sorted(attendance_courses, key=lambda x: x.course_session.title),
|
||||
key=lambda x: x.course_session.title)}
|
||||
|
||||
for course_session, cs_users in grouped_cs_users.items():
|
||||
logger.debug(
|
||||
"export_attendance_for_course_session",
|
||||
data={
|
||||
"course_session": course_session,
|
||||
},
|
||||
label="attendance_export",
|
||||
)
|
||||
_create_sheet(wb, course_session, cs_users, grouped_attendance_course[course_session])
|
||||
|
||||
if save_as_file:
|
||||
wb.save(make_export_filename())
|
||||
else:
|
||||
output = BytesIO()
|
||||
wb.save(output)
|
||||
|
||||
output.seek(0)
|
||||
return output.getvalue()
|
||||
|
||||
|
||||
def _create_sheet(wb: Workbook, title: str, users: list[CourseSessionUser],
|
||||
attendance_courses: list[CourseSessionAttendanceCourse]):
|
||||
sheet = wb.create_sheet(title=sanitize_sheet_name(title))
|
||||
|
||||
if len(users) == 0:
|
||||
return sheet
|
||||
|
||||
# headers
|
||||
# firstname, lastname, email, lehrvertragsnummer, <attendance_course> <date>, status <attendance_course>, ..
|
||||
# todo: translate headers
|
||||
sheet.cell(row=1, column=1, value="Vorname")
|
||||
sheet.cell(row=1, column=2, value="Nachname")
|
||||
sheet.cell(row=1, column=3, value="Email")
|
||||
sheet.cell(row=1, column=4, value="Lehrvertragsnummer")
|
||||
|
||||
col_idx = 5
|
||||
attendance_data = {}
|
||||
for course in attendance_courses:
|
||||
course_title = course.get_circle().title
|
||||
sheet.cell(row=1, column=col_idx,
|
||||
value=f"Anwesenheit {course_title} {course.due_date.start.strftime('%d.%m.%Y')}")
|
||||
user_dict_map = {d['user_id']: d for d in course.attendance_user_list}
|
||||
attendance_data[course_title] = user_dict_map
|
||||
|
||||
col_idx += 1
|
||||
|
||||
_add_rows(sheet, users, attendance_data)
|
||||
|
||||
return sheet
|
||||
|
||||
|
||||
def _add_rows(sheet, users: list[CourseSessionUser], attendance_data):
|
||||
for row_idx, user in enumerate(users, start=2):
|
||||
sheet.cell(row=row_idx, column=1, value=user.user.first_name)
|
||||
sheet.cell(row=row_idx, column=2, value=user.user.last_name)
|
||||
sheet.cell(row=row_idx, column=3, value=user.user.email)
|
||||
sheet.cell(row=row_idx, column=4, value=user.user.additional_json_data.get("Lehrvertragsnummer", ""))
|
||||
|
||||
col_idx = 5
|
||||
for key, user_dict_map in attendance_data.items():
|
||||
user_dict = user_dict_map.get(str(user.user.id), {})
|
||||
status = user_dict.get("status", "") if user_dict else ""
|
||||
status_text = "Anwesend" if status == "PRESENT" else "Nicht anwesend"
|
||||
sheet.cell(row=row_idx, column=col_idx, value=status_text)
|
||||
col_idx += 1
|
||||
|
||||
|
||||
def make_export_filename(name: str = "attendance_export"):
|
||||
today_date = datetime.today().strftime("%Y-%m-%d")
|
||||
return f"{name}_{today_date}.xlsx"
|
||||
|
||||
|
||||
def sanitize_sheet_name(text, default_name="DefaultSheet"):
|
||||
if text is None:
|
||||
return default_name
|
||||
|
||||
prohibited_chars = ["\\", "/", "*", "?", ":", "[", "]"]
|
||||
for char in prohibited_chars:
|
||||
text = text.replace(char, "")
|
||||
|
||||
text = text.strip("'")
|
||||
|
||||
text = text[:31]
|
||||
|
||||
if len(text) == 0:
|
||||
return default_name
|
||||
|
||||
return text
|
||||
|
|
@ -1,4 +1,3 @@
|
|||
from datetime import datetime
|
||||
from io import BytesIO
|
||||
from itertools import groupby
|
||||
from operator import attrgetter
|
||||
|
|
@ -11,6 +10,7 @@ from openpyxl import Workbook
|
|||
from vbv_lernwelt.core.models import User
|
||||
from vbv_lernwelt.course.models import CourseCompletionStatus, CourseSession
|
||||
from vbv_lernwelt.course.services import mark_course_completion
|
||||
from vbv_lernwelt.course_session.services.export import make_export_filename, sanitize_sheet_name
|
||||
from vbv_lernwelt.feedback.models import FeedbackResponse
|
||||
from vbv_lernwelt.learnpath.models import (
|
||||
LearningContentFeedbackUK,
|
||||
|
|
@ -149,16 +149,25 @@ def initial_data_for_feedback_page(
|
|||
return {}
|
||||
|
||||
|
||||
def export_feedback(course_session_ids: list[str], save_as_file: bool):
|
||||
def export_feedback(course_session_ids: list[str], save_as_file: bool, circles=None):
|
||||
wb = Workbook()
|
||||
|
||||
# remove the first sheet is just easier than keeping track of the active sheet
|
||||
wb.remove_sheet(wb.active)
|
||||
wb.remove(wb.active)
|
||||
|
||||
feedbacks = FeedbackResponse.objects.filter(
|
||||
course_session_id__in=course_session_ids,
|
||||
submitted=True,
|
||||
).order_by("circle", "course_session", "updated_at")
|
||||
if circles:
|
||||
feedback_unordered = FeedbackResponse.objects.filter(
|
||||
course_session_id__in=course_session_ids,
|
||||
circle__in=circles,
|
||||
submitted=True,
|
||||
)
|
||||
else:
|
||||
feedback_unordered = FeedbackResponse.objects.filter(
|
||||
course_session_id__in=course_session_ids,
|
||||
submitted=True,
|
||||
)
|
||||
|
||||
feedbacks = feedback_unordered.order_by("circle", "course_session", "updated_at")
|
||||
grouped_feedbacks = groupby(feedbacks, key=attrgetter("circle"))
|
||||
|
||||
for circle, group_feedbacks in grouped_feedbacks:
|
||||
|
|
@ -175,7 +184,7 @@ def export_feedback(course_session_ids: list[str], save_as_file: bool):
|
|||
_create_sheet(wb, circle.title, group_feedbacks)
|
||||
|
||||
if save_as_file:
|
||||
wb.save(make_export_filename())
|
||||
wb.save(make_export_filename(name="feedback_export"))
|
||||
else:
|
||||
output = BytesIO()
|
||||
wb.save(output)
|
||||
|
|
@ -185,7 +194,7 @@ def export_feedback(course_session_ids: list[str], save_as_file: bool):
|
|||
|
||||
|
||||
def _create_sheet(wb: Workbook, title: str, data: list[FeedbackResponse]):
|
||||
sheet = wb.create_sheet(title=_sanitize_sheet_name(title))
|
||||
sheet = wb.create_sheet(title=sanitize_sheet_name(title))
|
||||
|
||||
if len(data) == 0:
|
||||
return sheet
|
||||
|
|
@ -208,24 +217,6 @@ def _create_sheet(wb: Workbook, title: str, data: list[FeedbackResponse]):
|
|||
return sheet
|
||||
|
||||
|
||||
def _sanitize_sheet_name(text, default_name="DefaultSheet"):
|
||||
if text is None:
|
||||
return default_name
|
||||
|
||||
prohibited_chars = ["\\", "/", "*", "?", ":", "[", "]"]
|
||||
for char in prohibited_chars:
|
||||
text = text.replace(char, "")
|
||||
|
||||
text = text.strip("'")
|
||||
|
||||
text = text[:31]
|
||||
|
||||
if len(text) == 0:
|
||||
return default_name
|
||||
|
||||
return text
|
||||
|
||||
|
||||
def _add_rows(sheet, data, question_data):
|
||||
for row_idx, feedback in enumerate(data, start=2):
|
||||
sheet.cell(row=row_idx, column=1, value=feedback.course_session.title)
|
||||
|
|
@ -237,11 +228,6 @@ def _add_rows(sheet, data, question_data):
|
|||
sheet.cell(row=row_idx, column=col_idx, value=response)
|
||||
|
||||
|
||||
def make_export_filename(name: str = "feedback_export"):
|
||||
today_date = datetime.today().strftime("%Y-%m-%d")
|
||||
return f"{name}_{today_date}.xlsx"
|
||||
|
||||
|
||||
# used as admin action, that's why it's not in the views.py
|
||||
def get_feedbacks_for_course_sessions(_modeladmin, _request, queryset):
|
||||
file_name = "feedback_export_durchfuehrungen"
|
||||
|
|
|
|||
Loading…
Reference in New Issue