vbv/server/vbv_lernwelt/shop/views.py

305 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import structlog
from django.conf import settings
from django.http import JsonResponse
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from sentry_sdk import capture_exception
from vbv_lernwelt.core.models import Country, User
from vbv_lernwelt.course.consts import (
COURSE_VERSICHERUNGSVERMITTLERIN_FR_ID,
COURSE_VERSICHERUNGSVERMITTLERIN_ID,
COURSE_VERSICHERUNGSVERMITTLERIN_IT_ID,
)
from vbv_lernwelt.course.models import CourseSession, CourseSessionUser
from vbv_lernwelt.notify.email.email_services import EmailTemplate, send_email
from vbv_lernwelt.shop.const import (
VV_DE_PRODUCT_SKU,
VV_FR_PRODUCT_SKU,
VV_IT_PRODUCT_SKU,
)
from vbv_lernwelt.shop.models import (
BillingAddress,
CheckoutInformation,
CheckoutState,
Product,
)
from vbv_lernwelt.shop.serializers import BillingAddressSerializer
from vbv_lernwelt.shop.services import (
datatrans_state_to_checkout_state,
get_payment_url,
init_transaction,
InitTransactionException,
is_signature_valid,
)
logger = structlog.get_logger(__name__)
PRODUCT_SKU_TO_COURSE = {
VV_DE_PRODUCT_SKU: COURSE_VERSICHERUNGSVERMITTLERIN_ID,
VV_FR_PRODUCT_SKU: COURSE_VERSICHERUNGSVERMITTLERIN_FR_ID,
VV_IT_PRODUCT_SKU: COURSE_VERSICHERUNGSVERMITTLERIN_IT_ID,
}
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def get_billing_address(request):
try:
billing_address = BillingAddress.objects.get(user=request.user)
data = BillingAddressSerializer(billing_address).data
except BillingAddress.DoesNotExist:
data = BillingAddressSerializer().data
data["first_name"] = request.user.first_name # noqa
data["last_name"] = request.user.last_name # noqa
return Response(data)
@api_view(["PUT"])
@permission_classes([IsAuthenticated])
def update_billing_address(request):
try:
billing_address = BillingAddress.objects.get(user=request.user)
except BillingAddress.DoesNotExist:
billing_address = None
serializer = BillingAddressSerializer(
billing_address, data=request.data, partial=True
)
if serializer.is_valid():
serializer.save(user=request.user)
return Response(serializer.data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@api_view(["POST"])
def transaction_webhook(request):
"""IMPORTANT: This is not called for timed out transactions!"""
logger.info("Webhook: Datatrans called transaction webhook", body=request.body)
if not is_signature_valid(
signature=request.headers.get("Datatrans-Signature", ""),
payload=request.body,
):
logger.warning("Datatrans Transaction Webhook: Invalid Signature -> Ignored")
return JsonResponse({"status": "invalid signature"}, status=400)
transaction = request.data
transaction_id = transaction["transactionId"]
# keep webhook history (for debugging)
checkout_info = CheckoutInformation.objects.get(transaction_id=transaction_id)
checkout_info.webhook_history.append(transaction)
checkout_info.save(update_fields=["webhook_history"])
# update checkout state
checkout_state = datatrans_state_to_checkout_state(transaction["status"])
update_checkout_state(checkout_info=checkout_info, state=checkout_state)
# handle paid
if checkout_state == CheckoutState.PAID:
create_vv_course_session_user(checkout_info=checkout_info)
return JsonResponse({"status": "ok"})
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def checkout_vv(request):
"""
Check-out for the Versicherungsvermittler products (vv-de, vv-fr, vv-it)
IMPORTANT: Even if we have an already ONGOING checkout,
we create a new one! This might seem a bit unintuitive,
but it's the advised way to handle it by Datatrans:
"Fehlverhalten des User können fast gar nicht abgefangen werden,
wichtig wäre aus eurer Sicht das ihr immer einen neuen INIT
schickt, wenn der User im Checkout ist und zum Beispiel
auf «Bezahlen» klickt. Um zum Beispiel White-screens
bei Browser Back redirections zu vermeiden."
"""
sku = request.data["product"]
base_redirect_url = request.data["redirect_url"]
logger.info(f"Checkout requested: sku={sku}", user_id=request.user.id)
try:
product = Product.objects.get(sku=sku)
except Product.DoesNotExist:
return next_step_response(
url=checkout_error_url(
base_url=base_redirect_url,
product_sku=sku,
message=f"{sku}_product_sku_does_not_exist_needs_to_be_created_first",
),
)
checkouts = CheckoutInformation.objects.filter(
user=request.user,
product_sku=sku,
)
# already paid successfully -> redirect to home
# any other case create a new checkout (see doc above)
if checkouts.filter(state=CheckoutState.PAID).exists():
return next_step_response(url="/")
try:
transaction_id = init_transaction(
user=request.user,
amount_chf_centimes=product.price,
redirect_url_success=checkout_success_url(
base_url=base_redirect_url, product_sku=sku
),
redirect_url_error=checkout_error_url(
base_url=base_redirect_url, product_sku=sku
),
redirect_url_cancel=checkout_cancel_url(base_redirect_url),
webhook_url=webhook_url(base_redirect_url),
)
except InitTransactionException as e:
if not settings.DEBUG:
capture_exception(e)
return next_step_response(
url=checkout_error_url(
base_url=base_redirect_url,
product_sku=sku,
),
)
checkout_info = CheckoutInformation.objects.create(
user=request.user,
state=CheckoutState.ONGOING,
transaction_id=transaction_id,
# product
product_sku=sku,
product_price=product.price,
product_name=product.name,
product_description=product.description,
# address
**request.data["address"],
)
update_user_address(user=request.user, checkout_info=checkout_info)
return next_step_response(url=get_payment_url(transaction_id))
def update_checkout_state(checkout_info: CheckoutInformation, state: CheckoutState):
checkout_info.state = state.value
checkout_info.save(update_fields=["state"])
def send_vv_welcome_email(checkout_info: CheckoutInformation):
course_names = {
VV_DE_PRODUCT_SKU: "Versicherungsvermittler/-in VBV (Deutsch)",
VV_FR_PRODUCT_SKU: "Intermédiaire dassurance AFA (Français)",
VV_IT_PRODUCT_SKU: "Intermediario/a assicurativo/a AFA (Italiano)",
}
send_email(
recipient_email=checkout_info.user.email,
template=EmailTemplate.WELCOME_MAIL_VV,
template_data={
"course": course_names[checkout_info.product_sku],
"target_url": "https://my.vbv-afa.ch/",
"name": f"{checkout_info.first_name} {checkout_info.last_name}",
"private_street": f"{checkout_info.street} {checkout_info.street_number}",
"private_city": f"{checkout_info.postal_code} {checkout_info.city} {checkout_info.country}",
"company_name": checkout_info.company_name,
"company_street": f"{checkout_info.company_street} {checkout_info.company_street_number}",
"company_city": f"{checkout_info.company_postal_code} {checkout_info.company_city} {checkout_info.company_country}",
},
template_language=checkout_info.user.language,
fail_silently=True,
)
def create_vv_course_session_user(checkout_info: CheckoutInformation):
logger.info("Creating VV course session user", user_id=checkout_info.user_id)
_, created = CourseSessionUser.objects.get_or_create(
user=checkout_info.user,
role=CourseSessionUser.Role.MEMBER,
course_session=CourseSession.objects.filter(
course_id=PRODUCT_SKU_TO_COURSE[checkout_info.product_sku]
).first(),
)
if created:
logger.info("VV course session user created", user_id=checkout_info.user_id)
send_vv_welcome_email(checkout_info)
def next_step_response(
url: str,
) -> JsonResponse:
return JsonResponse(
{
"next_step_url": url,
},
)
def webhook_url(base_url: str) -> str:
return f"{base_url}/api/shop/transaction/webhook/"
def checkout_error_url(
base_url: str, product_sku: str, message: str | None = None
) -> str:
url = f"{base_url}/onboarding/{product_sku}/checkout/address?error"
if message:
url += f"&message={message}"
return url
def checkout_cancel_url(base_url: str) -> str:
return f"{base_url}/"
def checkout_success_url(product_sku: str, base_url: str = "") -> str:
return f"{base_url}/onboarding/{product_sku}/checkout/complete"
def update_user_address(user: User, checkout_info: CheckoutInformation):
user.street = checkout_info.street
user.street_number = checkout_info.street_number
user.postal_code = checkout_info.postal_code
user.city = checkout_info.city
if checkout_info.country:
user.country = Country.objects.filter(country_id=checkout_info.country).first()
if (
checkout_info.company_name
and checkout_info.company_street
and checkout_info.company_street_number
and checkout_info.company_postal_code
and checkout_info.company_city
and checkout_info.company_country
):
user.organisation_detail_name = checkout_info.company_name
user.organisation_street = checkout_info.company_street
user.organisation_street_number = checkout_info.company_street_number
user.organisation_postal_code = checkout_info.company_postal_code
user.organisation_city = checkout_info.company_city
user.organisation_country = Country.objects.filter(
country_id=checkout_info.company_country
).first()
user.invoice_address = User.INVOICE_ADDRESS_ORGANISATION
user.save()