from datetime import datetime, timedelta from django.conf import settings import logging import requests from oauth.oauth_client import oauth from users.models import License logger = logging.getLogger(__name__) class HepClientException(Exception): pass class HepClientUnauthorizedException(Exception): pass class HepClientNoTokenException(Exception): pass class HepClient: URL = settings.HEP_URL WEBSITE_ID = 1 HEADERS = { 'accept': 'application/json', 'content-type': 'application/json' } def _call(self, url, method='get', data=None, request=None, token=None): request_url = f'{self.URL}{url}' token_parameters = { 'token': token, 'request': request } if method == 'post': response = requests.post(request_url, json=data) elif method == 'get': response = oauth.hep.get(url, **token_parameters) elif method == 'put': response = requests.put(request_url, data=data) # Todo handle 401 and most important network errors if response.status_code == 401: raise HepClientUnauthorizedException(response.status_code, response.json()) elif response.status_code != 200: raise HepClientException(response.status_code, response.json()) return response def is_email_verified(self, user_data): return user_data['email_verified_at'] is not None def user_details(self, request=None, token=None): if request is None and token is None: raise HepClientNoTokenException response = self._call('/api/auth/user', request=request, token=token) return response.json()['data'] def customers_search(self, admin_token, email): response = self._call('/rest/V1/customers/search?searchCriteria[filterGroups][0][filters][0][field]=' f'email&searchCriteria[filterGroups][0][filters][0][value]={email}', additional_headers={'authorization': f'Bearer {admin_token}'}) json_data = response.json() if len(json_data['items']) > 0: return json_data['items'][0] return None def customers_by_id(self, admin_token, user_id): response = self._call('/rest/V1/customers/{}'.format(user_id), additional_headers={'authorization': f'Bearer {admin_token}'}) return response.json() def _customer_orders(self, admin_token, customer_id): url = ('/rest/V1/orders/?searchCriteria[filterGroups][0][filters][0][' f'field]=customer_id&searchCriteria[filterGroups][0][filters][0][value]={customer_id}') response = self._call(url, additional_headers={'authorization': 'Bearer {}'.format(admin_token)}) return response.json() def coupon_redeem(self, coupon, customer_id): try: response = self._call(f'/rest/deutsch/V1/coupon/{coupon}/customer/{customer_id}', method='put') except HepClientException: return None response_data = response.json() if response_data[0] == '201': return None return response_data[0] def myskillbox_product_for_customer(self, admin_token, customer_id): orders = self._customer_orders(admin_token, customer_id) products = self._extract_myskillbox_products(orders) if len(products) == 0: return None else: return self._get_relevant_product(products) def _extract_myskillbox_products(self, orders): products = [] for order_item in orders['items']: status = '' if 'status' in order_item: status = order_item['status'] for item in order_item['items']: order_id = -1 if 'order_id' in item: order_id = item['order_id'] if item['sku'] in list(MYSKILLBOX_LICENSES.keys()): product = { 'raw': item, 'activated': self._get_item_activation(order_item), 'status': status, 'order_id': order_id, 'license': MYSKILLBOX_LICENSES[item['sku']], 'isbn': item['sku'] } products.append(product) return products def _get_item_activation(self, item): if 'created_at' in item: return datetime.strptime(item['created_at'], '%Y-%m-%d %H:%M:%S') def _get_relevant_product(self, products): def filter_valid_products(product): if product['status'] != 'complete': return False expiry_delta = product['activated'] + timedelta(product['license']['duration']) if License.is_product_active(expiry_delta, product['isbn']): return True else: return False active_products = list(filter(filter_valid_products, products)) if len(active_products) == 0: return None elif len(active_products) == 1: return active_products[0] else: return self._select_from_teacher_products(active_products) def _select_from_teacher_products(self, active_products): teacher_edition = None # select first teacher product, as they are all valid it does not matter which one for product in active_products: if product['license']['edition'] == TEACHER_KEY: teacher_edition = product break # select a student product, as they are all valid it does not matter which one if not teacher_edition: return active_products[0] return teacher_edition