wip: Add competence certificate export

This commit is contained in:
Christian Cueni 2024-05-23 09:51:15 +02:00
parent 984513b3a2
commit b16016b34c
11 changed files with 334 additions and 70 deletions

View File

@ -0,0 +1,18 @@
import djclick as click
import structlog
from vbv_lernwelt.assignment.services import export_competence_certificates
logger = structlog.get_logger(__name__)
@click.command()
@click.argument("course_session_id")
@click.option(
"--save-as-file/--no-save-as-file",
default=True,
help="`save-as-file` to save the file, `no-save-as-file` returns bytes. Default is `save-as-file`.",
)
def command(course_session_id, save_as_file):
# using the output from call_command was a bit cumbersome, so this is just a wrapper for the actual function
export_competence_certificates([course_session_id], save_as_file)

View File

@ -1,6 +1,10 @@
from copy import deepcopy
from dataclasses import dataclass
from io import BytesIO
import structlog
from django.utils import timezone
from openpyxl import Workbook
from rest_framework import serializers
from wagtail.models import Page
@ -14,10 +18,38 @@ from vbv_lernwelt.assignment.models import (
)
from vbv_lernwelt.core.models import User
from vbv_lernwelt.core.utils import find_first
from vbv_lernwelt.course.models import CourseCompletionStatus, CourseSession
from vbv_lernwelt.course.models import (
CourseCompletionStatus,
CourseSession,
CourseSessionUser,
)
from vbv_lernwelt.course.services import mark_course_completion
from vbv_lernwelt.course_session.models import (
CourseSessionAssignment,
CourseSessionEdoniqTest,
)
from vbv_lernwelt.course_session.services.export import (
add_user_export_data,
add_user_headers,
get_ordered_csus_by_course_session,
group_by_session_title,
make_export_filename,
sanitize_sheet_name,
)
from vbv_lernwelt.duedate.models import DueDate
from vbv_lernwelt.learnpath.models import LearningContent
from vbv_lernwelt.notify.services import NotificationService
logger = structlog.get_logger(__name__)
@dataclass
class CompetenceCertificateElement:
assignment: Assignment
date: DueDate
learning_content: LearningContent
course_session: CourseSession
def update_assignment_completion(
assignment_user: User,
@ -67,9 +99,9 @@ def update_assignment_completion(
assignment_user_id=assignment_user.id,
assignment_id=assignment.id,
course_session_id=course_session.id,
learning_content_page_id=learning_content_page.id
if learning_content_page
else None,
learning_content_page_id=(
learning_content_page.id if learning_content_page else None
),
)
if initialize_completion:
@ -271,3 +303,192 @@ def _remove_unknown_entries(assignment, completion_data):
key: value for key, value in completion_data.items() if key in input_task_ids
}
return filtered_completion_data
def export_competence_certificates(
course_session_ids: list[str], save_as_file: bool, circle_ids: list[int] = None
):
if len(course_session_ids) == 0:
return
COMPETENCE_ASSIGNMENT_TYPES = [
AssignmentType.CASEWORK.value,
AssignmentType.EDONIQ_TEST.value,
]
wb = Workbook()
# remove the first sheet is just easier than keeping track of the active sheet
wb.remove(wb.active)
competence_certificate_elements = _get_competence_certificate_elements(
course_session_ids
)
assignemnt_completions = AssignmentCompletion.objects.filter(
course_session_id__in=course_session_ids,
assignment__assignment_type__in=COMPETENCE_ASSIGNMENT_TYPES,
).order_by("course_session", "assignment")
# group all by the sessions title {session_id1: [...], session_id2: [...], ...}
grouped_cs_users = get_ordered_csus_by_course_session(course_session_ids)
grouped_cce = group_by_session_title(competence_certificate_elements)
grouped_ac = group_by_session_title(assignemnt_completions)
# create a sheet for each course session
for course_session, cs_users in grouped_cs_users.items():
logger.debug(
"export_assignment_completion",
data={
"course_session": course_session,
},
label="assignment_export",
)
_create_sheet(
wb,
course_session,
cs_users,
grouped_cce[course_session],
grouped_ac[course_session],
circle_ids,
)
if save_as_file:
wb.save(make_export_filename(name="competence_certificate_export"))
else:
output = BytesIO()
wb.save(output)
output.seek(0)
return output.getvalue()
def _create_sheet(
wb: Workbook,
title: str,
users: list[CourseSessionUser],
competence_certificate_element: list[CompetenceCertificateElement],
assignment_completions: list[AssignmentCompletion],
circle_ids: list[int],
):
sheet = wb.create_sheet(title=sanitize_sheet_name(title))
if len(users) == 0:
return sheet
# headers
# common user headers, Circle <title> <learningcontenttitle> bestanden, Circle <title> <learningcontenttitle> Resultat, ...
col_idx = add_user_headers(sheet)
ordered_assignement_ids = (
[]
) # keep track of the order of the columns when adding the rows
for cse in competence_certificate_element:
circle = cse.learning_content.get_circle()
if circle_ids and circle.id not in circle_ids:
continue
col_prefix = f'Circle "{circle.title}" {cse.learning_content.title} '
sheet.cell(
row=1,
column=col_idx,
value=f"{col_prefix} bestanden",
)
sheet.cell(
row=1,
column=col_idx + 1,
value=f"{col_prefix} Resultat %",
)
ordered_assignement_ids.append(cse.assignment.id)
col_idx += 2
# add rows with user results
_add_rows(sheet, users, ordered_assignement_ids, assignment_completions)
return sheet
def _add_rows(
sheet,
users: list[CourseSessionUser],
ordered_assignement_ids,
assignment_completions,
):
for row_idx, user in enumerate(users, start=2):
col_idx = add_user_export_data(sheet, user, row_idx)
for assignment_id in ordered_assignement_ids:
# get the completion for the user and the assignment
user_acs = [
ac
for ac in assignment_completions
if ac.assignment_id == assignment_id and ac.assignment_user == user.user
]
user_ac = user_acs[0] if user_acs else None
if user_ac:
status_text = (
"Bestanden" if user_ac.evaluation_passed else "Nicht bestanden"
)
sheet.cell(row=row_idx, column=col_idx, value=status_text)
try:
sheet.cell(
row=row_idx,
column=col_idx + 1,
value=round(
100
* user_ac.evaluation_points
/ user_ac.evaluation_max_points
),
)
except (ZeroDivisionError, TypeError):
sheet.cell(row=row_idx, column=col_idx + 1, value="Keine Daten")
else:
sheet.cell(row=row_idx, column=col_idx, value="Keine Daten")
sheet.cell(row=row_idx, column=col_idx + 1, value="Keine Daten")
col_idx += 2
def _get_competence_certificate_elements(
course_session_ids: list[str],
) -> list[CompetenceCertificateElement]:
course_session_assignments = CourseSessionAssignment.objects.filter(
course_session__id__in=course_session_ids,
learning_content__content_assignment__competence_certificate__isnull=False,
).order_by("course_session", "submission_deadline__start")
course_session_edoniqtests = CourseSessionEdoniqTest.objects.filter(
course_session__id__in=course_session_ids,
learning_content__content_assignment__competence_certificate__isnull=False,
).order_by("course_session", "deadline__start")
cse = [
CompetenceCertificateElement(
assignment=csa.learning_content.content_assignment,
date=csa.submission_deadline,
learning_content=csa.learning_content,
course_session=csa.course_session,
)
for csa in course_session_assignments
]
cse += [
CompetenceCertificateElement(
assignment=cset.learning_content.content_assignment,
date=cset.deadline,
learning_content=cset.learning_content,
course_session=cset.course_session,
)
for cset in course_session_edoniqtests
]
# order by course_session and submission_deadline
cse.sort(key=lambda x: (x.course_session.title, x.date.start))
return cse

View File

@ -20,20 +20,13 @@ def export_attendance(
# remove the first sheet is just easier than keeping track of the active sheet
wb.remove(wb.active)
cs_users = CourseSessionUser.objects.filter(
course_session_id__in=course_session_ids, role=CourseSessionUser.Role.MEMBER
).order_by("course_session", "user__last_name", "user__first_name")
attendance_courses = CourseSessionAttendanceCourse.objects.filter(
course_session_id__in=course_session_ids
).order_by("course_session", "due_date")
grouped_cs_users = {
key: list(group)
for key, group in groupby(
sorted(cs_users, key=lambda x: x.course_session.title),
key=lambda x: x.course_session.title,
)
}
grouped_cs_users = get_ordered_csus_by_course_session(course_session_ids)
# create dict with course_session as key and list of attendance_courses as value. Easier to access in the loop
grouped_attendance_course = {
key: list(group)
for key, group in groupby(
@ -42,6 +35,7 @@ def export_attendance(
)
}
# create a sheet for each course_session
for course_session, cs_users in grouped_cs_users.items():
logger.debug(
"export_attendance_for_course_session",
@ -81,14 +75,8 @@ def _create_sheet(
return sheet
# headers
# firstname, lastname, email, lehrvertragsnummer, <attendance_course> <date>, status <attendance_course>, ..
# todo: translate headers
sheet.cell(row=1, column=1, value="Vorname")
sheet.cell(row=1, column=2, value="Nachname")
sheet.cell(row=1, column=3, value="Email")
sheet.cell(row=1, column=4, value="Lehrvertragsnummer")
col_idx = 5
# common user headers..., <attendance_course> <date>, status <attendance_course>, ..
col_idx = add_user_headers(sheet)
attendance_data = {}
for course in attendance_courses:
circle = course.get_circle()
@ -105,6 +93,7 @@ def _create_sheet(
col_idx += 1
# add rows with user data
_add_rows(sheet, users, attendance_data)
return sheet
@ -112,16 +101,7 @@ def _create_sheet(
def _add_rows(sheet, users: list[CourseSessionUser], attendance_data):
for row_idx, user in enumerate(users, start=2):
sheet.cell(row=row_idx, column=1, value=user.user.first_name)
sheet.cell(row=row_idx, column=2, value=user.user.last_name)
sheet.cell(row=row_idx, column=3, value=user.user.email)
sheet.cell(
row=row_idx,
column=4,
value=user.user.additional_json_data.get("Lehrvertragsnummer", ""),
)
col_idx = 5
col_idx = add_user_export_data(sheet, user, row_idx)
for key, user_dict_map in attendance_data.items():
user_dict = user_dict_map.get(str(user.user.id), {})
status = user_dict.get("status", "") if user_dict else ""
@ -130,6 +110,48 @@ def _add_rows(sheet, users: list[CourseSessionUser], attendance_data):
col_idx += 1
def add_user_headers(sheet):
# todo: translate headers
sheet.cell(row=1, column=1, value="Vorname")
sheet.cell(row=1, column=2, value="Nachname")
sheet.cell(row=1, column=3, value="Email")
sheet.cell(row=1, column=4, value="Lehrvertragsnummer")
return 5 # return the next column index
def add_user_export_data(sheet, user: CourseSessionUser, row_idx: int) -> int:
sheet.cell(row=row_idx, column=1, value=user.user.first_name)
sheet.cell(row=row_idx, column=2, value=user.user.last_name)
sheet.cell(row=row_idx, column=3, value=user.user.email)
sheet.cell(
row=row_idx,
column=4,
value=user.user.additional_json_data.get("Lehrvertragsnummer", ""),
)
return 5 # return the next column index
def get_ordered_csus_by_course_session(course_session_ids: list[str]):
csus = CourseSessionUser.objects.filter(
course_session_id__in=course_session_ids, role=CourseSessionUser.Role.MEMBER
).order_by("course_session", "user__last_name", "user__first_name")
return group_by_session_title(
sorted(csus, key=lambda x: x.course_session.title),
)
def group_by_session_title(items):
return {
key: list(group)
for key, group in groupby(
sorted(items, key=lambda x: x.course_session.title),
key=lambda x: x.course_session.title,
)
}
def make_export_filename(name: str = "attendance_export"):
today_date = datetime.today().strftime("%Y-%m-%d")
return f"{name}_{today_date}.xlsx"

View File

@ -4,5 +4,4 @@ from vbv_lernwelt.course_session_group.models import CourseSessionGroup
@admin.register(CourseSessionGroup)
class CourseSessionAssignmentAdmin(admin.ModelAdmin):
...
class CourseSessionAssignmentAdmin(admin.ModelAdmin): ...

View File

@ -433,9 +433,9 @@ def get_course_config(
is_mentor=is_mentor,
widgets=get_widgets_for_course(role_key, is_uk, is_vv, is_mentor),
has_preview=has_preview(role_key),
session_to_continue_id=str(session_to_continue.id)
if session_to_continue
else None,
session_to_continue_id=(
str(session_to_continue.id) if session_to_continue else None
),
)
)

View File

@ -153,9 +153,9 @@ def fetch_course_session_all_users(courses: List[int], excluded_domains=None):
def generate_export_response(cs_users: List[User]) -> HttpResponse:
response = HttpResponse(content_type="text/csv; charset=utf-8")
response[
"Content-Disposition"
] = f"attachment; filename=edoniq_user_export_{date.today().strftime('%Y%m%d')}.csv"
response["Content-Disposition"] = (
f"attachment; filename=edoniq_user_export_{date.today().strftime('%Y%m%d')}.csv"
)
response.write("\ufeff".encode("utf8")) # UTF-8 BOM

View File

@ -86,9 +86,11 @@ def update_feedback_response(
initial_data = initial_data_for_feedback_page(learning_content_feedback_page)
merged_data = initial_data | {
key: updated_data[key]
if updated_data.get(key, "") != ""
else original_data.get(key)
key: (
updated_data[key]
if updated_data.get(key, "") != ""
else original_data.get(key)
)
for key in initial_data.keys()
}
@ -249,9 +251,9 @@ def _handle_feedback_export_action(course_seesions, file_name):
response = HttpResponse(
content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)
response[
"Content-Disposition"
] = f"attachment; filename={make_export_filename(file_name)}"
response["Content-Disposition"] = (
f"attachment; filename={make_export_filename(file_name)}"
)
response.write(excel_bytes)
return response

View File

@ -70,9 +70,11 @@ def get_self_feedback_evaluation(
MentorAssignmentCompletion(
# feedback_submitted as seen from the perspective of the evaluation user (feedback provider)
# means that the feedback has been evaluated by the feedback provider, hence the status is EVALUATED
status=MentorCompletionStatus.EVALUATED
if f.feedback_submitted
else MentorCompletionStatus.SUBMITTED,
status=(
MentorCompletionStatus.EVALUATED
if f.feedback_submitted
else MentorCompletionStatus.SUBMITTED
),
user_id=f.feedback_requester_user.id,
last_name=f.feedback_requester_user.last_name,
url=f"/course/{course.slug}/cockpit/mentor/self-evaluation-feedback/{f.learning_unit.id}",

View File

@ -65,9 +65,9 @@ class TestNotificationService(TestCase):
self.assertFalse(notification.emailed)
def test_send_notification_with_email(self):
self.recipient.additional_json_data[
"email_notification_categories"
] = json.dumps(["USER_INTERACTION"])
self.recipient.additional_json_data["email_notification_categories"] = (
json.dumps(["USER_INTERACTION"])
)
self.recipient.save()
verb = "Anne hat deinen Auftrag bewertet"
@ -146,9 +146,9 @@ class TestNotificationService(TestCase):
self.assertFalse(notification.emailed)
# when the email was not sent, yet it will still send it afterwards...
self.recipient.additional_json_data[
"email_notification_categories"
] = json.dumps(["USER_INTERACTION"])
self.recipient.additional_json_data["email_notification_categories"] = (
json.dumps(["USER_INTERACTION"])
)
self.recipient.save()
result = self.notification_service._send_notification(
@ -188,9 +188,9 @@ class TestNotificationService(TestCase):
self.assertFalse(self._has_sent_emails())
# Assert mail is sent if corresponding email notification type is enabled
self.recipient.additional_json_data[
"email_notification_categories"
] = json.dumps(["USER_INTERACTION"])
self.recipient.additional_json_data["email_notification_categories"] = (
json.dumps(["USER_INTERACTION"])
)
self.recipient.save()
self.notification_service._send_notification(
sender=self.sender,

View File

@ -39,9 +39,9 @@ class SelfEvaluationFeedbackSerializer(serializers.ModelSerializer):
return obj.learning_unit.get_circle().title
def get_criteria(self, obj):
performance_criteria: List[
PerformanceCriteria
] = obj.learning_unit.performancecriteria_set.all()
performance_criteria: List[PerformanceCriteria] = (
obj.learning_unit.performancecriteria_set.all()
)
criteria = []

View File

@ -67,15 +67,15 @@ class AbacusInvoiceCreator(InvoiceCreator):
)
SubElement(sales_order_header_fields, "CustomerNumber").text = customer_number
SubElement(
sales_order_header_fields, "PurchaseOrderDate"
).text = order_date.isoformat()
SubElement(
sales_order_header_fields, "DeliveryDate"
).text = order_date.isoformat()
SubElement(
sales_order_header_fields, "ReferencePurchaseOrder"
).text = reference_purchase_order
SubElement(sales_order_header_fields, "PurchaseOrderDate").text = (
order_date.isoformat()
)
SubElement(sales_order_header_fields, "DeliveryDate").text = (
order_date.isoformat()
)
SubElement(sales_order_header_fields, "ReferencePurchaseOrder").text = (
reference_purchase_order
)
SubElement(sales_order_header_fields, "UnicId").text = unic_id
for index, item in enumerate(items, start=1):