skillbox/server/oauth/hep_client.py

208 lines
6.8 KiB
Python

import requests
from django.conf import settings
from django.utils.dateparse import parse_datetime
from datetime import timedelta
from oauth.models import OAuth2Token
from oauth.oauth_client import oauth, fetch_token
from users.licenses import get_license_dict, is_myskillbox_product, TEACHER_KEY
from users.models import License
from core.logger import get_logger
logger = get_logger(__name__)
VALID_PRODUCT_STATES = ["waiting", "paid", "completed", "shipped"]
licenses = get_license_dict()
class HepClientException(Exception):
pass
class HepClientUnauthorizedException(Exception):
pass
class HepClientNoTokenException(Exception):
pass
class HepClient:
def _call(self, url, token_dict, method="get", data=None):
headers = {"accept": "application/json"}
if method == "post":
response = oauth.hep.post(url, json=data, token=token_dict, headers=headers)
elif method == "get":
response = oauth.hep.get(
url, params=data, token=token_dict, headers=headers
)
elif method == "put":
response = oauth.hep.put(url, data=data, token=token_dict, headers=headers)
return self._handle_response(response)
def _handle_response(self, response):
if response.status_code == 401:
raise HepClientUnauthorizedException(response.status_code, response.json())
elif response.status_code != 200:
logger.warning(
f"Hepclient error: Received {response.status_code} {response.json()}"
)
raise HepClientException(response.status_code, response.json())
return response
def _get_valid_token(self, request, token_dict):
if request is None and token_dict is None:
raise HepClientNoTokenException
if not token_dict:
token_dict = fetch_token("", request)
if not token_dict:
raise HepClientNoTokenException
if OAuth2Token.is_dict_expired(token_dict):
refresh_data = self._refresh_token(token_dict)
token, refresh_success = OAuth2Token.update_dict_with_refresh_data(
refresh_data, token_dict["access_token"]
)
if not refresh_success:
raise HepClientUnauthorizedException
token_dict = token.to_token()
return token_dict
def _refresh_token(self, token_dict):
data = {
"grant_type": "refresh_token",
"refresh_token": token_dict["refresh_token"],
"client_id": settings.AUTHLIB_OAUTH_CLIENTS["hep"]["client_id"],
"client_secret": settings.AUTHLIB_OAUTH_CLIENTS["hep"]["client_secret"],
"scope": "",
}
response = requests.post(
f'{settings.AUTHLIB_OAUTH_CLIENTS["hep"]["api_base_url"]}/oauth/token',
json=data,
)
return self._handle_response(response).json()
def is_email_verified(self, user_data):
return user_data["email_verified_at"] is not None
def user_details(self, request=None, token_dict=None):
token_dict = self._get_valid_token(request, token_dict)
response = self._call("api/auth/user", token_dict)
return response.json()["data"]
def logout(self, request=None, token_dict=None):
token_dict = self._get_valid_token(request, token_dict)
self._call("api/auth/logout", token_dict, method="post")
return True
def fetch_eorders(self, request=None, token_dict=None):
token_dict = self._get_valid_token(request, token_dict)
data = {
"filters[product_type]": "eLehrmittel",
}
response = self._call("api/partners/users/orders/search", token_dict, data=data)
return response.json()["data"]
def active_myskillbox_product_for_customer(self, request=None, token_dict=None):
eorders = self.fetch_eorders(request=request, token_dict=token_dict)
myskillbox_products = self._extract_myskillbox_products(eorders)
if len(myskillbox_products) == 0:
return None
else:
return self._get_active_product(myskillbox_products)
def redeem_coupon(self, coupon_code, customer_id, request=None, token_dict=None):
token_dict = self._get_valid_token(request, token_dict)
try:
response = self._call(
f"api/partners/users/{customer_id}/coupons/redeem",
token_dict,
method="post",
data={"code": coupon_code},
)
except HepClientException:
logger.warning("could not redeem")
return None
response_data = response.json()
return response_data
def _extract_myskillbox_products(self, eorders):
products = []
for eorder in eorders:
if "items" not in eorder:
continue
status = eorder.get("status", "")
for entry in eorder["items"]:
product = self.entry_to_product(
entry, self._get_item_activation(eorder), status
)
if product:
products.append(product)
return products
def entry_to_product(self, entry, activation_date, status):
isbn = entry["isbn"]
if is_myskillbox_product(isbn) and activation_date:
return {
"raw": entry,
"activated": activation_date,
"status": status,
"order_id": entry["id"],
"license": licenses[isbn],
"isbn": entry["isbn"],
}
return None
def _get_item_activation(self, eorder):
if "created_at" in eorder:
return parse_datetime(eorder["created_at"])
else:
return None
def _get_active_product(self, products):
def filter_valid_products(product):
if product["status"] not in VALID_PRODUCT_STATES:
return False
expiry_delta = product["activated"] + timedelta(
product["license"]["duration"]
)
return License.is_product_active(expiry_delta, product["isbn"])
active_products = list(filter(filter_valid_products, products))
if len(active_products) == 0:
return None
elif len(active_products) == 1:
return active_products[0]
else:
return self._select_from_teacher_products(active_products)
def _select_from_teacher_products(self, active_products):
# select first teacher product, as they are all valid it does not matter which one
for product in active_products:
if product["license"]["edition"] == TEACHER_KEY:
return product
# select a student product, as they are all valid it does not matter which one
return active_products[0]