227 lines
7.1 KiB
Python
227 lines
7.1 KiB
Python
import hashlib
|
|
import hmac
|
|
import os
|
|
import uuid
|
|
from abc import ABC, abstractmethod
|
|
|
|
import requests
|
|
import structlog
|
|
|
|
from vbv_lernwelt.core.admin import User
|
|
from vbv_lernwelt.shop.models import CheckoutState
|
|
|
|
logger = structlog.get_logger(__name__)
|
|
|
|
|
|
class PaymentException(Exception):
    """Error raised by payment providers in this module when a payment operation fails."""
|
|
|
|
|
|
class PaymentProvider(ABC):
    """
    Abstract interface that every payment provider has to implement.

    All three methods are abstract; the base bodies only raise
    ``NotImplementedError`` and exist as placeholders.
    """

    @abstractmethod
    def init_transaction(
        self,
        user: User,
        amount_chf_centimes: int,
        redirect_url_success: str,
        redirect_url_error: str,
        redirect_url_cancel: str,
        webhook_url: str,
    ):
        """
        Start a new payment transaction.

        :param user: User the transaction is created for
        :param amount_chf_centimes: Amount to charge, in CHF centimes
        :param redirect_url_success: URL to redirect to after a successful payment
        :param redirect_url_error: URL to redirect to after a failed payment
        :param redirect_url_cancel: URL to redirect to after a canceled payment
        :param webhook_url: URL the provider should call back
        :return: Defined by the implementation (the Datatrans provider in this
            module returns the transaction ID)
        """
        raise NotImplementedError

    @abstractmethod
    def get_transaction_state(
        self,
        transaction_id: str,
    ) -> CheckoutState | None:
        """
        Look up the current state of a transaction.

        :param transaction_id: Transaction ID
        :return: Transaction state or None if transaction does not exist
        """
        raise NotImplementedError

    @abstractmethod
    def get_payment_url(self, transaction_id: str):
        """
        Build the URL the user has to be redirected to for payment.

        :param transaction_id: Transaction ID
        :return: URL to redirect to for payment
        """
        raise NotImplementedError
|
|
|
|
|
|
class DataTransPaymentProvider(PaymentProvider):
    """
    Payment provider that talks to the Datatrans HTTP API.

    Configuration (environment variables):

    - 1. USE_DATATRANS_PRODUCTION, e.g. "False" for sandbox, "True" for production. Default: False

    - 2. DATATRANS_BASIC_AUTH_KEY (see https://admin.sandbox.datatrans.com/MenuDispatch.jsp?main=1&sub=4)
      - echo -n "Username:Password" | base64

    - 3. DATATRANS_HMAC_KEY (see https://admin.sandbox.datatrans.com/MerchSecurAdmin.jsp)

    - 4. DATATRANS_DEBUG_WEBHOOK_OVERWRITE (optional webhook URL overwrite for debugging purposes)
    """

    # Upper bound (in seconds) for every HTTP call to Datatrans, so that an
    # unresponsive API cannot block the calling worker indefinitely.
    _REQUEST_TIMEOUT_SECONDS = 30

    # Transaction states we expect to see with autoSettle=True.
    # -> See also CheckoutState docstring.
    _HANDLED_TRANSACTION_STATES = (
        "initialized",
        "settled",
        "canceled",
        "transmitted",
        "failed",
    )

    def __init__(self):
        """
        Read the Datatrans configuration from the environment.

        :raises ValueError: if DATATRANS_BASIC_AUTH_KEY or DATATRANS_HMAC_KEY
            is missing or empty
        """
        is_production_environment = (
            os.getenv("USE_DATATRANS_PRODUCTION", "False").lower() == "true"
        )

        self._api_endpoint = (
            "https://api.sandbox.datatrans.com"
            if not is_production_environment
            else "https://api.datatrans.com"
        )

        self._pay_endpoint = (
            "https://pay.sandbox.datatrans.com"
            if not is_production_environment
            else "https://pay.datatrans.com"
        )

        self._basic_auth_key = os.getenv("DATATRANS_BASIC_AUTH_KEY")
        self._hmac_key = os.getenv("DATATRANS_HMAC_KEY")

        if not self._basic_auth_key or not self._hmac_key:
            raise ValueError(
                "Environment variables for DataTrans API are not properly set. "
                "Required: DATATRANS_BASIC_AUTH_KEY, DATATRANS_HMAC_KEY"
            )

    @property
    def _headers(self):
        """HTTP headers (basic auth + JSON content type) sent with every API call."""
        return {
            "Authorization": f"Basic {self._basic_auth_key}",
            "Content-Type": "application/json",
        }

    @staticmethod
    def _json_or_none(response):
        """Return the decoded JSON body of ``response``, or None if it is not valid JSON."""
        try:
            return response.json()
        except ValueError:
            # requests raises a ValueError subclass for non-JSON bodies
            # (e.g. an empty body or an HTML error page).
            return None

    def init_transaction(
        self,
        user: User,
        amount_chf_centimes: int,
        redirect_url_success: str,
        redirect_url_error: str,
        redirect_url_cancel: str,
        webhook_url: str,
    ):
        """
        Create a new transaction via ``POST /v1/transactions``.

        :param user: User the transaction is created for (``user.language`` is
            sent as the payment page language)
        :param amount_chf_centimes: Amount to charge, in CHF centimes
        :param redirect_url_success: URL to redirect to after a successful payment
        :param redirect_url_error: URL to redirect to after a failed payment
        :param redirect_url_cancel: URL to redirect to after a canceled payment
        :param webhook_url: Webhook URL; replaced by
            DATATRANS_DEBUG_WEBHOOK_OVERWRITE if that variable is set
        :return: The ``transactionId`` returned by the API
        :raises PaymentException: if the API does not answer with HTTP 201
        """
        if "DATATRANS_DEBUG_WEBHOOK_OVERWRITE" in os.environ:
            webhook_url = os.environ["DATATRANS_DEBUG_WEBHOOK_OVERWRITE"]
            logger.warning(
                "Using debug webhook URL. This should only be used for testing.",
                webhook_url=webhook_url,
            )

        payload = {
            # See https://api-reference.datatrans.ch/#tag/v1transactions:
            # -> It _should_ be unique for each transaction (according to docs)
            # -> We use transaction id for checking transaction state
            "refno": str(uuid.uuid4()),
            # We use autoSettle=True, so that we don't have to settle the transaction:
            # -> Be aware that autoSettle has implications of the possible transaction states
            # -> see CheckoutState docstring.
            "autoSettle": True,
            "amount": amount_chf_centimes,
            "currency": "CHF",
            "language": user.language,
            "webhook": {"url": webhook_url},
            "redirect": {
                "successUrl": redirect_url_success,
                "errorUrl": redirect_url_error,
                "cancelUrl": redirect_url_cancel,
            },
        }

        logger.info(
            "Initiating transaction", payload=payload, api_endpoint=self._api_endpoint
        )

        response = requests.post(
            f"{self._api_endpoint}/v1/transactions",
            json=payload,
            headers=self._headers,
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )

        if response.status_code == 201:
            transaction_id = response.json()["transactionId"]
            logger.info("Transaction initiated", transaction_id=transaction_id)
            return transaction_id

        # The error body is not guaranteed to be JSON (e.g. gateway errors);
        # fall back to the raw text so the PaymentException is never masked
        # by a JSON decoding error.
        body = self._json_or_none(response)
        error = body.get("error") if isinstance(body, dict) else response.text
        raise PaymentException("Transaction initiation failed:", error)

    def get_transaction_state(
        self,
        transaction_id: str,
    ) -> CheckoutState | None:
        """
        Fetch the state of a transaction via ``GET /v1/transactions/{id}``.

        :param transaction_id: Transaction ID
        :return: Transaction state or None if the API does not answer with HTTP 200
        :raises PaymentException: if the reported status is not one of the
            handled states
        """
        response = requests.get(
            f"{self._api_endpoint}/v1/transactions/{transaction_id}",
            headers=self._headers,
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )

        # Decode defensively: a non-200 answer may carry a non-JSON body and
        # must still result in `None` instead of a decoding error.
        body = self._json_or_none(response)

        logger.info(
            "Transaction status retrieved",
            status_code=response.status_code,
            response=body if body is not None else response.text,
        )

        if response.status_code != 200:
            return None

        # IMPORTANT: CheckoutState docstring
        # for more information about the states!
        transaction_state = body.get("status") if isinstance(body, dict) else None

        if transaction_state in self._HANDLED_TRANSACTION_STATES:
            return CheckoutState(transaction_state)

        raise PaymentException(
            f"Transaction state {transaction_state} should either not happen "
            "due to autoSettle=true or we forgot to handle it. -> See also CheckoutState docstring."
        )

    def is_webhook_request_signature_valid(self, request) -> bool:
        """
        Check if the Datatrans-Signature header is valid.

        The expected signature is the hex-encoded HMAC-SHA256 of
        ``timestamp + request.body``, keyed with the hex-decoded
        DATATRANS_HMAC_KEY.

        https://docs.datatrans.ch/docs/additional-security

        :param request: Incoming webhook request; must provide ``headers``
            and a bytes ``body``
        :return: True if the received signature matches the calculated one,
            False otherwise (including a malformed header)
        """

        # Datatrans-Signature: t={{timestamp}},s0={{signature}}
        datatrans_signature = request.headers.get("Datatrans-Signature", "")

        try:
            # received signature
            parts = datatrans_signature.split(",")
            timestamp = parts[0].split("=", 1)[1]
            received_signature = parts[1].split("=", 1)[1]

            # payload signature
            payload = request.body
            key_hex_bytes = bytes.fromhex(self._hmac_key)
            calculated_signature = hmac.new(
                key_hex_bytes, timestamp.encode("utf-8") + payload, hashlib.sha256
            ).hexdigest()

            # Compare the hex digests (not the HMAC object itself) and do so
            # in constant time to avoid leaking information via timing.
            return hmac.compare_digest(
                calculated_signature.encode("utf-8"),
                received_signature.encode("utf-8"),
            )

        except (IndexError, ValueError):
            logger.warning(
                "Invalid Datatrans-Signature header format",
                datatrans_signature=datatrans_signature,
            )
            return False

    def get_payment_url(self, transaction_id: str) -> str:
        """
        Get the URL to redirect to for payment.

        :param transaction_id: Transaction ID
        :return: URL to redirect to for payment
        """
        return f"{self._pay_endpoint}/v1/start/{transaction_id}"
|