vbv/server/vbv_lernwelt/feedback/services.py

208 lines
6.6 KiB
Python

from datetime import datetime
from io import BytesIO
from itertools import groupby
from operator import attrgetter
from typing import Union
import structlog
from django.http import HttpResponse
from openpyxl import Workbook
from vbv_lernwelt.core.models import User
from vbv_lernwelt.course.models import CourseCompletionStatus, CourseSession
from vbv_lernwelt.course.services import mark_course_completion
from vbv_lernwelt.feedback.models import FeedbackResponse
from vbv_lernwelt.learnpath.models import (
LearningContentFeedbackUK,
LearningContentFeedbackVV,
)
# Module-level structured logger used by the feedback services in this file.
logger = structlog.get_logger(__name__)
# (data key, column header) pairs of the feedback questions.
# The first element is the key looked up in ``FeedbackResponse.data``, the
# second element is the header written to the Excel export; the list order
# is the column order of the export.
VV_FEEDBACK_QUESTIONS = [
    ("satisfaction", "Zufriedenheit insgesamt"),
    ("goal_attainment", "Zielerreichung insgesamt"),
    (
        "proficiency",
        "Wie beurteilst du deine Sicherheit bezüglichen den Themen nach dem Circle?",
    ),
    (
        "preparation_task_clarity",
        "Waren die Praxisaufträge klar und verständlich?",
    ),
    ("would_recommend", "Würdest du den Circle weiterempfehlen?"),
    ("course_positive_feedback", "Was hat dir besonders gut gefallen?"),
    ("course_negative_feedback", "Wo siehst du Verbesserungspotential?"),
]
def update_feedback_response(
    feedback_user: User,
    course_session: CourseSession,
    learning_content_feedback_page: Union[
        LearningContentFeedbackUK, LearningContentFeedbackVV
    ],
    submitted: bool,
    validated_data: dict,
):
    """Create or update the feedback response of a user for a circle.

    The response is looked up (or created) by user, circle of the feedback
    page and course session. The submitted values are merged into the data
    already stored, see ``_merge_feedback_data`` for the exact rule.

    Args:
        feedback_user: The user giving the feedback.
        course_session: The course session the feedback belongs to.
        learning_content_feedback_page: The feedback page; its circle
            identifies the response and its type determines the data keys.
        submitted: When true, the response is flagged as submitted and the
            page is marked as successfully completed for the user.
        validated_data: The new values, keyed by question key.

    Returns:
        The saved ``FeedbackResponse``.
    """
    circle = learning_content_feedback_page.get_circle()
    feedback_response, _ = FeedbackResponse.objects.get_or_create(
        feedback_user_id=feedback_user.id,
        circle_id=circle.id,
        course_session=course_session,
    )

    feedback_response.data = _merge_feedback_data(
        initial_data=initial_data_for_feedback_page(learning_content_feedback_page),
        # tolerate a response whose data has never been set
        original_data=feedback_response.data or {},
        updated_data=validated_data,
    )

    # save the response before completion mark,
    # because who knows what could happen in between...
    if submitted:
        feedback_response.submitted = submitted
    feedback_response.save()

    if submitted:
        mark_course_completion(
            user=feedback_user,
            page=learning_content_feedback_page,
            course_session=course_session,
            completion_status=CourseCompletionStatus.SUCCESS.value,
        )
        # NOTE(review): the log entry is assumed to belong to the submitted
        # branch (the indentation was not recoverable) - confirm.
        logger.info(
            "feedback successfully created",
            label="feedback",
            feedback_user_id=feedback_user.id,
            circle_title=circle.title,
            course_session_id=course_session.id,
        )

    return feedback_response


def _merge_feedback_data(
    initial_data: dict, original_data: dict, updated_data: dict
) -> dict:
    """Merge updated values into stored feedback data.

    Only the keys of ``initial_data`` are kept. For each key, the first
    applicable rule wins:

    1. a value from ``updated_data`` that is present and not ``""``,
    2. the value already stored in ``original_data``,
    3. the default from ``initial_data``.

    Rule 3 is what makes the defaults effective: without it a key that is
    missing on both sides would end up as ``None`` instead of its default.
    """
    return {
        key: (
            updated_data[key]
            if updated_data.get(key, "") != ""
            else original_data.get(key, default)
        )
        for key, default in initial_data.items()
    }
def initial_data_for_feedback_page(
    learning_content_feedback_page: Union[
        LearningContentFeedbackUK, LearningContentFeedbackVV
    ]
):
    """Return the empty default answers for the given feedback page.

    The page type is detected through the ``learningcontentfeedbackuk`` /
    ``learningcontentfeedbackvv`` attributes. Rating-like answers default to
    ``None``, free-text answers to ``""``, and ``feedback_type`` is set to
    ``"uk"`` or ``"vv"``. The "uk" variant additionally contains the three
    ``instructor_*`` keys. An empty dict is returned for any other page.
    """
    # The second attribute is only probed when the first one is missing.
    if hasattr(learning_content_feedback_page, "learningcontentfeedbackuk"):
        feedback_type = "uk"
    elif hasattr(learning_content_feedback_page, "learningcontentfeedbackvv"):
        feedback_type = "vv"
    else:
        return {}

    defaults = dict.fromkeys(
        (
            "satisfaction",
            "goal_attainment",
            "proficiency",
            "preparation_task_clarity",
        )
    )
    if feedback_type == "uk":
        defaults.update(
            instructor_competence=None,
            instructor_respect=None,
            instructor_open_feedback="",
        )
    defaults.update(
        would_recommend=None,
        course_negative_feedback="",
        course_positive_feedback="",
        feedback_type=feedback_type,
    )
    return defaults
def export_feedback(course_session_ids: list[str], save_as_file: bool):
    """Export all submitted feedback as an Excel workbook, one sheet per circle.

    Args:
        course_session_ids: Restricts the export to these course sessions.
            The value is used in a ``course_session_id__in`` lookup; the
            admin actions in this module pass course session querysets.
        save_as_file: When true, the workbook is written to the file name
            returned by ``make_export_filename()``; otherwise the workbook
            content is returned.

    Returns:
        The ``.xlsx`` content as ``bytes``, or ``None`` when the workbook was
        saved as a file.
    """
    wb = Workbook()
    # remove the first sheet is just easier than keeping track of the active sheet
    # (``remove_sheet`` is deprecated in openpyxl, ``remove`` is its replacement)
    wb.remove(wb.active)

    feedbacks = (
        FeedbackResponse.objects.filter(
            course_session_id__in=course_session_ids,
            submitted=True,
        )
        # the circle of every row is needed for grouping; load it with the
        # same query instead of one extra query per feedback
        .select_related("circle")
        .order_by("circle", "updated_at")
    )

    # groupby only merges adjacent rows, hence the ordering by circle above
    for circle, group_feedbacks in groupby(feedbacks, key=attrgetter("circle")):
        group_feedbacks = list(group_feedbacks)
        logger.debug(
            "export_feedback_for_circle",
            data={
                "circle": circle.id,
                "course_session_ids": course_session_ids,
                "count": len(group_feedbacks),
            },
            label="feedback_export",
        )
        _create_sheet(wb, circle.title, group_feedbacks)

    if not wb.worksheets:
        # A workbook without any sheet cannot be saved; export a single
        # empty sheet when there is no submitted feedback.
        wb.create_sheet()

    if save_as_file:
        wb.save(make_export_filename())
        return None

    output = BytesIO()
    wb.save(output)
    return output.getvalue()
def _create_sheet(wb: Workbook, title: str, data: list[FeedbackResponse]):
    """Add a sheet with a header row and one row per feedback to the workbook.

    Args:
        wb: The workbook the sheet is added to.
        title: Desired sheet title; characters that are not allowed in a
            worksheet title are replaced, see ``_safe_sheet_title``.
        data: The feedback responses written below the header row.

    Returns:
        The newly created sheet.
    """
    sheet = wb.create_sheet(title=_safe_sheet_title(title))

    # add header
    sheet.cell(row=1, column=1, value="Datum")
    for col_idx, (_key, label) in enumerate(VV_FEEDBACK_QUESTIONS, start=2):
        sheet.cell(row=1, column=col_idx, value=label)

    _add_rows(sheet, data)
    return sheet


def _safe_sheet_title(title: str) -> str:
    """Replace the characters ``\\ / * ? : [ ]`` in a sheet title with ``_``.

    openpyxl rejects worksheet titles containing one of these characters
    with a ``ValueError``, which would abort the whole export.
    """
    if not title:
        return title
    return title.translate(str.maketrans({char: "_" for char in "\\/*?:[]"}))
def _add_rows(sheet, data):
for row_idx, feedback in enumerate(data, start=2):
sheet.cell(
row=row_idx, column=1, value=feedback.updated_at.date().strftime("%d.%m.%Y")
)
for col_idx, question in enumerate(VV_FEEDBACK_QUESTIONS, start=2):
response = feedback.data.get(question[0], "")
sheet.cell(row=row_idx, column=col_idx, value=response)
def make_export_filename(name: str = "feedback_export"):
    """Return ``<name>_<YYYY-MM-DD>.xlsx`` using today's date."""
    return f"{name}_{datetime.today():%Y-%m-%d}.xlsx"
# used as admin action, that's why it's not in the views.py
def get_feedbacks_for_course_sessions(_modeladmin, _request, queryset):
    """Return the feedback export for the given queryset as a download.

    The queryset is passed on unchanged as the course session filter of the
    export (presumably the course sessions selected in the admin).
    """
    return _handle_feedback_export_action(
        queryset, "feedback_export_durchfuehrungen"
    )
def get_feedbacks_for_courses(_modeladmin, _request, queryset):
    """Return the feedback export for all course sessions of the given courses.

    ``queryset`` is used in a ``course__in`` lookup on ``CourseSession``; the
    matching course sessions are then exported as a download.
    """
    sessions_of_courses = CourseSession.objects.filter(course__in=queryset)
    return _handle_feedback_export_action(
        sessions_of_courses, "feedback_export_lehrgaenge"
    )
def _handle_feedback_export_action(course_seesions, file_name):
    """Build the download response containing the feedback export.

    Args:
        course_seesions: Course sessions to export, passed on to
            ``export_feedback``.
        file_name: Base name of the download; the date and the ``.xlsx``
            suffix are appended by ``make_export_filename``.

    Returns:
        An ``HttpResponse`` with the workbook as attachment.
    """
    workbook_content = export_feedback(course_seesions, False)
    download_name = make_export_filename(file_name)

    response = HttpResponse(
        content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    )
    response["Content-Disposition"] = f"attachment; filename={download_name}"
    response.write(workbook_content)
    return response