208 lines
6.8 KiB
Python
208 lines
6.8 KiB
Python
import requests
|
|
from django.conf import settings
|
|
from django.utils.dateparse import parse_datetime
|
|
from datetime import timedelta
|
|
|
|
from oauth.models import OAuth2Token
|
|
from oauth.oauth_client import oauth, fetch_token
|
|
from users.licenses import get_license_dict, is_myskillbox_product, TEACHER_KEY
|
|
from users.models import License
|
|
|
|
from core.logger import get_logger
|
|
|
|
# Module-level logger, named after this module.
logger = get_logger(__name__)

# Order states that HepClient._get_active_product accepts; products whose
# order is in any other state are never considered active.
VALID_PRODUCT_STATES = ["waiting", "paid", "completed", "shipped"]

# License lookup built once at import time; HepClient.entry_to_product
# indexes it by ISBN.
licenses = get_license_dict()
|
|
|
|
|
|
class HepClientException(Exception):
    """Raised when a HEP response has a status other than 200 or 401.

    ``HepClient._handle_response`` raises it with the HTTP status code and
    the response body as positional arguments.
    """
|
|
|
|
|
|
class HepClientUnauthorizedException(Exception):
    """Raised when HEP answers with 401 or when a token refresh fails.

    NOTE: this intentionally derives from ``Exception`` directly, not from
    ``HepClientException``, so an ``except HepClientException`` handler does
    not catch it.
    """
|
|
|
|
|
|
class HepClientNoTokenException(Exception):
    """Raised by ``HepClient._get_valid_token`` when no token is available.

    That is the case when neither a request nor a token dict was passed, or
    when no token could be fetched for the request.
    """
|
|
|
|
|
|
class HepClient:
    """Client for the HEP API, authenticated with OAuth2 tokens.

    Every public method accepts either a ``request`` (from which a token is
    fetched) or an explicit ``token_dict``. Expired tokens are refreshed
    before the call is made.
    """

    # Seconds to wait for the token endpoint before giving up. Without a
    # timeout ``requests`` may block indefinitely on an unresponsive server.
    TOKEN_REFRESH_TIMEOUT = 30

    def _call(self, url, token_dict, method="get", data=None):
        """Send an authenticated request to HEP and return the response.

        ``data`` is sent as JSON body for ``post``, as query parameters for
        ``get`` and as form data for ``put``.

        Raises:
            ValueError: if ``method`` is not one of ``get``, ``post``, ``put``.
            HepClientUnauthorizedException: if HEP answers with 401.
            HepClientException: if HEP answers with any other non-200 status.
        """
        headers = {"accept": "application/json"}

        if method == "post":
            response = oauth.hep.post(url, json=data, token=token_dict, headers=headers)
        elif method == "get":
            response = oauth.hep.get(
                url, params=data, token=token_dict, headers=headers
            )
        elif method == "put":
            response = oauth.hep.put(url, data=data, token=token_dict, headers=headers)
        else:
            # Previously fell through and failed with UnboundLocalError.
            raise ValueError(f"Unsupported HTTP method: {method!r}")

        return self._handle_response(response)

    @staticmethod
    def _error_payload(response):
        """Return the decoded JSON body of ``response``, or its text if the
        body is not valid JSON (e.g. an HTML error page from a proxy)."""
        try:
            return response.json()
        except ValueError:
            return response.text

    def _handle_response(self, response):
        """Return ``response`` if its status is 200, otherwise raise.

        Raises:
            HepClientUnauthorizedException: on status 401.
            HepClientException: on any other non-200 status (also logged).
        """
        status_code = response.status_code
        if status_code == 200:
            return response

        # Decode defensively so a non-JSON error body cannot mask the
        # actual HTTP error with a JSON decoding error.
        payload = self._error_payload(response)

        if status_code == 401:
            raise HepClientUnauthorizedException(status_code, payload)

        logger.warning(f"Hepclient error: Received {status_code} {payload}")
        raise HepClientException(status_code, payload)

    def _get_valid_token(self, request, token_dict):
        """Return a non-expired token dict for the given request or token.

        If no ``token_dict`` is passed it is fetched for ``request``. An
        expired token is refreshed against HEP first.

        Raises:
            HepClientNoTokenException: if no token can be determined.
            HepClientUnauthorizedException: if refreshing the token fails.
        """
        if request is None and token_dict is None:
            raise HepClientNoTokenException

        if not token_dict:
            token_dict = fetch_token("", request)
            if not token_dict:
                raise HepClientNoTokenException

        if OAuth2Token.is_dict_expired(token_dict):
            refresh_data = self._refresh_token(token_dict)
            token, refresh_success = OAuth2Token.update_dict_with_refresh_data(
                refresh_data, token_dict["access_token"]
            )
            if not refresh_success:
                raise HepClientUnauthorizedException

            token_dict = token.to_token()

        return token_dict

    def _refresh_token(self, token_dict):
        """Exchange the refresh token in ``token_dict`` for new token data.

        Returns the decoded JSON body of the token endpoint response.

        Raises:
            HepClientUnauthorizedException / HepClientException: if the
                token endpoint does not answer with 200.
            requests.exceptions.Timeout: if the endpoint does not answer
                within ``TOKEN_REFRESH_TIMEOUT`` seconds.
        """
        hep_config = settings.AUTHLIB_OAUTH_CLIENTS["hep"]
        data = {
            "grant_type": "refresh_token",
            "refresh_token": token_dict["refresh_token"],
            "client_id": hep_config["client_id"],
            "client_secret": hep_config["client_secret"],
            "scope": "",
        }

        response = requests.post(
            f'{hep_config["api_base_url"]}/oauth/token',
            json=data,
            timeout=self.TOKEN_REFRESH_TIMEOUT,
        )
        return self._handle_response(response).json()

    def is_email_verified(self, user_data):
        """Return True if ``user_data`` carries a verification timestamp.

        A missing ``email_verified_at`` key is treated like ``None``
        (not verified) instead of raising ``KeyError``.
        """
        return user_data.get("email_verified_at") is not None

    def user_details(self, request=None, token_dict=None):
        """Return the ``data`` payload of the authenticated user's details."""
        token_dict = self._get_valid_token(request, token_dict)
        response = self._call("api/auth/user", token_dict)
        return response.json()["data"]

    def logout(self, request=None, token_dict=None):
        """Log the user out at HEP; returns True when the call succeeded
        (failures surface as exceptions from ``_call``)."""
        token_dict = self._get_valid_token(request, token_dict)
        self._call("api/auth/logout", token_dict, method="post")
        return True

    def fetch_eorders(self, request=None, token_dict=None):
        """Return the user's orders filtered to product type ``eLehrmittel``."""
        token_dict = self._get_valid_token(request, token_dict)
        data = {
            "filters[product_type]": "eLehrmittel",
        }
        response = self._call("api/partners/users/orders/search", token_dict, data=data)
        return response.json()["data"]

    def active_myskillbox_product_for_customer(self, request=None, token_dict=None):
        """Return the user's active mySkillbox product, or None if there is
        none among their orders."""
        eorders = self.fetch_eorders(request=request, token_dict=token_dict)
        myskillbox_products = self._extract_myskillbox_products(eorders)

        if not myskillbox_products:
            return None

        return self._get_active_product(myskillbox_products)

    def redeem_coupon(self, coupon_code, customer_id, request=None, token_dict=None):
        """Redeem ``coupon_code`` for ``customer_id``.

        Returns the decoded response body, or None if HEP rejected the
        request with a ``HepClientException``. A 401 is not swallowed: it
        propagates as ``HepClientUnauthorizedException``.
        """
        token_dict = self._get_valid_token(request, token_dict)
        try:
            response = self._call(
                f"api/partners/users/{customer_id}/coupons/redeem",
                token_dict,
                method="post",
                data={"code": coupon_code},
            )
        except HepClientException:
            logger.warning("could not redeem")
            return None

        return response.json()

    def _extract_myskillbox_products(self, eorders):
        """Collect product dicts for all mySkillbox items in ``eorders``.

        Orders without items (missing key, ``None`` or empty) are skipped.
        """
        products = []

        for eorder in eorders:
            items = eorder.get("items")
            if not items:
                continue

            status = eorder.get("status", "")
            # The activation date belongs to the order, so compute it once
            # instead of once per item.
            activation_date = self._get_item_activation(eorder)

            for entry in items:
                product = self.entry_to_product(entry, activation_date, status)
                if product:
                    products.append(product)

        return products

    def entry_to_product(self, entry, activation_date, status):
        """Convert an order item into a product dict.

        Returns None unless the item's ISBN is a mySkillbox product and an
        activation date is known.
        """
        isbn = entry["isbn"]
        if not (is_myskillbox_product(isbn) and activation_date):
            return None

        return {
            "raw": entry,
            "activated": activation_date,
            "status": status,
            "order_id": entry["id"],
            "license": licenses[isbn],
            "isbn": isbn,
        }

    def _get_item_activation(self, eorder):
        """Return the parsed ``created_at`` of ``eorder``, or None if the
        field is missing or empty."""
        created_at = eorder.get("created_at")
        if not created_at:
            # Guard against a null value, which parse_datetime cannot handle.
            return None
        return parse_datetime(created_at)

    def _get_active_product(self, products):
        """Return the active product to use, or None if none is active.

        A product is active when its order status is in
        ``VALID_PRODUCT_STATES`` and ``License.is_product_active`` accepts
        its expiry date (activation date plus license duration in days).
        With several active products a teacher product is preferred.
        """

        def is_active(product):
            if product["status"] not in VALID_PRODUCT_STATES:
                return False

            expiry_date = product["activated"] + timedelta(
                days=product["license"]["duration"]
            )
            return License.is_product_active(expiry_date, product["isbn"])

        active_products = [product for product in products if is_active(product)]

        if not active_products:
            return None
        if len(active_products) == 1:
            return active_products[0]
        return self._select_from_teacher_products(active_products)

    def _select_from_teacher_products(self, active_products):
        """Return the first teacher product, falling back to the first
        product. All candidates are valid, so which one wins within a
        group does not matter."""
        return next(
            (
                product
                for product in active_products
                if product["license"]["edition"] == TEACHER_KEY
            ),
            active_products[0],
        )
|