# skillbox/server/books/management/commands/migrate_objectives_to_conte...
#
# 306 lines
# 14 KiB
# Python

import json
from logging import getLogger
from django.core.management import BaseCommand
from django.core.exceptions import ValidationError
from django.db import models
from books.models import Chapter, ObjectiveGroupSnapshot, Snapshot, ContentBlockSnapshot, ChapterSnapshot
from books.models import ContentBlock
from books.models import Module
from objectives.models import ObjectiveSnapshot, Objective, ObjectiveGroup
logger = getLogger(__name__)
class Command(BaseCommand):
    """Management command wrapper around ``migrate_objectives_to_content``."""

    def handle(self, *args, **options):
        """Run the migration; positional and keyword options are ignored."""
        migrate_objectives_to_content()
def migrate_objectives_to_content():
    """Turn the objectives of every module into content blocks.

    For each module a chapter is created via
    ``create_chapter_from_objective_group``. Then, per objective group
    (in the order given by ``get_objectives_groups_in_specific_order``):

    1. A default ("Verlagsinhalte") content block holding the owner-less,
       non-snapshot objectives is created or reused, and inherits the
       group's ``hidden_for`` classes.
    2. For every owner returned by ``get_objectives_by_owner`` and every
       school class of that owner, the default objectives visible to the
       class are merged with the owner's visible custom objectives. If the
       merged set differs from the default set, a custom content block is
       created (or reused within this objective group), the default block
       is hidden for that class and the custom block is made visible
       (or hidden, if the group itself is hidden for the class).

    Modules that raise ``ValidationError`` are skipped, logged and listed in
    the summary printed at the end. Progress is reported with ``print``.

    NOTE(review): ``created_default_content_blocks`` is shared across all
    modules and keyed by the tuple of default objective ids, so every
    objective group without default objectives maps to the key ``()`` and
    reuses the first such block, even across modules. Confirm this is
    intended.
    """
    # Counts only custom (owner specific) blocks; default blocks are not counted.
    created_content_blocks = 0
    failed_modules = []
    # This dict stores all content blocks that have been created for a set of objectives
    # In this way we can reuse content blocks for the same set of objectives
    created_default_content_blocks = {}
    for module in Module.objects.all():
        try:
            chapter = create_chapter_from_objective_group(module)
            for objective_group in get_objectives_groups_in_specific_order(module):
                default_objectives = list(objective_group.objectives.filter(owner__isnull=True, )
                                          .exclude(objectivesnapshot__isnull=False).order_by('order'))
                default_objectives_ids = tuple(objective.id for objective in default_objectives)
                # Create "Verlagsinhalte" content block if it does not exist yet
                if default_objectives_ids not in created_default_content_blocks:
                    default_content_block = create_default_content(objective_group, chapter)
                    created_default_content_blocks[default_objectives_ids] = default_content_block
                else:
                    default_content_block = created_default_content_blocks[default_objectives_ids]
                # set visibility for objective_group for default content (Verlagsinhalte)
                if objective_group.hidden_for.exists():
                    default_content_block.hidden_for.add(*objective_group.hidden_for.all())
                    default_content_block.save_revision().publish()
                    default_content_block.save()
                custom_objectives_by_owner = get_objectives_by_owner(objective_group)
                if default_objectives or custom_objectives_by_owner:
                    # Cache of custom blocks, scoped to this objective group only.
                    contentblocks_by_merged_objectives_ids = {}
                    # for custom objectives iterate over owners,
                    # - one owner's custom objectives must not be changed by another owner
                    # - visibility is set per class
                    for owner, owner_objectives in custom_objectives_by_owner.items():
                        print(f"Owner: {owner}")
                        print(" Objectives: ")
                        visible_default_objectives_by_class = filter_visible_objectives_by_class(default_objectives,
                                                                                                 owner)
                        for school_class, default_objectives_for_class in visible_default_objectives_by_class.items():
                            # Stays None when the class simply keeps seeing the default block.
                            custom_content_block = None
                            print(f" School class: {school_class}")
                            # merge "Verlagsinhalte" and "benutzerdefinierte Inhalte"
                            visible_owner_objectives = [objective for objective in owner_objectives if
                                                        not objective.is_hidden_for_class(school_class)]
                            merged_objectives = default_objectives_for_class + visible_owner_objectives
                            merged_objectives_ids = tuple(objective.id for objective in merged_objectives)
                            is_default_content = merged_objectives_ids == default_objectives_ids
                            if is_default_content:
                                print(" Objective: Reuse default content block")
                            # Create content block if that set of objectives has not been created yet
                            if merged_objectives_ids not in contentblocks_by_merged_objectives_ids \
                                    and not is_default_content:
                                for objective in merged_objectives:
                                    print(f" Objective: {objective} {objective.owner}")
                                # An empty merged set gets no block at all.
                                if merged_objectives_ids:
                                    custom_content_block = create_content_block_from_objective(objective_group,
                                                                                               chapter,
                                                                                               owner=owner,
                                                                                               )
                                    contentblocks_by_merged_objectives_ids[
                                        merged_objectives_ids] = custom_content_block
                                    create_text_in_content_block(merged_objectives, custom_content_block)
                                    created_content_blocks += 1
                            else:
                                if not is_default_content:
                                    print(" Objective: Reuse content block")
                                    custom_content_block = contentblocks_by_merged_objectives_ids[
                                        merged_objectives_ids]
                                else:
                                    print(" Objective: Reuse default content block")
                            # set visibility
                            # hide default objectives if custom objectives exist
                            if custom_content_block:
                                default_content_block.hidden_for.add(school_class)
                                default_content_block.save_revision().publish()
                                default_content_block.save()
                                # make custom content block visible for school class if it is not in hidden list
                                if not objective_group.hidden_for.filter(id=school_class.id).exists():
                                    custom_content_block.visible_for.add(school_class)
                                else:
                                    custom_content_block.hidden_for.add(school_class)
                                custom_content_block.save_revision().publish()
                                custom_content_block.save()
        except ValidationError:
            print(f"Error with module {module}")
            # logger.exception keeps the traceback, which logger.error(e) dropped.
            logger.exception("Migration of objectives failed for module %s", module)
            failed_modules.append(module)
    print(f"Created {created_content_blocks} content blocks")
    print(f"Failed modules: {len(failed_modules)}")
    for module in failed_modules:
        print(f"Failed module: {module}")
def get_objectives_groups_in_specific_order(module):
    """Return the module's objective groups sorted by a fixed title order.

    Each group is annotated with ``_sort_index``, the position of its
    ``title`` in the hard-coded list below, and the queryset is ordered by
    that annotation.

    NOTE(review): titles missing from the list match no ``When`` clause;
    presumably their ``_sort_index`` is NULL and their position then depends
    on the database's NULL ordering. Confirm against the Django docs.

    See:
    https://stackoverflow.com/questions/5966462/sort-queryset-by-values-in-list
    https://docs.djangoproject.com/en/5.0/ref/models/conditional-expressions/
    """
    title_order = ["language_communication", "society", "interdisciplinary"]
    ranking_clauses = [
        models.When(title=title, then=position)
        for position, title in enumerate(title_order)
    ]
    ranking = models.Case(*ranking_clauses, output_field=models.IntegerField())
    return module.objective_groups.all().annotate(_sort_index=ranking).order_by('_sort_index')
def create_default_content(objective_group, chapter):
    """Create the default ("Verlagsinhalt") objectives content block.

    Collects the group's objectives that have no owner and no objective
    snapshot, creates an owner-less content block under ``chapter`` and
    writes the objectives into it as text. Progress is printed to stdout.

    NOTE(review): the objectives are sorted by ``id`` here, while
    ``migrate_objectives_to_content`` sorts the same selection by ``order``.
    Confirm which ordering is intended.

    Returns the created content block.
    """
    print(f" Objective group: {objective_group}")
    print(" Default objectives:")

    publisher_selection = objective_group.objectives.filter(
        owner__isnull=True, objectivesnapshot__isnull=True)
    publisher_objectives = list(publisher_selection.order_by('id'))

    block = create_content_block_from_objective(objective_group, chapter)
    create_text_in_content_block(publisher_objectives, block)

    # Listing happens after the block exists, matching the original output order.
    for entry in publisher_objectives:
        print(f" Objective: {entry} {entry.owner}")
    return block
def filter_visible_objectives_by_class(objectives, user):
    """Map each of the user's school classes to the objectives it can see.

    An objective is visible for a class when
    ``objective.is_hidden_for_class(school_class)`` is falsy. Every class
    returned by ``user.school_classes.all()`` gets an entry, even if the
    resulting list is empty. A class yielded more than once accumulates its
    visible objectives again.
    """
    per_class = {}
    for klass in user.school_classes.all():
        shown = [candidate for candidate in objectives
                 if not candidate.is_hidden_for_class(klass)]
        per_class.setdefault(klass, []).extend(shown)
    return per_class
def get_objectives_by_owner(objective_group, exclude_snapshots=True):
    """Group the custom objectives of ``objective_group`` by their owner.

    Custom objectives are those with an owner and without an objective
    snapshot, ordered by ``order``. Teachers of any school class for which
    a default (owner-less) objective is hidden are added with an empty
    list, so callers also process them.

    ``exclude_snapshots`` is accepted but not used by this function.

    Returns a dict mapping owner -> list of that owner's custom objectives.
    """
    owned_objectives = objective_group.objectives.filter(
        owner__isnull=False, objectivesnapshot__isnull=True).order_by('order')

    grouped = {}
    for entry in owned_objectives:
        grouped.setdefault(entry.owner, []).append(entry)

    # add owners with hidden default objectives to custom objectives, needed for further processing
    hidden_defaults = list(objective_group.objectives.filter(
        owner__isnull=True, objectivesnapshot__isnull=True, hidden_for__isnull=False))
    for hidden_default in hidden_defaults:
        for klass in hidden_default.hidden_for.all():
            for teacher in klass.get_teachers():
                grouped.setdefault(teacher, [])
    return grouped
def create_chapter_from_objective_group(module, prefix="XXX "):
    """Create and publish a "Lernziele" chapter as the first child of ``module``.

    If the module already has children, the chapter is inserted to the left
    of the current first child. If it has none, the chapter is added as the
    module's only child. The original skipped insertion in that case and
    then called ``save_revision()`` / ``save()`` on a chapter that was never
    placed in the tree.

    ``prefix`` is accepted for backward compatibility but not used.

    Returns the created chapter.
    """
    chapter = Chapter(title="Lernziele")
    first_sibling = module.get_first_child()
    if first_sibling is not None:
        first_sibling.add_sibling(instance=chapter, pos='left')
    else:
        # Empty module: attach the chapter directly so it exists in the tree.
        module.add_child(instance=chapter)
    chapter.save_revision().publish()
    chapter.save()
    return chapter
def create_chapter_snapshot_from_objective_group(module, snapshot, prefix="XXX "):
    """Insert a "Lernziele" chapter snapshot before the module's first child.

    Looks up the first chapter titled "Lernziele" under ``module``, builds a
    ``ChapterSnapshot`` bound to ``snapshot`` and, if the module has
    children, adds it to the left of the first one.

    ``prefix`` is accepted but not used.

    NOTE(review): the return value is the looked-up chapter (possibly
    ``None``), not the newly built chapter snapshot. Confirm that callers
    expect this.
    """
    chapter_title = "Lernziele"
    chapter = Chapter.objects.filter(parent=module, title=chapter_title).first()
    chapter_snapshot = ChapterSnapshot(title=chapter_title, snapshot=snapshot)
    leftmost_child = module.get_first_child()
    if leftmost_child is None:
        return chapter
    leftmost_child.add_sibling(instance=chapter_snapshot, pos='left')
    return chapter
def create_content_block_from_objective(objective_group, chapter, owner=None):
    """Create a "normal" content block for ``objective_group`` under ``chapter``.

    The block's title is the group's display title. When ``owner`` is given
    the block is marked as user created and the owner is also recorded as
    its original creator.

    Returns the content block after adding it as a child of ``chapter``.
    """
    is_user_content = owner is not None
    block_fields = {
        'title': f"{objective_group.get_title_display()}",
        'type': "normal",
        'owner': owner,
        'user_created': is_user_content,
        'original_creator': owner,
    }
    new_block = ContentBlock(**block_fields)
    chapter.add_child(instance=new_block)
    return new_block
def create_content_block_snapshot_from_objective(objective_group, chapter, snapshot, owner=None):
    """Create a "normal" content block snapshot for ``objective_group``.

    The snapshot block is titled with the group's display title, bound to
    ``snapshot``, marked as user created when ``owner`` is given, and added
    as a child of ``chapter``.

    Returns the created content block snapshot.
    """
    is_user_content = owner is not None
    snapshot_fields = {
        'title': f"{objective_group.get_title_display()}",
        'type': "normal",
        'owner': owner,
        'user_created': is_user_content,
        'snapshot': snapshot,
    }
    new_block_snapshot = ContentBlockSnapshot(**snapshot_fields)
    chapter.add_child(instance=new_block_snapshot)
    return new_block_snapshot
def create_text_in_content_block(objectives, content_block, get_or_create=False):
    """Write ``objectives`` into ``content_block`` as text and publish it.

    The block's ``contents`` are replaced with the output of
    ``create_content_block_contents``. With ``get_or_create=True`` an
    existing ``ContentBlock`` with the same title, owner, ``user_created``
    flag and contents is looked up and, if found, used in place of the
    passed block.

    Returns the content block whose new revision was published.
    """
    content_block.contents = create_content_block_contents(list(objectives))

    if get_or_create:
        existing_block = ContentBlock.objects.filter(
            title=content_block.title,
            owner=content_block.owner,
            user_created=content_block.user_created,
            contents=content_block.contents,
        ).first()
        if existing_block is not None:
            content_block = existing_block

    content_block.save_revision().publish()
    return content_block
def create_content_block_contents(objectives):
    """Serialise objectives into the JSON for a single ``text_block``.

    Every objective with a truthy ``text`` becomes one ``<li>`` item of an
    unordered list; objectives without text are skipped. The text is
    inserted as-is, without HTML escaping.

    Returns a JSON string holding a one-element list with the text block.
    """
    list_items = "".join(
        f"<li>{entry.text}</li>" for entry in list(objectives) if entry.text
    )
    text_block = {
        'type': 'text_block',
        'value': {'text': f"<ul>{list_items}</ul>"},
    }
    return json.dumps([text_block])
def analyze():
    """Print row counts for objectives, objective groups and snapshots.

    The labels are emitted exactly as before, including two oddities:
    NOTE(review): the first label is spelled "OjectiveGroups".
    NOTE(review): "ObjectivesSnapshots" counts only objective snapshots
    with ``hidden=True``.
    The second "ObjectiveGroups" / "Objectives" pair counts rows that have
    no related snapshot.
    """
    # Evaluated top to bottom, in the same order as the original f-string.
    report = [
        ("OjectiveGroups", ObjectiveGroup.objects.count()),
        ("Objectives", Objective.objects.count()),
        ("ObjectiveGroupSnapshots", ObjectiveGroupSnapshot.objects.count()),
        ("ObjectivesSnapshots", ObjectiveSnapshot.objects.filter(hidden=True).count()),
        ("ObjectiveGroups", ObjectiveGroup.objects.filter(objectivegroupsnapshot__isnull=True).count()),
        ("Objectives", Objective.objects.filter(objectivesnapshot__isnull=True).count()),
        ("Snapshot", Snapshot.objects.count()),
    ]
    body = "\n".join(f"{label}: {total}" for label, total in report)
    print("\n" + body + "\n")
def clean_snapshots():
    """Utility function to clean up snapshots.

    Deletes every snapshot whose creator's e-mail address contains one of
    the hard-coded fragments below, printing the first matching fragment
    for each.

    Each snapshot is deleted at most once. The original kept scanning the
    remaining fragments after ``snapshot.delete()``, so an address matching
    two fragments triggered a second ``delete()`` on the already deleted
    instance.
    """
    emails = ["mia.teacher", "simone.gerber", "pascal.sigg", "dario.aebersold", "steph-teacher"]
    for snapshot in Snapshot.objects.all():
        creator_email = snapshot.creator.email
        # First fragment found in the address, or None when nothing matches.
        matched = next((email for email in emails if email in creator_email), None)
        if matched is not None:
            print(f"Deleting snapshot of user {matched}")
            snapshot.delete()