chore: validate signature & cleanup

This commit is contained in:
Livio Bieri 2023-11-17 12:40:30 +01:00 committed by Christian Cueni
parent c6cb6ba80c
commit b1439122e1
5 changed files with 200 additions and 248 deletions

View File

@ -648,6 +648,26 @@ NOTIFICATIONS_NOTIFICATION_MODEL = "notify.Notification"
# sendgrid (email notifications)
SENDGRID_API_KEY = env("IT_SENDGRID_API_KEY", default="")
# Datatrans (payment)
# See https://admin.sandbox.datatrans.com/MerchSecurAdmin.jsp)
DATATRANS_HMAC_KEY = env("DATATRANS_HMAC_KEY")
# See https://admin.sandbox.datatrans.com/MenuDispatch.jsp?main=1&sub=4)
# => echo -n "Username:Password" | base64
DATATRANS_BASIC_AUTH_KEY = env("DATATRANS_BASIC_AUTH_KEY")
if APP_ENVIRONMENT.startswith("prod"):
DATATRANS_API_ENDPOINT = "https://api.datatrans.com"
DATATRANS_PAY_URL = "https://pay.datatrans.com"
else:
DATATRANS_API_ENDPOINT = "https://api.sandbox.datatrans.com"
DATATRANS_PAY_URL = "https://pay.sandbox.datatrans.com"
# Only for debugging the webhook (locally)
DATATRANS_DEBUG_WEBHOOK_OVERWRITE = env(
"DATATRANS_DEBUG_WEBHOOK_OVERWRITE", default=None
)
# S3 BUCKET CONFIGURATION
FILE_UPLOAD_STORAGE = env("FILE_UPLOAD_STORAGE", default="s3") # local | s3

View File

@ -1,7 +1,7 @@
from django.contrib import admin
from vbv_lernwelt.shop.models import CheckoutInformation, Country, Product
from vbv_lernwelt.shop.services import DataTransPaymentProvider
from vbv_lernwelt.shop.services import get_transaction_state
@admin.action(description="ABACUS: Create invoices")
@ -12,9 +12,7 @@ def generate_invoice(modeladmin, request, queryset):
@admin.action(description="DATATRANS: Sync transaction states")
def sync_transaction_state(modeladmin, request, queryset):
for checkout in queryset:
state = DataTransPaymentProvider().get_transaction_state(
transaction_id=checkout.transaction_id
)
state = get_transaction_state(transaction_id=checkout.transaction_id)
print(state)
checkout.state = state.value
checkout.save(

View File

@ -1,11 +1,10 @@
import hashlib
import hmac
import os
import uuid
from abc import ABC, abstractmethod
import requests
import structlog
from django.conf import settings
from vbv_lernwelt.core.admin import User
from vbv_lernwelt.shop.models import CheckoutState
@ -17,210 +16,117 @@ class PaymentException(Exception):
pass
class PaymentProvider(ABC):
@abstractmethod
def init_transaction(
self,
user: User,
amount_chf_centimes: int,
redirect_url_success: str,
redirect_url_error: str,
redirect_url_cancel: str,
webhook_url: str,
):
raise NotImplementedError()
@abstractmethod
def get_transaction_state(
self,
transaction_id: str,
) -> CheckoutState | None:
"""
Get the state of a transaction.
:param transaction_id: Transaction ID
:return: Transaction state or None if transaction does not exist
"""
raise NotImplementedError()
@abstractmethod
def get_payment_url(self, transaction_id: str):
"""
Get the URL to redirect to for payment.
:param transaction_id: Transaction ID
:return: URL to redirect to for payment
"""
raise NotImplementedError()
class DataTransPaymentProvider(PaymentProvider):
def is_signature_valid(
signature: str,
payload: bytes,
hmac_key: str = settings.DATATRANS_HMAC_KEY,
):
"""
Configuration:
- 1. USE_DATATRANS_PRODUCTION, e.g. "False" for sandbox, "True" for production. Default: False
- 2. DATATRANS_BASIC_AUTH_KEY (see https://admin.sandbox.datatrans.com/MenuDispatch.jsp?main=1&sub=4)
- echo -n "Username:Password" | base64
- 3. DATATRANS_HMAC_KEY (see https://admin.sandbox.datatrans.com/MerchSecurAdmin.jsp)
- 4. DATATRANS_DEBUG_WEBHOOK_OVERWRITE (optional webhook URL overwrite for debugging purposes)
See the docs:
https://docs.datatrans.ch/docs/additional-security
"""
def __init__(self):
is_production_environment = (
os.getenv("USE_DATATRANS_PRODUCTION", "False").lower() == "true"
try:
timestamp = signature.split(",")[0].split("=")[1]
s0_expected = signature.split(",")[1].split("=")[1]
key_hex_bytes = bytes.fromhex(hmac_key)
timestamp_bytes = bytes(timestamp, "utf-8")
s0_actual = hmac.new(
key_hex_bytes, timestamp_bytes + payload, hashlib.sha256
).hexdigest()
except (IndexError, ValueError):
logger.warning(
"Invalid signature format Expected format: t=TIMESTAMP,s0=XXXX",
signature=signature,
)
return False
self._api_endpoint = (
"https://api.sandbox.datatrans.com"
if not is_production_environment
else "https://api.datatrans.com"
return s0_actual == s0_expected
def init_transaction(
user: User,
amount_chf_centimes: int,
redirect_url_success: str,
redirect_url_error: str,
redirect_url_cancel: str,
webhook_url: str,
):
if overwrite := settings.DATATRANS_DEBUG_WEBHOOK_OVERWRITE:
logger.warning(
"APPLYING DEBUG DATATRANS WEBHOOK OVERWRITE!",
webhook_url=overwrite,
)
webhook_url = overwrite
self._pay_endpoint = (
"https://pay.sandbox.datatrans.com"
if not is_production_environment
else "https://pay.datatrans.com"
)
payload = {
# See https://api-reference.datatrans.ch/#tag/v1transactions:
# -> It _should_ be unique for each transaction (according to docs)
# -> We use transaction id for checking transaction state
"refno": str(uuid.uuid4()),
# We use autoSettle=True, so that we don't have to settle the transaction:
# -> Be aware that autoSettle has implications of the possible transaction states
# -> see CheckoutState docstring.
"autoSettle": True,
"amount": amount_chf_centimes,
"currency": "CHF",
"language": user.language,
"webhook": {"url": webhook_url},
"redirect": {
"successUrl": redirect_url_success,
"errorUrl": redirect_url_error,
"cancelUrl": redirect_url_cancel,
},
}
self._basic_auth_key = os.getenv("DATATRANS_BASIC_AUTH_KEY")
self._hmac_key = os.getenv("DATATRANS_HMAC_KEY")
logger.info("Initiating transaction", payload=payload)
if not self._basic_auth_key or not self._hmac_key:
raise ValueError(
"Environment variables for DataTrans API are not properly set."
"Required: DATATRANS_BASIC_AUTH_KEY, DATATRANS_HMAC_KEY"
)
@property
def _headers(self):
return {
"Authorization": f"Basic {self._basic_auth_key}",
response = requests.post(
f"{settings.DATATRANS_API_ENDPOINT}/v1/transactions",
json=payload,
headers={
"Authorization": f"Basic {settings.DATATRANS_BASIC_AUTH_KEY}",
"Content-Type": "application/json",
}
},
)
def init_transaction(
self,
user: User,
amount_chf_centimes: int,
redirect_url_success: str,
redirect_url_error: str,
redirect_url_cancel: str,
webhook_url: str,
):
if "DATATRANS_DEBUG_WEBHOOK_OVERWRITE" in os.environ:
webhook_url = os.environ["DATATRANS_DEBUG_WEBHOOK_OVERWRITE"]
logger.warning(
"Using debug webhook URL. This should only be used for testing.",
webhook_url=webhook_url,
)
payload = {
# See https://api-reference.datatrans.ch/#tag/v1transactions:
# -> It _should_ be unique for each transaction (according to docs)
# -> We use transaction id for checking transaction state
"refno": str(uuid.uuid4()),
# We use autoSettle=True, so that we don't have to settle the transaction:
# -> Be aware that autoSettle has implications of the possible transaction states
# -> see CheckoutState docstring.
"autoSettle": True,
"amount": amount_chf_centimes,
"currency": "CHF",
"language": user.language,
"webhook": {"url": webhook_url},
"redirect": {
"successUrl": redirect_url_success,
"errorUrl": redirect_url_error,
"cancelUrl": redirect_url_cancel,
},
}
logger.info(
"Initiating transaction", payload=payload, api_endpoint=self._api_endpoint
if response.status_code == 201:
transaction_id = response.json()["transactionId"]
logger.info("Transaction initiated", transaction_id=transaction_id)
return transaction_id
else:
raise PaymentException(
"Transaction initiation failed:",
response.json().get("error"),
)
response = requests.post(
f"{self._api_endpoint}/v1/transactions",
json=payload,
headers=self._headers,
)
if response.status_code == 201:
transaction_id = response.json()["transactionId"]
logger.info("Transaction initiated", transaction_id=transaction_id)
return transaction_id
else:
raise PaymentException(
"Transaction initiation failed:",
response.json().get("error"),
)
def get_transaction_state(
transaction_id: str,
) -> CheckoutState | None:
response = requests.get(
f"{settings.DATATRANS_API_ENDPOINT}/v1/transactions/{transaction_id}",
headers={
"Authorization": f"Basic {settings.DATATRANS_BASIC_AUTH_KEY}",
"Content-Type": "application/json",
},
)
def get_transaction_state(
self,
transaction_id: str,
) -> CheckoutState | None:
response = requests.get(
f"{self._api_endpoint}/v1/transactions/{transaction_id}",
headers=self._headers,
)
if response.status_code != 200:
return None
logger.info(
"Transaction status retrieved",
status_code=response.status_code,
response=response.json(),
)
transaction_state = response.json()["status"]
if response.status_code != 200:
return None
logger.info(
"Transaction status retrieved",
status_code=response.status_code,
response=transaction_state,
)
# IMPORTANT: CheckoutState docstring
# for more information about the states!
transaction_state = response.json()["status"]
return CheckoutState(transaction_state)
if transaction_state in [
"initialized",
"settled",
"canceled",
"transmitted",
"failed",
]:
return CheckoutState(transaction_state)
else:
raise PaymentException(
f"Transaction state {transaction_state} should either not happen"
"due to autoSettle=true or we forgot to handle it. -> See also CheckoutState docstring."
)
def is_webhook_request_signature_valid(self, request) -> bool:
"""
Check if the Datatrans-Signature header is valid.
https://docs.datatrans.ch/docs/additional-security
"""
# Datatrans-Signature: t={{timestamp}},s0={{signature}}
datatrans_signature = request.headers.get("Datatrans-Signature", "")
try:
# received signature
parts = datatrans_signature.split(",")
timestamp = parts[0].split("=")[1]
received_signature = parts[1].split("=")[1]
# payload signature
payload = request.body
key_hex_bytes = bytes.fromhex(self._hmac_key)
calculated_signature = hmac.new(
key_hex_bytes, bytes(str(timestamp), "utf-8") + payload, hashlib.sha256
)
return calculated_signature == received_signature
except (IndexError, ValueError):
logger.warning(
"Invalid Datatrans-Signature header format",
datatrans_signature=datatrans_signature,
)
return False
def get_payment_url(self, transaction_id: str):
return f"{self._pay_endpoint}/v1/start/{transaction_id}"
def get_payment_url(transaction_id: str):
return f"{settings.DATATRANS_PAY_URL}/v1/start/{transaction_id}"

View File

@ -0,0 +1,41 @@
from unittest import TestCase
from vbv_lernwelt.shop.services import is_signature_valid
class DatatransWebhookSigning(TestCase):
# Key is from their example in the docs, not ours! :D
HMAC_KEY_FROM_THE_DOCS_NOT_HAZARDOUS = (
"861bbfc01e089259091927d6ad7f71c8"
"b46b7ee13499574e83c633b74cdc29e3"
"b7e262e41318c8425c520f146986675f"
"dd58a4531a01c99f06da378fdab0414a"
)
def test_signature_happy_ala_docs(self):
# GIVEN
payload = b"HELLO"
signature = "t=1605697463367,s0=82ef9a8178dcb4df0b71540fa06d7da826ecb26e1977e230bdc8c9d6f9f1af84"
# WHEN / THEN
self.assertTrue(
is_signature_valid(
hmac_key=self.HMAC_KEY_FROM_THE_DOCS_NOT_HAZARDOUS,
payload=payload,
signature=signature,
)
)
def test_signature_not_happy(self):
# GIVEN
tampered_payload = b"HELLO=I=TAMPERED=WITH=PAYLOAD=HIHI=I=AM=EVIL"
signature = "t=1605697463367,s0=82ef9a8178dcb4df0b71540fa06d7da826ecb26e1977e230bdc8c9d6f9f1af84"
# THEN
self.assertFalse(
is_signature_valid(
hmac_key=self.HMAC_KEY_FROM_THE_DOCS_NOT_HAZARDOUS,
payload=tampered_payload,
signature=signature,
)
)

View File

@ -13,29 +13,14 @@ from vbv_lernwelt.shop.models import (
VV_PRODUCT_SKU,
)
from vbv_lernwelt.shop.serializers import BillingAddressSerializer
from vbv_lernwelt.shop.services import DataTransPaymentProvider
from vbv_lernwelt.shop.services import (
get_payment_url,
init_transaction,
is_signature_valid,
)
logger = structlog.get_logger(__name__)
URL_CHECKOUT_COMPLETE = "/onboarding/vv/checkout/complete"
URL_CHECKOUT_ADDRESS = "/onboarding/vv/checkout/address"
def checkout_error_url(base_url):
return f"{base_url}/onboarding/vv/checkout/address?error=true"
def checkout_cancel_url(base_url):
return f"{base_url}/"
def checkout_success_url(base_url):
return f"{base_url}/onboarding/vv/checkout/complete"
def webhook_url(base_url):
return f"{base_url}/api/shop/transaction/webhook"
@api_view(["GET"])
@permission_classes([IsAuthenticated])
@ -72,23 +57,19 @@ def update_billing_address(request):
@api_view(["POST"])
def transaction_webhook(request):
"""Webhook endpoint for Datatrans to notify about transaction state changes."""
logger.info("Transaction Webhook Received", body=request.body)
logger.info("Webhook: Datatrans called transaction webhook", body=request.body)
datatrans = DataTransPaymentProvider()
# FIXME verify signature, not working yet
# if not datatrans.is_webhook_request_signature_valid(
# request=request,
# ):
# logger.error("Invalid webhook signature")
# return JsonResponse({"status": "error"}, status=status.HTTP_400_BAD_REQUEST)
if not is_signature_valid(
signature=request.headers.get("Datatrans-Signature", ""),
payload=request.body,
):
logger.warning("Invalid signature")
return JsonResponse({"status": "invalid signature"}, status=400)
transaction = request.data
transaction_id = transaction["transactionId"]
transaction_status = transaction["status"]
# FIXME just pass the checkout_id as sequence number?
checkout_info = CheckoutInformation.objects.get(transaction_id=transaction_id)
checkout_info.state = transaction_status
checkout_info.webhook_history.append(transaction)
@ -106,13 +87,12 @@ def transaction_webhook(request):
def checkout_vv(request):
"""
Checkout for the Versicherungsvermittler product (VV).
VV_PRODUCT_SKU: The one and only product, thus hardcoded
Expected to be created in the admin interface first.
"""
# The one and only product, thus hardcoded
# Expected to be created in the admin interface first.
sku = VV_PRODUCT_SKU
logger.info("Checkout requested", sku=sku, user_id=request.user.id)
logger.info(f"Checkout requested: sku={sku}", user_id=request.user.id)
try:
product = Product.objects.get(sku=sku)
@ -126,31 +106,21 @@ def checkout_vv(request):
product_sku=sku,
)
datatrans = DataTransPaymentProvider()
# already purchased (settled or transmitted)
if checkouts.filter(
state__in=[CheckoutState.SETTLED, CheckoutState.TRANSMITTED]
).exists():
return JsonResponse({"next_step_url": URL_CHECKOUT_COMPLETE})
return JsonResponse({"next_step_url": checkout_success_url()})
# FIXME: Re-using seems not to work -> just create a new one for now
# Also check if failed transactions get notified via webhook
# already initialized -> redirect to payment page again
# if checkout := checkouts.filter(state=CheckoutState.INITIALIZED).first():
# return JsonResponse(
# {
# "next_step_url": datatrans.get_payment_url(
# transaction_id=checkout.transaction_id
# )
# }
# )
if checkout := checkouts.filter(state=CheckoutState.INITIALIZED).first():
return JsonResponse(
{"next_step_url": get_payment_url(transaction_id=checkout.transaction_id)}
)
address = request.data["address"]
# not yet initialized at all, or canceled/failed
base_redirect_url = request.data["redirect_url"]
# not yet initialized at all or canceled/failed
transaction_id = datatrans.init_transaction(
transaction_id = init_transaction(
user=request.user,
amount_chf_centimes=product.price,
redirect_url_success=checkout_success_url(base_redirect_url),
@ -169,9 +139,26 @@ def checkout_vv(request):
product_price=product.price,
product_name=product.name,
product_description=product.description,
**address,
# address
**request.data["address"],
)
return JsonResponse(
{"next_step_url": datatrans.get_payment_url(transaction_id=transaction_id)}
{"next_step_url": get_payment_url(transaction_id=transaction_id)}
)
def webhook_url(base_url: str) -> str:
return f"{base_url}/api/shop/transaction/webhook"
def checkout_error_url(base_url: str) -> str:
return f"{base_url}/onboarding/vv/checkout/address?error=true"
def checkout_cancel_url(base_url: str) -> str:
return f"{base_url}/"
def checkout_success_url(base_url: str = "") -> str:
return f"{base_url}/onboarding/vv/checkout/complete"