# mysite/api/graphene_wagtail.py
# Taken from https://github.com/patrick91/wagtail-ql/blob/master/backend/graphene_utils/converter.py and slightly adjusted
import copy
import logging

from assignments.models import Assignment
from basicknowledge.models import BasicKnowledge
from books.models import CustomDocument
from graphene.types import Scalar
from graphene_django.converter import convert_django_field
from graphql_relay import to_global_id
from surveys.models import Survey
from wagtail.documents.models import Document
from wagtail.fields import StreamField
from wagtail.images.models import Image

logger = logging.getLogger(__name__)


# todo: replace this with the newer method from https://wagtail.org/blog/graphql-with-streamfield/
class GenericStreamFieldType(Scalar):
    """GraphQL scalar that exposes a stream value as a list of augmented raw blocks."""

    @staticmethod
    def serialize(stream_value):
        """Return the raw blocks of ``stream_value`` after running ``augment_fields``.

        ``augment_fields`` rewrites the block dicts in place. Serializing a deep
        copy keeps the stream value's own raw data untouched, so serializing the
        same value a second time still sees the original ids instead of the
        already-augmented dicts.
        """
        raw_data = copy.deepcopy(list(stream_value.raw_data))
        return list(augment_fields(raw_data))


def get_document_json(document_id):
    """Return a JSON-friendly dict describing the ``CustomDocument`` with this id.

    Returns ``None`` (after logging an error) when no such document exists.
    """
    try:
        document = CustomDocument.objects.get(id=document_id)
    except CustomDocument.DoesNotExist:
        logger.error("CustomDocument %s does not exist", document_id)
        return None

    return {
        "value": document_id,
        "id": document.id,
        "file_name": document.filename,
        "file_extension": document.file_extension,
        "url": document.url,
        "title": document.title,
        "display_text": document.display_text,
    }


def augment_fields(raw_data):
    """Replace database references inside raw stream blocks with resolved data.

    Every dict entry of ``raw_data`` is inspected by its ``"type"`` key and its
    ``"value"`` is rewritten in place for the known block types. Entries that
    are not dicts, and block types not listed here, are left as they are.

    When a referenced object does not exist, an error is logged and that block
    keeps its original value.

    Returns the same ``raw_data`` object that was passed in.
    """
    for data in raw_data:
        if not isinstance(data, dict):
            continue

        block_type = data["type"]

        if block_type == "image_block":
            image_id = data["value"]
            try:
                image = Image.objects.get(id=image_id)
            except Image.DoesNotExist:
                logger.error("Image %s does not exist", image_id)
            else:
                data["value"] = {"path": image.file.url}

        elif block_type == "assignment":
            assignment_id = data["value"]["assignment_id"]
            try:
                assignment = Assignment.objects.get(pk=assignment_id)
            except Assignment.DoesNotExist:
                logger.error("Assignment %s does not exist", assignment_id)
            else:
                data["value"] = {
                    "title": assignment.title,
                    "assignment": assignment.assignment,
                    "solution": assignment.solution,
                    "id": to_global_id("AssignmentNode", assignment.pk),
                }

        elif block_type == "survey":
            survey_id = data["value"]["survey_id"]
            try:
                survey = Survey.objects.get(pk=survey_id)
            except Survey.DoesNotExist:
                logger.error("Survey %s does not exist", survey_id)
            else:
                data["value"] = {
                    "title": survey.title,
                    "id": to_global_id("SurveyNode", survey.pk),
                }

        elif block_type in ("basic_knowledge", "instrument"):
            block_value = data["value"]
            instrument_id = block_value["basic_knowledge"]
            try:
                instrument = BasicKnowledge.objects.get(pk=instrument_id)
            except BasicKnowledge.DoesNotExist:
                logger.error("BasicKnowledge %s does not exist", instrument_id)
            else:
                # The existing keys are kept; slug and foreground are added.
                block_value.update(
                    {
                        "slug": instrument.slug,
                        "foreground": instrument.new_type.category.foreground,
                    }
                )

        elif block_type == "content_list_item":
            # The value is itself a sequence of blocks: augment it recursively.
            data["value"] = augment_fields(data["value"])

        elif block_type == "cms_document_block":
            document = get_document_json(data["value"])
            if document is not None:
                data["value"] = document

        elif block_type in ("solution", "instruction"):
            block_value = data["value"]
            document_id = block_value.get("document")
            if document_id is not None:
                document = get_document_json(document_id)
                if document is not None:
                    block_value["document"] = document

    return raw_data


@convert_django_field.register(StreamField)
def convert_stream_field(field, registry=None):
    """Convert a ``StreamField`` model field into a ``GenericStreamFieldType``."""
    return GenericStreamFieldType(description=field.help_text, required=not field.null)