Merged in feature/VBV-378-import (pull request #121)

Feature/VBV-378 import

Approved-by: Christian Cueni
This commit is contained in:
Daniel Egger 2023-06-02 15:49:33 +00:00
commit af9fcc0eb1
38 changed files with 1144 additions and 88 deletions

8
.gitignore vendored
View File

@ -61,8 +61,6 @@ target/
# pyenv
.python-version
# Environments
.venv
venv/
@ -76,7 +74,6 @@ venv/
# mypy
.mypy_cache/
### Node template
# Logs
logs
@ -159,10 +156,6 @@ typings/
# Local History for Visual Studio Code
.history/
### Windows template
# Windows thumbnail cache files
Thumbs.db
@ -272,6 +265,7 @@ tags
### Project template
.~lock.*
.pytest_cache/
.ipython/
vendors.js

View File

@ -26,14 +26,14 @@ function generate_default_app_name() {
APP_NAME=${1:-$(generate_default_app_name)}
# VITE_* variables need to be present at build time
VITE_APP_ENVIRONMENT="dev-$APP_NAME"
export VITE_APP_ENVIRONMENT="dev-$APP_NAME"
if [[ "$APP_NAME" == "myvbv-stage" ]]; then
VITE_OAUTH_API_BASE_URL="https://vbvtst.b2clogin.com/vbvtst.onmicrosoft.com/b2c_1_signupandsignin/oauth2/v2.0/"
VITE_APP_ENVIRONMENT="stage-caprover"
export VITE_OAUTH_API_BASE_URL="https://vbvtst.b2clogin.com/vbvtst.onmicrosoft.com/b2c_1_signupandsignin/oauth2/v2.0/"
export VITE_APP_ENVIRONMENT="stage-caprover"
elif [[ "$APP_NAME" == prod* ]]; then
VITE_OAUTH_API_BASE_URL="https://edumgr.b2clogin.com/edumgr.onmicrosoft.com/b2c_1_signupandsignin/oauth2/v2.0/"
VITE_APP_ENVIRONMENT=$APP_NAME
export VITE_OAUTH_API_BASE_URL="https://edumgr.b2clogin.com/edumgr.onmicrosoft.com/b2c_1_signupandsignin/oauth2/v2.0/"
export VITE_APP_ENVIRONMENT=$APP_NAME
fi
echo "Deploy to $APP_NAME"

View File

@ -21,6 +21,7 @@ if (appEnv.startsWith("prod")) {
} else {
log.setLevel("trace");
}
log.warn(`application started appEnv=${appEnv}`);
const i18n = setupI18n();
const app = createApp(App);

View File

@ -7,7 +7,10 @@
<div class="mb-12 grid grid-cols-icon-card gap-x-4 grid-areas-icon-card">
<it-icon-location class="w-[60px] grid-in-icon" />
<h2 class="text-large font-bold grid-in-title">Standort</h2>
<p class="grid-in-value">{{ location }}</p>
<p v-if="location.startsWith('https://')" class="grid-in-value">
<a class="link" target="_blank" :href="location">{{ location }}</a>
</p>
<p v-else class="grid-in-value">{{ location }}</p>
</div>
<div class="grid grid-cols-icon-card content-between gap-x-4 grid-areas-icon-card">
<it-icon-trainer class="w-[60px] grid-in-icon" />

View File

@ -140,9 +140,7 @@ AUTH_USER_MODEL = "core.User"
# https://docs.djangoproject.com/en/dev/ref/settings/#login-redirect-url
# https://docs.djangoproject.com/en/dev/ref/settings/#login-url
# FIXME make configurable!?
# LOGIN_URL = "/sso/login/"
LOGIN_URL = "/login"
LOGIN_URL = "/login-local"
LOGIN_REDIRECT_URL = "/"
ALLOW_LOCAL_LOGIN = env.bool("IT_ALLOW_LOCAL_LOGIN", default=DEBUG)

View File

@ -319,7 +319,9 @@ mypy-extensions==0.4.3
nodeenv==1.6.0
# via pre-commit
openpyxl==3.1.2
# via wagtail
# via
# -r requirements.in
# wagtail
packaging==21.3
# via
# build
@ -405,6 +407,7 @@ pytest-sugar==0.9.4
# via -r requirements-dev.in
python-dateutil==2.8.2
# via
# -r requirements.in
# botocore
# faker
python-dotenv==0.20.0

View File

@ -37,6 +37,7 @@ sendgrid
structlog
python-json-logger
concurrent-log-handler
python-dateutil
wagtail>=4
wagtail-factories>=4
@ -47,3 +48,4 @@ azure-storage-blob
azure-identity
boto3
openpyxl

View File

@ -190,7 +190,9 @@ msal==1.22.0
msal-extensions==1.0.0
# via azure-identity
openpyxl==3.1.2
# via wagtail
# via
# -r requirements.in
# wagtail
packaging==21.3
# via
# marshmallow
@ -219,6 +221,7 @@ pyrsistent==0.18.1
# via jsonschema
python-dateutil==2.8.2
# via
# -r requirements.in
# botocore
# faker
python-dotenv==0.20.0

View File

@ -1,25 +0,0 @@
from django.contrib.auth.base_user import BaseUserManager
from django.contrib.auth.models import AbstractUser
class UserManager(BaseUserManager):
def create_or_update_by_email(self, user_dict: dict) -> tuple[AbstractUser, bool]:
# create or sync user with OpenID Data
user, created = self.model.objects.get_or_create(
sso_id=user_dict["oid"],
defaults={
"email": user_dict["email"],
"username": user_dict["email"],
"first_name": user_dict["first_name"],
"last_name": user_dict["last_name"],
},
)
if not created:
user.email = user_dict["email"]
user.username = user_dict["email"]
user.first_name = user_dict["first_name"]
user.last_name = user_dict["last_name"]
user.save()
return user, created

View File

@ -0,0 +1,20 @@
# Generated by Django 3.2.13 on 2023-05-31 14:34
import django.contrib.auth.models
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("core", "0001_initial"),
]
operations = [
migrations.AlterModelManagers(
name="user",
managers=[
("objects", django.contrib.auth.models.UserManager()),
],
),
]

View File

@ -0,0 +1,22 @@
# Generated by Django 3.2.13 on 2023-06-02 12:32
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("core", "0002_alter_user_managers"),
]
operations = [
migrations.AlterField(
model_name="user",
name="avatar_url",
field=models.CharField(
blank=True,
default="/static/avatars/myvbv-default-avatar.png",
max_length=254,
),
),
]

View File

@ -2,8 +2,6 @@ from django.contrib.auth.models import AbstractUser
from django.db import models
from django.db.models import JSONField
from vbv_lernwelt.core.managers import UserManager
class User(AbstractUser):
"""
@ -19,7 +17,9 @@ class User(AbstractUser):
# FIXME: look into it...
# objects = UserManager()
avatar_url = models.CharField(max_length=254, blank=True, default="")
avatar_url = models.CharField(
max_length=254, blank=True, default="/static/avatars/myvbv-default-avatar.png"
)
email = models.EmailField("email address", unique=True)
sso_id = models.UUIDField(
"SSO subscriber ID", unique=True, null=True, blank=True, default=None
@ -27,8 +27,6 @@ class User(AbstractUser):
additional_json_data = JSONField(default=dict, blank=True)
language = models.CharField(max_length=2, choices=LANGUAGE_CHOICES, default="de")
objects = UserManager()
class SecurityRequestResponseLog(models.Model):
label = models.CharField(max_length=255, blank=True, default="")

View File

@ -20,6 +20,7 @@ class CourseSessionAdmin(admin.ModelAdmin):
list_display = [
"title",
"course",
"import_id",
"start_date",
"end_date",
"created_at",
@ -31,8 +32,9 @@ class CourseSessionAdmin(admin.ModelAdmin):
class CourseSessionUserAdmin(admin.ModelAdmin):
date_hierarchy = "created_at"
list_display = [
"course_session",
"user",
"course_session",
"role",
"created_at",
"updated_at",
]
@ -43,12 +45,12 @@ class CourseSessionUserAdmin(admin.ModelAdmin):
"course_session__title",
]
list_filter = [
"course_session__course",
"course_session",
"role",
]
fieldsets = [
(None, {"fields": ("user", "course_session")}),
(None, {"fields": ("user", "course_session", "role")}),
(
"Expert/Trainer",
{

View File

@ -1,3 +1,4 @@
import os
import random
import djclick as click
@ -43,9 +44,19 @@ from vbv_lernwelt.course.creators.uk_training_course import (
from vbv_lernwelt.course.creators.versicherungsvermittlerin import (
create_versicherungsvermittlerin_with_categories,
)
from vbv_lernwelt.course.models import CoursePage, CourseSession, CourseSessionUser
from vbv_lernwelt.course.models import (
Course,
CoursePage,
CourseSession,
CourseSessionUser,
)
from vbv_lernwelt.course.services import mark_course_completion
from vbv_lernwelt.feedback.creators.create_demo_feedback import create_feedback
from vbv_lernwelt.importer.services import (
import_course_sessions_from_excel,
import_students_from_excel,
import_trainers_from_excel,
)
from vbv_lernwelt.learnpath.create_vv_new_learning_path import (
create_vv_new_learning_path,
)
@ -439,21 +450,24 @@ def create_course_training_de():
create_uk_training_competence_profile(course_id=COURSE_UK_TRAINING)
create_default_media_library(course_id=COURSE_UK_TRAINING)
cs = CourseSession.objects.create(
course_id=COURSE_UK_TRAINING,
title="Demo-Tag",
attendance_courses=[
{
"learningContentId": LearningContentAttendanceCourse.objects.get(
slug=f"{course.slug}-lp-circle-fahrzeug-lc-präsenzkurs-fahrzeug"
).id,
"start": "2023-05-23T08:30:00+0200",
"end": "2023-05-23T17:00:00+0200",
"location": "Handelsschule KV Bern, Zimmer 123, Eigerstrasse 16, 3012 Bern",
"trainer": "Roland Grossenbacher, roland.grossenbacher@helvetia.ch",
}
],
assignment_details_list=[
current_dir = os.path.dirname(os.path.realpath(__file__))
print(current_dir)
course = Course.objects.get(id=COURSE_UK_TRAINING)
import_course_sessions_from_excel(
course,
f"{current_dir}/../../../importer/tests/Schulungen_Durchfuehrung_Trainer.xlsx",
)
import_trainers_from_excel(
course,
f"{current_dir}/../../../importer/tests/Schulungen_Durchfuehrung_Trainer.xlsx",
)
import_students_from_excel(
course,
f"{current_dir}/../../../importer/tests/Schulungen_Teilnehmende.xlsx",
)
for cs in CourseSession.objects.filter(course_id=COURSE_UK_TRAINING):
cs.assignment_details_list = [
{
"learningContentId": LearningContentAssignment.objects.get(
slug=f"{course.slug}-lp-circle-fahrzeug-lc-überprüfen-einer-motorfahrzeug-versicherungspolice"
@ -468,5 +482,34 @@ def create_course_training_de():
"submissionDeadlineDateTimeUtc": "2023-06-13T19:00:00Z",
"evaluationDeadlineDateTimeUtc": "2023-06-27T19:00:00Z",
},
],
)
]
cs.save()
# attach users as trainers to ÜK course
course_uk = Course.objects.filter(id=COURSE_UK).first()
if course_uk:
users = [
csu.user
for csu in CourseSessionUser.objects.filter(
course_session__course_id=COURSE_UK_TRAINING
)
]
cs = CourseSession.objects.get(course_id=COURSE_UK, title="Bern 2023 a")
for user in users:
csu, _created = CourseSessionUser.objects.get_or_create(
course_session_id=cs.id, user_id=user.id
)
csu.role = CourseSessionUser.Role.EXPERT
csu.expert.add(
Circle.objects.get(slug="überbetriebliche-kurse-lp-circle-kickoff")
)
csu.expert.add(
Circle.objects.get(slug="überbetriebliche-kurse-lp-circle-basis")
)
csu.expert.add(
Circle.objects.get(slug="überbetriebliche-kurse-lp-circle-fahrzeug")
)
csu.save()

View File

@ -0,0 +1,38 @@
# Generated by Django 3.2.13 on 2023-05-31 15:59
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("course", "0003_rename_attendance_days_coursesession_attendance_courses"),
]
operations = [
migrations.AddField(
model_name="coursesession",
name="generation",
field=models.TextField(blank=True, default=""),
),
migrations.AddField(
model_name="coursesession",
name="group",
field=models.TextField(blank=True, default=""),
),
migrations.AddField(
model_name="coursesession",
name="import_id",
field=models.TextField(blank=True, default=""),
),
migrations.AddField(
model_name="coursesession",
name="region",
field=models.TextField(blank=True, default=""),
),
migrations.AlterField(
model_name="coursesession",
name="title",
field=models.TextField(unique=True),
),
]

View File

@ -215,7 +215,13 @@ class CourseSession(models.Model):
updated_at = models.DateTimeField(auto_now=True)
course = models.ForeignKey("course.Course", on_delete=models.CASCADE)
title = models.TextField()
title = models.TextField(unique=True)
import_id = models.TextField(blank=True, default="")
generation = models.TextField(blank=True, default="")
region = models.TextField(blank=True, default="")
group = models.TextField(blank=True, default="")
start_date = models.DateField(null=True, blank=True)
end_date = models.DateField(null=True, blank=True)

View File

View File

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

View File

@ -0,0 +1,6 @@
from django.apps import AppConfig
class SsoConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "vbv_lernwelt.importer"

View File

View File

@ -0,0 +1,223 @@
from typing import Any, Dict
import structlog
from openpyxl.reader.excel import load_workbook
from vbv_lernwelt.core.models import User
from vbv_lernwelt.course.models import Course, CourseSession, CourseSessionUser
from vbv_lernwelt.importer.utils import (
calc_header_tuple_list_from_pyxl_sheet,
parse_circle_group_string,
try_parse_datetime,
)
from vbv_lernwelt.learnpath.models import Circle, LearningContentAttendanceCourse
logger = structlog.get_logger(__name__)
def create_or_update_user(
email: str, first_name: str = "", last_name: str = "", sso_id: str = None
):
logger.debug(
"create_or_update_user",
email=email,
first_name=first_name,
last_name=last_name,
sso_id=sso_id,
label="import",
)
user = None
if sso_id:
user_qs = User.objects.filter(sso_id=sso_id)
if user_qs.exists():
user = user_qs.first()
if not user:
user_qs = User.objects.filter(email=email)
if user_qs.exists():
user = user_qs.first()
if not user:
# create user
user = User(sso_id=sso_id, email=email, username=email)
user.email = email
user.sso_id = user.sso_id or sso_id
user.first_name = first_name or user.first_name
user.last_name = last_name or user.last_name
user.username = email
user.set_unusable_password()
user.save()
return user
def import_course_sessions_from_excel(course: Course, filename: str):
workbook = load_workbook(filename=filename)
sheet = workbook["Schulungen Durchführung"]
tuple_list = calc_header_tuple_list_from_pyxl_sheet(sheet)
for row in tuple_list:
create_or_update_course_session(course, dict(row), circles=["Fahrzeug"])
def create_or_update_course_session(course: Course, data: Dict[str, Any], circles=None):
"""
:param data: the following keys are required to process the data: Generation, Region, Klasse
:return:
"""
logger.debug(
"create_or_update_course_session",
course=course.title,
data=data,
label="import",
)
if circles is None:
circles = []
# TODO: validation
group = data["Klasse"].strip()
import_id = data["ID"].strip()
generation = str(data["Generation"]).strip()
region = data["Region"].strip()
title = f"{region} {generation} {group}"
cs, _created = CourseSession.objects.get_or_create(
import_id=import_id, group=group, course=course
)
cs.additional_json_data["import_data"] = data
cs.save()
cs.title = title
cs.generation = generation
cs.region = region
cs.group = group
cs.import_id = import_id
cs.save()
for circle in circles:
attendance_course_lp_qs = LearningContentAttendanceCourse.objects.filter(
slug=f"{course.slug}-lp-circle-{circle.lower()}-lc-präsenzkurs-{circle.lower()}"
)
if attendance_course_lp_qs.exists():
cs.attendance_courses.append(
{
"learningContentId": attendance_course_lp_qs.first().id,
"start": try_parse_datetime(data[f"{circle} Start"])[1].isoformat(),
"end": try_parse_datetime(data[f"{circle} Ende"])[1].isoformat(),
"location": data[f"{circle} Raum"],
"trainer": "",
}
)
cs.save()
return cs
def import_trainers_from_excel(course: Course, filename: str):
workbook = load_workbook(filename=filename)
sheet = workbook["Schulungen Trainer"]
tuple_list = calc_header_tuple_list_from_pyxl_sheet(sheet)
for row in tuple_list:
create_or_update_trainer(course, dict(row))
def create_or_update_trainer(course: Course, data: Dict[str, Any]):
logger.debug(
"create_or_update_trainer",
course=course.title,
data=data,
label="import",
)
user = create_or_update_user(
email=data["Email"],
first_name=data["Vorname"],
last_name=data["Name"],
)
# TODO: handle language
groups = [g.strip() for g in data["Klasse"].strip().split(",")]
# general expert handling
for group in groups:
import_id = f"{data['Generation'].strip()} {group}"
course_session = CourseSession.objects.filter(
import_id=import_id, group=group, course=course
).first()
if course_session:
csu, _created = CourseSessionUser.objects.get_or_create(
course_session_id=course_session.id, user_id=user.id
)
csu.role = CourseSessionUser.Role.EXPERT
csu.save()
# circle expert handling
circle_data = parse_circle_group_string(data["Circles"])
for circle_string in circle_data:
parts = circle_string.split("(", 1)
circle_name = parts[0].strip()
groups = [g.strip() for g in parts[1].rstrip(")").strip().split(",")]
# print(circle_name, groups)
for group in groups:
course_session = CourseSession.objects.filter(
import_id=import_id, group=group, course=course
).first()
circle = Circle.objects.filter(
slug=f"{course.slug}-lp-circle-{circle_name.lower()}"
).first()
if course_session and circle:
csu = CourseSessionUser.objects.filter(
course_session_id=course_session.id, user_id=user.id
).first()
if csu:
csu.expert.add(circle)
csu.save()
def import_students_from_excel(course: Course, filename: str):
workbook = load_workbook(filename=filename)
sheet = workbook.active
tuple_list = calc_header_tuple_list_from_pyxl_sheet(sheet)
for row in tuple_list:
create_or_update_student(course, dict(row))
def create_or_update_student(course: Course, data: Dict[str, Any]):
logger.debug(
"create_or_update_student",
course=course.title,
data=data,
label="import",
)
user = create_or_update_user(
email=data["Email"],
first_name=data["Vorname"],
last_name=data["Name"],
)
# TODO: handle language
# general expert handling
import_ids = [i.strip() for i in data["Durchführungen"].split(",")]
for import_id in import_ids:
course_session = CourseSession.objects.filter(
import_id=import_id, course=course
).first()
if course_session:
csu, _created = CourseSessionUser.objects.get_or_create(
course_session_id=course_session.id, user_id=user.id
)
csu.save()

View File

@ -0,0 +1,129 @@
import os
from django.test import TestCase
from openpyxl.reader.excel import load_workbook
from vbv_lernwelt.course.creators.test_course import create_test_course
from vbv_lernwelt.course.models import CourseSession
from vbv_lernwelt.importer.services import create_or_update_course_session
from vbv_lernwelt.importer.utils import calc_header_tuple_list_from_pyxl_sheet
test_dir = os.path.dirname(os.path.abspath(__file__))
class ImportCourseSessionTestCase(TestCase):
def setUp(self):
self.course = create_test_course(include_vv=False)
def test_import_excel_file(self):
workbook = load_workbook(
filename=f"{test_dir}/Schulungen_Durchfuehrung_Trainer.xlsx"
)
sheet = workbook["Schulungen Durchführung"]
tuple_list = calc_header_tuple_list_from_pyxl_sheet(sheet)
for row in tuple_list:
print(row)
create_or_update_course_session(
self.course, dict(row), circles=["Fahrzeug"]
)
self.assertEqual(CourseSession.objects.count(), 6)
class CreateOrUpdateCourseSessionTestCase(TestCase):
def setUp(self):
self.course = create_test_course(include_vv=False)
def test_create_course_session(self):
row = [
("ID", "DE 2023 A"),
("Generation", 2023),
("Region", "Deutschschweiz"),
("Klasse", "A"),
("Fahrzeug Start", "06.06.2023, 13:30"),
("Fahrzeug Ende", "06.06.2023, 15:00"),
(
"Fahrzeug Raum",
"https://teams.microsoft.com/l/meetup-join/19%3ameeting_N2I5YzViZTQtYTM2Ny00OTYwLTgzNzAtYWI4OTQzODcxNTlj%40thread.v2/0?context=%7b%22Tid%22%3a%22fedd03c8-a756-4803-8f27-0db8f7c488f2%22%2c%22Oid%22%3a%22f92e6382-3884-4e71-a2fd-b305a75d9812%22%7d",
),
("Fahrzeug Standort", None),
("Fahrzeug Adresse", None),
]
data = dict(row)
cs = create_or_update_course_session(self.course, data, circles=["Fahrzeug"])
self.assertEqual(cs.import_id, "DE 2023 A")
self.assertEqual(cs.title, "Deutschschweiz 2023 A")
self.assertEqual(cs.generation, "2023")
self.assertEqual(cs.region, "Deutschschweiz")
self.assertEqual(cs.group, "A")
attendance_course = cs.attendance_courses[0]
attendance_course = {
k: v
for k, v in attendance_course.items()
if k not in ["learningContentId", "location"]
}
self.assertDictEqual(
attendance_course,
{
"start": "2023-06-06T13:30:00",
"end": "2023-06-06T15:00:00",
"trainer": "",
},
)
def test_update_course_session(self):
cs = CourseSession.objects.create(
course_id=self.course.id,
title="Deutschschweiz 2023 A",
import_id="DE 2023",
group="A",
)
row = [
("ID", "DE 2023"),
("Generation", 2023),
("Region", "Deutschschweiz"),
("Klasse", "A"),
("Fahrzeug Start", "06.06.2023, 13:30"),
("Fahrzeug Ende", "06.06.2023, 15:00"),
(
"Fahrzeug Raum",
"https://teams.microsoft.com/l/meetup-join/19%3ameeting_N2I5YzViZTQtYTM2Ny00OTYwLTgzNzAtYWI4OTQzODcxNTlj%40thread.v2/0?context=%7b%22Tid%22%3a%22fedd03c8-a756-4803-8f27-0db8f7c488f2%22%2c%22Oid%22%3a%22f92e6382-3884-4e71-a2fd-b305a75d9812%22%7d",
),
("Fahrzeug Standort", None),
("Fahrzeug Adresse", None),
]
data = dict(row)
cs = create_or_update_course_session(self.course, data, circles=["Fahrzeug"])
self.assertEqual(1, CourseSession.objects.count())
self.assertEqual(cs.import_id, "DE 2023")
self.assertEqual(cs.title, "Deutschschweiz 2023 A")
self.assertEqual(cs.generation, "2023")
self.assertEqual(cs.region, "Deutschschweiz")
self.assertEqual(cs.group, "A")
attendance_course = cs.attendance_courses[0]
attendance_course = {
k: v
for k, v in attendance_course.items()
if k not in ["learningContentId", "location"]
}
self.assertDictEqual(
attendance_course,
{
"start": "2023-06-06T13:30:00",
"end": "2023-06-06T15:00:00",
"trainer": "",
},
)

View File

@ -0,0 +1,76 @@
import os
from datetime import datetime
from django.test import TestCase
from openpyxl.reader.excel import load_workbook
from vbv_lernwelt.course.creators.test_course import create_test_course
from vbv_lernwelt.course.models import CourseSession, CourseSessionUser
from vbv_lernwelt.importer.services import create_or_update_student
from vbv_lernwelt.importer.utils import calc_header_tuple_list_from_pyxl_sheet
test_dir = os.path.dirname(os.path.abspath(__file__))
class ImportStudentsTestCase(TestCase):
def setUp(self):
self.course = create_test_course(include_vv=False)
self.course_session_a = CourseSession.objects.create(
course=self.course,
title="Deutschschweiz 2023 A",
import_id="DE 2023 A",
group="A",
)
def test_import_excel_file(self):
workbook = load_workbook(filename=f"{test_dir}/Schulungen_Teilnehmende.xlsx")
sheet = workbook.active
tuple_list = calc_header_tuple_list_from_pyxl_sheet(sheet)
for row in tuple_list:
print(row)
create_or_update_student(self.course, dict(row))
self.assertEqual(CourseSessionUser.objects.count(), 28)
class CreateOrUpdateStudentTestCase(TestCase):
def setUp(self):
self.course = create_test_course(include_vv=False)
self.course_session_a = CourseSession.objects.create(
course=self.course,
title="Deutschschweiz 2023 A",
import_id="DE 2023 A",
group="A",
)
def test_create_student(self):
row = [
("Name", "Rascher"),
("Vorname", "Barbara"),
("Email", "barbara.rascher@vbv-afa.ch"),
("Sprache", "de"),
("Durchführungen", "DE 2023 A"),
("Datum", datetime(2023, 9, 6, 0, 0)),
(None, "VBV"),
(None, None),
(None, None),
(None, None),
(None, None),
]
create_or_update_student(self.course, dict(row))
self.assertEqual(
CourseSessionUser.objects.filter(
user__email="barbara.rascher@vbv-afa.ch"
).count(),
1,
)
csu = CourseSessionUser.objects.get(
course_session=self.course_session_a,
)
self.assertEqual(csu.role, CourseSessionUser.Role.MEMBER)
self.assertEqual(csu.user.email, "barbara.rascher@vbv-afa.ch")

View File

@ -0,0 +1,88 @@
import os
from django.test import TestCase
from openpyxl.reader.excel import load_workbook
from vbv_lernwelt.course.creators.test_course import create_test_course
from vbv_lernwelt.course.models import CourseSession, CourseSessionUser
from vbv_lernwelt.importer.services import create_or_update_trainer
from vbv_lernwelt.importer.utils import calc_header_tuple_list_from_pyxl_sheet
test_dir = os.path.dirname(os.path.abspath(__file__))
class ImportTrainerTestCase(TestCase):
def setUp(self):
self.course = create_test_course(include_vv=False)
self.course_session_a = CourseSession.objects.create(
course=self.course,
title="Deutschschweiz 2023 A",
import_id="DE 2023 A",
group="A",
)
self.course_session_a = CourseSession.objects.create(
course=self.course,
title="Deutschschweiz 2023 B",
import_id="DE 2023 B",
group="B",
)
def test_import_excel_file(self):
workbook = load_workbook(
filename=f"{test_dir}/Schulungen_Durchfuehrung_Trainer.xlsx"
)
sheet = workbook["Schulungen Trainer"]
tuple_list = calc_header_tuple_list_from_pyxl_sheet(sheet)
for row in tuple_list:
print(row)
create_or_update_trainer(self.course, dict(row))
self.assertEqual(CourseSessionUser.objects.count(), 4)
class CreateOrUpdateTrainerTestCase(TestCase):
def setUp(self):
self.course = create_test_course(include_vv=False)
self.course_session_a = CourseSession.objects.create(
course=self.course,
title="Deutschschweiz 2023 A",
import_id="DE 2023 A",
group="A",
)
self.course_session_a = CourseSession.objects.create(
course=self.course,
title="Deutschschweiz 2023 B",
import_id="DE 2023 B",
group="B",
)
def test_create_trainer(self):
row = [
("Name", "Hänni"),
("Vorname", "Fabienne"),
("Email", "fabienne.haenni@vbv-afa.ch"),
("Sprache", "de"),
("Generation", "DE 2023"),
("Klasse", "A, B"),
("Circles", "Fahrzeug (A, B), Reisen (A), KMU (B)"),
("Status Referenten", "ok"),
(None, "Schulung D"),
]
create_or_update_trainer(self.course, dict(row))
self.assertEqual(
CourseSessionUser.objects.filter(
user__email="fabienne.haenni@vbv-afa.ch"
).count(),
2,
)
csu = CourseSessionUser.objects.get(
course_session=self.course_session_a,
)
self.assertEqual(csu.role, CourseSessionUser.Role.EXPERT)
self.assertEqual(csu.user.email, "fabienne.haenni@vbv-afa.ch")
self.assertEqual(csu.expert.all().first().title, "Fahrzeug")

View File

@ -0,0 +1,94 @@
from django.test import TestCase
from vbv_lernwelt.core.models import User
from vbv_lernwelt.importer.services import create_or_update_user
class CreateOrUpdateUserTestCase(TestCase):
def test_create_user(self):
u = create_or_update_user(
email="daniel@example.com",
first_name="Daniel",
last_name="Egger",
sso_id="12229620-81ea-483d-8d96-6ba8be5f9eb7",
)
saved_user = User.objects.get(id=u.id)
self.assertEqual(saved_user.email, "daniel@example.com")
self.assertEqual(saved_user.username, "daniel@example.com")
self.assertEqual(saved_user.first_name, "Daniel")
self.assertEqual(saved_user.last_name, "Egger")
self.assertEqual(str(saved_user.sso_id), "12229620-81ea-483d-8d96-6ba8be5f9eb7")
def test_update_existing_user_with_oid(self):
User.objects.create(
email="daniel@example.com",
username="daniel@example.com",
first_name="Daniel",
last_name="Egger",
)
create_or_update_user(
email="daniel@example.com",
last_name="Egger",
sso_id="12229620-81ea-483d-8d96-6ba8be5f9eb7",
)
self.assertEqual(1, User.objects.count())
user = User.objects.first()
self.assertEqual(user.email, "daniel@example.com")
self.assertEqual(user.username, "daniel@example.com")
self.assertEqual(user.first_name, "Daniel")
self.assertEqual(user.last_name, "Egger")
self.assertEqual(str(user.sso_id), "12229620-81ea-483d-8d96-6ba8be5f9eb7")
def test_update_existing_user_with_new_last_name(self):
User.objects.create(
email="daniel@example.com",
username="daniel@example.com",
first_name="Daniel",
last_name="Egger",
sso_id="12229620-81ea-483d-8d96-6ba8be5f9eb7",
)
create_or_update_user(
email="daniel@example.com",
first_name="Daniel",
last_name="Marro",
)
self.assertEqual(1, User.objects.count())
user = User.objects.first()
self.assertEqual(user.email, "daniel@example.com")
self.assertEqual(user.username, "daniel@example.com")
self.assertEqual(user.first_name, "Daniel")
self.assertEqual(user.last_name, "Marro")
self.assertEqual(str(user.sso_id), "12229620-81ea-483d-8d96-6ba8be5f9eb7")
def test_update_existing_user_with_new_email(self):
User.objects.create(
email="daniel@example.com",
username="daniel@example.com",
first_name="Daniel",
last_name="Egger",
sso_id="12229620-81ea-483d-8d96-6ba8be5f9eb7",
)
create_or_update_user(
email="danu@example.com",
first_name="Daniel",
last_name="Egger",
sso_id="12229620-81ea-483d-8d96-6ba8be5f9eb7",
)
self.assertEqual(1, User.objects.count())
user = User.objects.first()
self.assertEqual(user.email, "danu@example.com")
self.assertEqual(user.username, "danu@example.com")
self.assertEqual(user.first_name, "Daniel")
self.assertEqual(user.last_name, "Egger")
self.assertEqual(str(user.sso_id), "12229620-81ea-483d-8d96-6ba8be5f9eb7")

View File

@ -0,0 +1,192 @@
from datetime import date, datetime
from unittest import TestCase
from vbv_lernwelt.importer.utils import (
parse_circle_group_string,
try_parse_date,
try_parse_datetime,
try_parse_int,
)
class TryParseDateTestCase(TestCase):
def test_wrongData_returnsFalseAndValue(self):
flag, value = try_parse_date("nonsense")
self.assertFalse(flag)
self.assertEqual("nonsense", value)
def test_isoDate_returnsCorrectDate(self):
flag, value = try_parse_date("2015-01-20")
self.assertTrue(flag)
self.assertEqual(date(2015, 1, 20), value)
def test_isoDateWithTime_returnsCorrectDate(self):
flag, value = try_parse_date("2015-01-20T15:21")
self.assertTrue(flag)
self.assertEqual(date(2015, 1, 20), value)
def test_isoDateWithTime2_returnsCorrectDate(self):
flag, value = try_parse_date("2018-05-03T00:00:00")
self.assertTrue(flag)
self.assertEqual(date(2018, 5, 3), value)
def test_swissDate_returnsCorrectDate(self):
flag, value = try_parse_date("01.05.2018")
self.assertTrue(flag)
self.assertEqual(date(2018, 5, 1), value)
def test_wrongIsoDate_returnsFalseAndValue(self):
flag, value = try_parse_date("2015-14-40")
self.assertFalse(flag)
self.assertEqual("2015-14-40", value)
def test_inputIsDate_returnsDate(self):
flag, value = try_parse_date(date(2016, 5, 1))
self.assertTrue(flag)
self.assertEqual(date(2016, 5, 1), value)
def test_inputIsNumber_returnsFalseAndNumber(self):
flag, value = try_parse_date(123)
self.assertFalse(flag)
self.assertEqual(123, value)
def test_inputIsNumberString_returnsFalseAndString(self):
flag, value = try_parse_date("56")
self.assertFalse(flag)
self.assertEqual("56", value)
def test_inputIsFloatString_returnsFalseAndString(self):
flag, value = try_parse_date("3.14")
self.assertFalse(flag)
self.assertEqual("3.14", value)
def test_inputIsShortDateWithoutYear_returnsFalseAndString(self):
flag, value = try_parse_date("11-01")
self.assertFalse(flag)
self.assertEqual("11-01", value)
class TryParseInt(TestCase):
def test_int_works(self):
flag, value = try_parse_int(123)
self.assertTrue(flag)
self.assertEqual(123, value)
def test_valid_string_works(self):
flag, value = try_parse_int("123")
self.assertTrue(flag)
self.assertEqual(123, value)
def test_invalid_string_breaks(self):
flag, value = try_parse_int("123qwer")
self.assertFalse(flag)
self.assertEqual("123qwer", value)
def test_invalid_string_returns_default(self):
flag, value = try_parse_int("123qwer", 0)
self.assertFalse(flag)
self.assertEqual(0, value)
class TryParseDateTimeTestCase(TestCase):
def test_isoDateTime_returnsCorrectDateTime(self):
flag, value = try_parse_datetime("2016-05-31T10:00:00.000000")
self.assertTrue(flag)
self.assertEqual(datetime(2016, 5, 31, 10, 0, 0), value)
def test_isoDateTimeWithoutSeconds_returnsCorrectDateTime(self):
flag, value = try_parse_datetime("2016-05-02T10:00")
self.assertTrue(flag)
self.assertEqual(datetime(2016, 5, 2, 10, 0, 0), value)
def test_isoDateWithoutTime_returnTrueAndDatetimeWithZeroHour(self):
flag, value = try_parse_datetime("2016-05-31")
self.assertTrue(flag)
self.assertEqual(datetime(2016, 5, 31), value)
def test_isoDateWithSpaceBeforeTime_returnTrueAndDatetimeWithZeroHour(self):
flag, value = try_parse_datetime("2016-05-03 14:12")
self.assertTrue(flag)
self.assertEqual(datetime(2016, 5, 3, 14, 12), value)
def test_swissDateWithTime_returnTrueAndDatetime(self):
flag, value = try_parse_datetime("01.05.2018 15:20")
self.assertTrue(flag)
self.assertEqual(datetime(2018, 5, 1, 15, 20), value)
def test_swissDateWithTimeWithMultipleSpaces_returnTrueAndDatetime(self):
flag, value = try_parse_datetime("01 .05. 2018 15:20")
self.assertTrue(flag)
self.assertEqual(datetime(2018, 5, 1, 15, 20), value)
def test_withDateTimeInput_returnsTrueAndDateTime(self):
flag, value = try_parse_datetime(datetime(2016, 5, 31, 10, 0))
self.assertTrue(flag)
self.assertEqual(datetime(2016, 5, 31, 10, 0, 0), value)
def test_inputIsNumber_returnsFalseAndNumber(self):
flag, value = try_parse_datetime(123)
self.assertFalse(flag)
self.assertEqual(123, value)
def test_inputIsNumberString_returnsFalseAndString(self):
flag, value = try_parse_datetime("56")
self.assertFalse(flag)
self.assertEqual("56", value)
def test_inputIsFloatString_returnsFalseAndString(self):
flag, value = try_parse_datetime("3.14")
self.assertFalse(flag)
self.assertEqual("3.14", value)
def test_inputIsShortDateWithoutYear_returnsFalseAndString(self):
flag, value = try_parse_date("11-01")
self.assertFalse(flag)
self.assertEqual("11-01", value)
def test_inputIsTimeString_returnsCurrentDateWithGivenTime(self):
flag, value = try_parse_datetime(" 15:00")
self.assertFalse(flag)
self.assertEqual(" 15:00", value)
def test_inputFromVbvExcel_returnsCurrentDateWithGivenTime(self):
flag, value = try_parse_datetime("09.06.2023, 13:30")
self.assertTrue(flag)
self.assertEqual(datetime(2023, 6, 9, 13, 30, 0), value)
class ParseCircleGroupStringTestCase(TestCase):
def test_withMultipleCircles(self):
value = "Fahrzeug (A, B), Reisen (A), KMU (B)"
self.assertEqual(
["Fahrzeug (A, B)", "Reisen (A)", "KMU (B)"],
parse_circle_group_string(value),
)

View File

View File

@ -0,0 +1,110 @@
import datetime
import re
from typing import Any, List, Optional, Tuple, Union
from dateutil.parser import parse
from six import string_types
def parse_formats(dt_str, fmt_strs, **parser_kwargs):
for fmt in fmt_strs:
try:
return datetime.datetime.strptime(dt_str, fmt)
except ValueError:
pass
return parse(dt_str, **parser_kwargs)
def try_parse_int(x: Any, default: Optional[Any] = None) -> Tuple[bool, Any]:
try:
return True, int(x)
# pylint: disable=broad-except
except Exception:
if default is None:
return False, x
return False, default
def try_parse_date(
value: Union[str, datetime.date]
) -> Tuple[bool, Union[str, datetime.date]]:
if isinstance(value, datetime.date):
return True, value
elif isinstance(value, datetime.datetime):
return True, value.date()
elif isinstance(value, string_types):
if value.strip().replace(".", "", 1).isdigit():
return False, value
# date needs at least 3 parts
if len(re.split(r"[.-]", value)) < 3:
return False, value
try:
date_with_time = parse_formats(
value,
[
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%d",
"%d.%m.%Y",
],
dayfirst=True,
)
return True, date_with_time.date()
except ValueError:
return False, value
else:
return False, value
def try_parse_datetime(
value: Union[str, datetime.datetime]
) -> Tuple[bool, Union[str, datetime.datetime]]:
if isinstance(value, datetime.datetime):
return True, value
elif isinstance(value, string_types):
if value.strip().replace(".", "", 1).isdigit():
return False, value
# date needs at least 3 parts
if len(re.split(r"[.-]", value)) < 3:
return False, value
try:
date_with_time = parse_formats(
value,
[
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%dT%H:%M",
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%d %H:%M",
"%d.%m.%Y, %H:%M",
],
dayfirst=True,
)
return True, date_with_time
except ValueError:
return False, value
else:
return False, value
def parse_circle_group_string(value: str) -> List[str]:
# This regex pattern matches any comma that is not inside parentheses
pattern = r",(?![^()]*\))"
# re.split() splits the string based on the pattern
return [x.strip() for x in re.split(pattern, value)]
def calc_header_tuple_list_from_pyxl_sheet(sheet):
header = [cell.value for cell in sheet[1]]
result = []
for row in sheet.iter_rows(min_row=2, values_only=True):
if all(cell_value is None for cell_value in row):
continue
result.append(list(zip(header, row)))
return result

View File

View File

@ -1,6 +1,6 @@
import logging
from typing import Optional
import structlog
from notifications.signals import notify
from sendgrid import Mail, SendGridAPIClient
from storages.utils import setting
@ -8,7 +8,7 @@ from storages.utils import setting
from vbv_lernwelt.core.models import User
from vbv_lernwelt.notify.models import Notification, NotificationType
logger = logging.getLogger(__name__)
logger = structlog.get_logger(__name__)
class EmailService:
@ -25,10 +25,14 @@ class EmailService:
)
try:
cls._sendgrid_client.send(message)
logger.info(f"Successfully sent email to {recipient}")
logger.info(f"Successfully sent email to {recipient}", label="email")
return True
except Exception as e:
logger.error(f"Failed to send email to {recipient}: {e}")
logger.error(
f"Failed to send email to {recipient}: {e}",
exc_info=True,
label="email",
)
return False

View File

@ -1,8 +1,9 @@
import base64
import json
import logging
logger = logging.getLogger(__name__)
import structlog
logger = structlog.get_logger(__name__)
def decode_jwt(jwt: str):
@ -11,7 +12,9 @@ def decode_jwt(jwt: str):
payload_bytes = base64.urlsafe_b64decode(_correct_padding(jwt_parts[1]))
payload = json.loads(payload_bytes.decode("UTF-8"))
except Exception as e:
logger.warning(f"OAuthToken error: Could not decode jwt: {e}")
logger.warning(
f"OAuthToken error: Could not decode jwt: {e}", exc_info=True, label="sso"
)
return None
return payload

View File

@ -1,3 +0,0 @@
from django.test import TestCase
# Create your tests here.

View File

@ -0,0 +1,17 @@
{
"ver": "1.0",
"iss": "https://vbvtst.b2clogin.com/6967b19e-ec5c-4a46-bb16-01b0983da41b/v2.0/",
"sub": "f8c8e526-9fb1-4983-a5b7-4c069a83e317",
"aud": "8d32c131-0d60-4588-b01a-ae3435d44c23",
"exp": 1685538794,
"nonce": "mABq9hjYOMF34fCEi3VL",
"iat": 1685535194,
"auth_time": 1685535194,
"oid": "f8c8e526-9fb1-4983-a5b7-4c069a83e317",
"given_name": "Daniel",
"family_name": "Egger",
"name": "unknown",
"emails": ["daniel.egger+vbv-stage@gmail.com"],
"tfp": "B2C_1_SignUpAndSignIn_v3",
"nbf": 1685535194
}

View File

@ -1,10 +1,11 @@
import structlog as structlog
from authlib.integrations.base_client import OAuthError
from django.conf import settings
from django.contrib.auth import get_user_model, login as dj_login
from django.contrib.auth import login as dj_login
from django.shortcuts import redirect
from sentry_sdk import capture_exception
from vbv_lernwelt.importer.services import create_or_update_user
from vbv_lernwelt.sso.client import oauth
from vbv_lernwelt.sso.jwt import decode_jwt
@ -22,19 +23,25 @@ def login(request):
def authorize(request):
try:
logger.debug(request)
logger.debug(request, label="sso")
token = getattr(oauth, settings.OAUTH["client_name"]).authorize_access_token(
request
)
deocded_token = decode_jwt(token["id_token"])
decoded_token = decode_jwt(token["id_token"])
# logger.debug(label="sso", decoded_token=decoded_token)
except OAuthError as e:
logger.error(f"OAuth error: {e}")
logger.error(e, exc_info=True, label="sso")
if not settings.DEBUG:
capture_exception(e)
return redirect(f"/{OAUTH_FAIL_REDIRECT}?state=someerror") # to be defined
user_data = _user_data_from_token_data(deocded_token)
user, created = get_user_model().objects.create_or_update_by_email(user_data)
user_data = _user_data_from_token_data(decoded_token)
user = create_or_update_user(
email=user_data.get("email"),
sso_id=user_data.get("sso_id"),
first_name=user_data.get("first_name", ""),
last_name=user_data.get("last_name", ""),
)
dj_login(request, user)
return redirect(f"/")
@ -45,7 +52,6 @@ def _user_data_from_token_data(token: dict) -> dict:
return {
"first_name": token.get("given_name", ""),
"last_name": token.get("family_name", ""),
"username": token.get("preferred_username", first_email),
"email": first_email,
"oid": token.get("oid"),
"sso_id": token.get("oid"),
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 39 KiB