import requests from django.conf import settings from django.utils.dateparse import parse_datetime from datetime import timedelta from oauth.models import OAuth2Token from oauth.oauth_client import oauth, fetch_token from users.licenses import get_license_dict, is_myskillbox_product, TEACHER_KEY from users.models import License from core.logger import get_logger logger = get_logger(__name__) VALID_PRODUCT_STATES = ['waiting', 'paid', 'completed', 'shipped'] licenses = get_license_dict() class HepClientException(Exception): pass class HepClientUnauthorizedException(Exception): pass class HepClientNoTokenException(Exception): pass class HepClient: def _call(self, url, token_dict, method='get', data=None): headers = {"accept": "application/json"} if method == 'post': response = oauth.hep.post(url, json=data, token=token_dict, headers=headers) elif method == 'get': response = oauth.hep.get(url, params=data, token=token_dict, headers=headers) elif method == 'put': response = oauth.hep.put(url, data=data, token=token_dict, headers=headers) return self._handle_response(response) def _handle_response(self, response): if response.status_code == 401: raise HepClientUnauthorizedException(response.status_code, response.json()) elif response.status_code != 200: logger.warning(f'Hepclient error: Received {response.status_code} {response.json()}') raise HepClientException(response.status_code, response.json()) return response def _get_valid_token(self, request, token_dict): if request is None and token_dict is None: raise HepClientNoTokenException if not token_dict: token_dict = fetch_token('', request) if not token_dict: raise HepClientNoTokenException if OAuth2Token.is_dict_expired(token_dict): refresh_data = self._refresh_token(token_dict) token, refresh_success = OAuth2Token.update_dict_with_refresh_data(refresh_data, token_dict['access_token']) if not refresh_success: raise HepClientUnauthorizedException token_dict = token.to_token() return token_dict def _refresh_token(self, token_dict): data = { 'grant_type': 'refresh_token', 'refresh_token': token_dict['refresh_token'], 'client_id': settings.AUTHLIB_OAUTH_CLIENTS['hep']['client_id'], 'client_secret': settings.AUTHLIB_OAUTH_CLIENTS['hep']['client_secret'], 'scope': '' } response = requests.post(f'{settings.AUTHLIB_OAUTH_CLIENTS["hep"]["api_base_url"]}/oauth/token', json=data) return self._handle_response(response).json() def is_email_verified(self, user_data): return user_data['email_verified_at'] is not None def user_details(self, request=None, token_dict=None): token_dict = self._get_valid_token(request, token_dict) response = self._call('api/auth/user', token_dict) return response.json()['data'] def logout(self, request=None, token_dict=None): token_dict = self._get_valid_token(request, token_dict) self._call('api/auth/logout', token_dict, method='post') return True def fetch_eorders(self, request=None, token_dict=None): token_dict = self._get_valid_token(request, token_dict) data = { 'filters[product_type]': 'eLehrmittel', } response = self._call('api/partners/users/orders/search', token_dict, data=data) return response.json()['data'] def active_myskillbox_product_for_customer(self, request=None, token_dict=None): eorders = self.fetch_eorders(request=request, token_dict=token_dict) myskillbox_products = self._extract_myskillbox_products(eorders) if len(myskillbox_products) == 0: return None else: return self._get_active_product(myskillbox_products) def redeem_coupon(self, coupon_code, customer_id, request=None, token_dict=None): token_dict = self._get_valid_token(request, token_dict) try: response = self._call(f'api/partners/users/{customer_id}/coupons/redeem', token_dict, method='post', data={'code': coupon_code}) except HepClientException: return None response_data = response.json() return response_data def _extract_myskillbox_products(self, eorders): products = [] for eorder in eorders: if 'items' not in eorder: continue status = eorder.get('status', '') for entry in eorder['items']: product = self.entry_to_product(entry, self._get_item_activation(eorder), status) if product: products.append(product) return products def entry_to_product(self, entry, activation_date, status): isbn = entry['isbn'] if is_myskillbox_product(isbn) and activation_date: return { 'raw': entry, 'activated': activation_date, 'status': status, 'order_id': entry['id'], 'license': licenses[isbn], 'isbn': entry['isbn'] } return None def _get_item_activation(self, eorder): if 'created_at' in eorder: return parse_datetime(eorder['created_at']) else: return None def _get_active_product(self, products): def filter_valid_products(product): if product['status'] not in VALID_PRODUCT_STATES: return False expiry_delta = product['activated'] + timedelta(product['license']['duration']) return License.is_product_active(expiry_delta, product['isbn']) active_products = list(filter(filter_valid_products, products)) if len(active_products) == 0: return None elif len(active_products) == 1: return active_products[0] else: return self._select_from_teacher_products(active_products) def _select_from_teacher_products(self, active_products): # select first teacher product, as they are all valid it does not matter which one for product in active_products: if product['license']['edition'] == TEACHER_KEY: return product # select a student product, as they are all valid it does not matter which one return active_products[0]