150 lines
5.5 KiB
Python
150 lines
5.5 KiB
Python
# mysite/api/graphene_wagtail.py
|
|
# Taken from https://github.com/patrick91/wagtail-ql/blob/master/backend/graphene_utils/converter.py and slightly adjusted
|
|
import logging
|
|
|
|
from graphene.types import Scalar
|
|
from graphene_django.converter import convert_django_field
|
|
from graphql_relay import to_global_id
|
|
from wagtail.fields import StreamField
|
|
from wagtail.images.models import Image
|
|
|
|
from assignments.models import Assignment
|
|
from basicknowledge.models import BasicKnowledge
|
|
from books.models import CustomDocument
|
|
from surveys.models import Survey
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# todo: replace this with the newer method from https://wagtail.org/blog/graphql-with-streamfield/
|
|
class GenericStreamFieldType(Scalar):
    # GraphQL scalar that exposes a StreamField value as a list of block
    # entries whose references were enriched by ``augment_fields``.
    #
    # NOTE: documented with comments on purpose -- Graphene may pick up a
    # class docstring as the schema description of the type.

    @staticmethod
    def serialize(stream_value):
        """Return the augmented raw block data of ``stream_value`` as a list."""
        return list(augment_fields(stream_value.raw_data))
|
|
|
|
|
|
def get_document_json(document_id):
    """Describe the ``CustomDocument`` with the given id as a plain dict.

    Returns a dict with the keys ``value`` (the id that was passed in),
    ``id``, ``file_name``, ``file_extension``, ``url``, ``title`` and
    ``display_text``. When no such document exists an error is logged and
    ``None`` is returned.
    """
    try:
        doc = CustomDocument.objects.get(id=document_id)
        return {
            "value": document_id,
            "id": doc.id,
            "file_name": doc.filename,
            "file_extension": doc.file_extension,
            "url": doc.url,
            "title": doc.title,
            "display_text": doc.display_text,
        }
    except CustomDocument.DoesNotExist:
        logger.error("CustomDocument {} does not exist".format(document_id))
    return None
|
|
|
|
|
|
def get_wagtail_image_dict(image_id: int) -> dict | None:
    """Describe the Wagtail ``Image`` with the given id as a plain dict.

    The dict carries ``value`` (the id that was passed in), ``id``, ``src``,
    ``alt``, ``title``, ``width``, ``height`` and ``srcset``; both ``alt`` and
    ``title`` are filled from the image title. When no such image exists an
    error is logged and ``None`` is returned.
    """
    # Imported at call time rather than at module level
    # (presumably to avoid a circular import -- TODO confirm).
    from books.schema.nodes import get_srcset, get_src

    try:
        img = Image.objects.get(id=image_id)
        return {
            'value': image_id,
            'id': img.id,
            'src': get_src(img),
            'alt': img.title,
            'title': img.title,
            'width': img.width,
            'height': img.height,
            'srcset': get_srcset(img),
        }
    except Image.DoesNotExist:
        logger.error('Image {} does not exist'.format(image_id))
    return None
|
|
|
|
|
|
|
|
def augment_fields(raw_data):
    """Enrich raw StreamField block entries with data of referenced objects.

    Every dict entry of ``raw_data`` is inspected and, depending on its
    ``"type"``, its ``"value"`` is replaced or extended:

    * ``image_block``: replaced by the result of ``get_wagtail_image_dict``
      (which is ``None`` when the image does not exist).
    * ``assignment``: replaced by title, assignment text, solution and relay
      global id of the referenced ``Assignment``.
    * ``survey``: replaced by title and relay global id of the ``Survey``.
    * ``basic_knowledge`` / ``instrument``: extended with ``slug`` and
      ``foreground`` of the referenced ``BasicKnowledge``.
    * ``content_list_item``: the nested entries are augmented recursively.
    * ``cms_document_block``: replaced by the result of
      ``get_document_json`` when the document exists.
    * ``solution`` / ``instruction``: an optional ``document`` id inside the
      value is replaced by the result of ``get_document_json`` when the
      document exists.

    When a referenced assignment, survey or basic knowledge object does not
    exist, an error is logged and the entry keeps its original value, so a
    single dangling reference does not break serialization of the whole
    stream. Entries that are not dicts, dicts without a ``"type"`` key and
    unknown types are left untouched.

    Args:
        raw_data: iterable of raw block entries (usually dicts with
            ``"type"`` and ``"value"`` keys).

    Returns:
        The same ``raw_data`` object; its entries are modified in place.
    """
    for data in raw_data:
        if not isinstance(data, dict):
            continue

        # ``.get`` so that a malformed entry without a type is passed through
        # instead of raising ``KeyError``.
        _type = data.get("type")

        if _type == "image_block":
            # Intentionally assigned even when the lookup yields ``None``.
            data["value"] = get_wagtail_image_dict(data["value"])

        elif _type == "assignment":
            assignment_id = data["value"]["assignment_id"]
            try:
                assignment = Assignment.objects.get(pk=assignment_id)
            except Assignment.DoesNotExist:
                logger.error("Assignment {} does not exist".format(assignment_id))
            else:
                data["value"] = {
                    "title": assignment.title,
                    "assignment": assignment.assignment,
                    "solution": assignment.solution,
                    "id": to_global_id("AssignmentNode", assignment.pk),
                }

        elif _type == "survey":
            survey_id = data["value"]["survey_id"]
            try:
                survey = Survey.objects.get(pk=survey_id)
            except Survey.DoesNotExist:
                logger.error("Survey {} does not exist".format(survey_id))
            else:
                data["value"] = {
                    "title": survey.title,
                    "id": to_global_id("SurveyNode", survey.pk),
                }

        elif _type in ("basic_knowledge", "instrument"):
            _value = data["value"]
            basic_knowledge_id = _value["basic_knowledge"]
            try:
                instrument = BasicKnowledge.objects.get(pk=basic_knowledge_id)
            except BasicKnowledge.DoesNotExist:
                # Handled like the other lookups: log and keep the raw value.
                logger.error(
                    "BasicKnowledge {} does not exist".format(basic_knowledge_id)
                )
            else:
                # ``_value`` is the dict stored in ``data``; updated in place.
                _value.update(
                    {
                        "slug": instrument.slug,
                        "foreground": instrument.new_type.category.foreground,
                    }
                )

        elif _type == "content_list_item":
            # Nested entries use the same structure; augment them recursively.
            data["value"] = augment_fields(data["value"])

        elif _type == "cms_document_block":
            document = get_document_json(data["value"])
            if document is not None:
                data["value"] = document

        elif _type in ("solution", "instruction"):
            _value = data["value"]
            document_id = _value.get("document")
            if document_id is not None:
                document = get_document_json(document_id)
                if document is not None:
                    # ``_value`` is the dict stored in ``data``; updated in place.
                    _value["document"] = document

    return raw_data
|
|
|
|
|
|
@convert_django_field.register(StreamField)
def convert_stream_field(field, registry=None):
    """Map a Wagtail ``StreamField`` to a ``GenericStreamFieldType``.

    Registered with graphene-django's ``convert_django_field`` for
    ``StreamField``. The field's help text is used as the description and
    the result is marked required when the field is not nullable.
    ``registry`` is accepted but not used.
    """
    help_text = field.help_text
    is_required = not field.null
    return GenericStreamFieldType(
        description=help_text,
        required=is_required,
    )
|