skillbox/server/oauth/hep_client.py

196 lines
6.6 KiB
Python

import requests
from django.conf import settings
from django.utils.dateparse import parse_datetime
from datetime import timedelta
from oauth.models import OAuth2Token
from oauth.oauth_client import oauth, fetch_token
from users.licenses import get_license_dict, is_myskillbox_product, TEACHER_KEY
from users.models import License
from core.logger import get_logger
# Module-level logger, obtained from the project's get_logger helper for this module's name.
logger = get_logger(__name__)
# Order statuses a product may have to be considered at all when looking for an
# active product (checked in HepClient._get_active_product).
VALID_PRODUCT_STATES = ['waiting', 'paid', 'completed', 'shipped']
# License lookup built once at import time; indexed by ISBN in HepClient.entry_to_product.
licenses = get_license_dict()
class HepClientException(Exception):
    """Error reported by the HEP API.

    Raised by ``HepClient`` when a response has a status code that is neither
    200 nor 401. It is instantiated with the status code and the response body
    as positional arguments, so both are available through ``args``.
    """
class HepClientUnauthorizedException(Exception):
    """The HEP API rejected the credentials.

    Raised by ``HepClient`` when a response has status 401, or when an expired
    token could not be refreshed.

    This class derives directly from ``Exception``; it is *not* a subclass of
    ``HepClientException``, so handlers for that class do not catch it.
    """
class HepClientNoTokenException(Exception):
    """No OAuth token is available for the call.

    Raised by ``HepClient`` when neither a request nor a token dict was
    supplied, or when no token could be fetched for the given request.
    """
class HepClient:
    """Client for the HEP API, reached through the ``oauth.hep`` OAuth client.

    Every public call accepts either a Django ``request`` (from which the token
    is fetched) or an explicit ``token_dict``; an expired token is refreshed
    before the call is made.
    """

    # HTTP verbs `_call` knows how to dispatch.
    _SUPPORTED_METHODS = ('get', 'post', 'put')

    def _call(self, url, token_dict, method='get', data=None):
        """Send an authenticated request to the HEP API and return the response.

        ``data`` is sent as the JSON body for ``post``, as query parameters for
        ``get`` and via ``data=`` (not ``json=``) for ``put``.

        Raises:
            ValueError: ``method`` is not one of get/post/put.
            HepClientUnauthorizedException: the API answered with 401.
            HepClientException: the API answered with any other non-200 status.
        """
        # Validate first so an unknown verb fails loudly instead of leaving
        # `response` unbound further down.
        if method not in self._SUPPORTED_METHODS:
            raise ValueError(f'Unsupported HTTP method: {method!r}')

        headers = {"accept": "application/json"}
        if method == 'post':
            response = oauth.hep.post(url, json=data, token=token_dict, headers=headers)
        elif method == 'put':
            response = oauth.hep.put(url, data=data, token=token_dict, headers=headers)
        else:
            response = oauth.hep.get(url, params=data, token=token_dict, headers=headers)
        return self._handle_response(response)

    @staticmethod
    def _response_payload(response):
        """Return the decoded JSON body, or the raw text if the body is not JSON."""
        try:
            return response.json()
        except ValueError:
            # Error pages are not guaranteed to be JSON; a decoding failure must
            # not hide the HTTP error that is being reported.
            return response.text

    def _handle_response(self, response):
        """Return ``response`` if its status is 200, otherwise raise.

        Raises:
            HepClientUnauthorizedException: status 401.
            HepClientException: any other non-200 status (also logged as a warning).
        """
        if response.status_code == 200:
            return response

        payload = self._response_payload(response)
        if response.status_code == 401:
            raise HepClientUnauthorizedException(response.status_code, payload)

        logger.warning(f'Hepclient error: Received {response.status_code} {payload}')
        raise HepClientException(response.status_code, payload)

    def _get_valid_token(self, request, token_dict):
        """Return a usable token dict, fetching and refreshing it as needed.

        Raises:
            HepClientNoTokenException: neither ``request`` nor ``token_dict``
                was given, or no token could be fetched for ``request``.
            HepClientUnauthorizedException: the token is expired and the
                refresh was rejected or could not be stored.
            HepClientException: the refresh call failed with another HTTP error.
        """
        if request is None and token_dict is None:
            raise HepClientNoTokenException

        if not token_dict:
            token_dict = fetch_token('', request)
            if not token_dict:
                raise HepClientNoTokenException

        if OAuth2Token.is_dict_expired(token_dict):
            refresh_data = self._refresh_token(token_dict)
            token, refresh_success = OAuth2Token.update_dict_with_refresh_data(
                refresh_data, token_dict['access_token'])
            if not refresh_success:
                raise HepClientUnauthorizedException
            token_dict = token.to_token()

        return token_dict

    def _refresh_token(self, token_dict):
        """Exchange the refresh token for new token data and return the decoded JSON."""
        hep_settings = settings.AUTHLIB_OAUTH_CLIENTS['hep']
        data = {
            'grant_type': 'refresh_token',
            'refresh_token': token_dict['refresh_token'],
            'client_id': hep_settings['client_id'],
            'client_secret': hep_settings['client_secret'],
            'scope': ''
        }
        # NOTE(review): no timeout is passed here, so a stalled token endpoint
        # blocks the caller indefinitely - consider adding one.
        response = requests.post(f'{hep_settings["api_base_url"]}/oauth/token', json=data)
        return self._handle_response(response).json()

    def is_email_verified(self, user_data):
        """Return True if ``user_data['email_verified_at']`` is set (not None)."""
        return user_data['email_verified_at'] is not None

    def user_details(self, request=None, token_dict=None):
        """Return the ``data`` member of the ``api/auth/user`` response."""
        token_dict = self._get_valid_token(request, token_dict)
        response = self._call('api/auth/user', token_dict)
        return response.json()['data']

    def logout(self, request=None, token_dict=None):
        """Call ``api/auth/logout``; returns True when the call succeeded."""
        token_dict = self._get_valid_token(request, token_dict)
        self._call('api/auth/logout', token_dict, method='post')
        return True

    def fetch_eorders(self, request=None, token_dict=None):
        """Return the orders of product type ``eLehrmittel`` for the token's user."""
        token_dict = self._get_valid_token(request, token_dict)
        data = {
            'filters[product_type]': 'eLehrmittel',
        }
        response = self._call('api/partners/users/orders/search', token_dict, data=data)
        return response.json()['data']

    def active_myskillbox_product_for_customer(self, request=None, token_dict=None):
        """Return the customer's active mySkillbox product dict, or None if there is none."""
        eorders = self.fetch_eorders(request=request, token_dict=token_dict)
        myskillbox_products = self._extract_myskillbox_products(eorders)
        if not myskillbox_products:
            return None
        return self._get_active_product(myskillbox_products)

    def redeem_coupon(self, coupon_code, customer_id, request=None, token_dict=None):
        """Redeem ``coupon_code`` for ``customer_id``.

        Returns the decoded JSON response, or None when the API rejected the
        request with a ``HepClientException``. A 401 is not swallowed: it
        propagates as ``HepClientUnauthorizedException``.
        """
        token_dict = self._get_valid_token(request, token_dict)
        try:
            response = self._call(f'api/partners/users/{customer_id}/coupons/redeem', token_dict, method='post',
                                  data={'code': coupon_code})
        except HepClientException:
            return None
        return response.json()

    def _extract_myskillbox_products(self, eorders):
        """Collect a product dict for every mySkillbox item found in ``eorders``.

        Orders without items (key missing, null or empty) are skipped.
        """
        products = []
        for eorder in eorders:
            items = eorder.get('items')
            if not items:
                continue
            status = eorder.get('status', '')
            # The activation date belongs to the order, not the item: compute it once.
            activation_date = self._get_item_activation(eorder)
            for entry in items:
                product = self.entry_to_product(entry, activation_date, status)
                if product:
                    products.append(product)
        return products

    def entry_to_product(self, entry, activation_date, status):
        """Turn an order item into a product dict.

        Returns None unless the item's ISBN is a mySkillbox product and
        ``activation_date`` is set. Otherwise returns a dict with the keys
        ``raw``, ``activated``, ``status``, ``order_id``, ``license`` and ``isbn``.
        """
        isbn = entry['isbn']
        if is_myskillbox_product(isbn) and activation_date:
            return {
                'raw': entry,
                'activated': activation_date,
                'status': status,
                'order_id': entry['id'],
                'license': licenses[isbn],
                'isbn': isbn
            }
        return None

    def _get_item_activation(self, eorder):
        """Return the order's ``created_at`` parsed as a datetime, or None if it is missing or empty."""
        created_at = eorder.get('created_at')
        if not created_at:
            # Covers both a missing key and an explicit null value.
            return None
        return parse_datetime(created_at)

    def _get_active_product(self, products):
        """Return the product to use among ``products``, or None if none is active.

        A product is active when its status is one of ``VALID_PRODUCT_STATES``
        and ``License.is_product_active`` accepts its expiry date, which is the
        activation date plus the license duration in days.
        """
        def is_active(product):
            if product['status'] not in VALID_PRODUCT_STATES:
                return False
            expiry_date = product['activated'] + timedelta(days=product['license']['duration'])
            return License.is_product_active(expiry_date, product['isbn'])

        active_products = [product for product in products if is_active(product)]
        if not active_products:
            return None
        if len(active_products) == 1:
            return active_products[0]
        return self._select_from_teacher_products(active_products)

    def _select_from_teacher_products(self, active_products):
        """Prefer a teacher-edition product; otherwise return the first product."""
        # All candidates are valid, so which teacher (or student) product is
        # picked does not matter - take the first match.
        return next(
            (product for product in active_products if product['license']['edition'] == TEACHER_KEY),
            active_products[0],
        )