237 lines
7.9 KiB
Python
237 lines
7.9 KiB
Python
from datetime import datetime, timedelta
|
|
|
|
from django.conf import settings
|
|
import logging
|
|
import requests
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Edition identifiers referenced by the license table below.
TEACHER_KEY = 'teacher'
STUDENT_KEY = 'student'

# License table keyed by product identifier (matched against an order item's
# 'sku'). Each entry carries the edition, the license duration in days and a
# human-readable name.
MYSKILLBOX_LICENSES = {
    '978-3-0355-1397-4': {'edition': STUDENT_KEY, 'duration': 365 * 4, 'name': 'Student 4 years'},
    '978-3-0355-1860-3': {'edition': STUDENT_KEY, 'duration': 455, 'name': 'Student 1 year'},
    '978-3-0355-1862-7': {'edition': STUDENT_KEY, 'duration': 30, 'name': 'Student test 1 month'},
    '978-3-0355-1861-0': {'edition': TEACHER_KEY, 'duration': 30, 'name': 'Teacher test 1 month'},
    '978-3-0355-1823-8': {'edition': TEACHER_KEY, 'duration': 455, 'name': 'Teacher 1 year'},
}
|
|
|
|
|
|
class HepClientException(Exception):
    """Error raised for a failed HEP request.

    Raised with ``(status_code, response_body)`` as positional arguments.
    """
|
|
|
|
|
|
class HepClientUnauthorizedException(Exception):
    """Error raised when a HEP request is answered with HTTP status 401.

    Raised with ``(status_code, response_body)`` as positional arguments.
    """
|
|
|
|
|
|
class HepClient:
    """HTTP client for the remote HEP service configured in ``settings.HEP_URL``.

    All public methods go through :meth:`_call`, which raises
    ``HepClientUnauthorizedException`` on HTTP 401 and ``HepClientException``
    on any other status that is not 200.
    """

    # Base URL every request path is appended to.
    URL = settings.HEP_URL
    # Sent as 'websiteId' by is_email_available.
    WEBSITE_ID = 1
    # Default headers for GET and POST requests.
    HEADERS = {
        'accept': 'application/json',
        'content-type': 'application/json'
    }

    @staticmethod
    def _auth_header(token):
        """Return the bearer ``authorization`` header dict for ``token``."""
        return {'authorization': f'Bearer {token}'}

    @staticmethod
    def _error_payload(response):
        """Return the JSON-decoded body of ``response``, or its raw text.

        Error responses are not guaranteed to carry JSON; falling back to the
        text keeps the HTTP failure visible instead of replacing it with a
        decoding error.
        """
        try:
            return response.json()
        except ValueError:
            return response.text

    def _call(self, url, method='get', data=None, additional_headers=None):
        """Send one HTTP request and return the ``requests`` response object.

        :param url: path (including any query string) appended verbatim to ``URL``.
        :param method: one of ``'get'``, ``'post'`` or ``'put'``.
        :param data: request payload; sent as JSON for POST and passed through
            as ``data=`` for GET and PUT.
        :param additional_headers: optional headers merged with ``HEADERS``
            (used for GET and POST only).
        :raises ValueError: if ``method`` is not one of the supported values.
        :raises HepClientUnauthorizedException: if the response status is 401.
        :raises HepClientException: if the response status is neither 200 nor 401.
        """
        request_url = f'{self.URL}{url}'

        # On a key collision the class-level defaults win over the extra headers.
        headers = {**(additional_headers or {}), **self.HEADERS}

        if method == 'post':
            response = requests.post(request_url, json=data, headers=headers)
        elif method == 'get':
            # A falsy payload (None, {}, '') means "send no body".
            response = requests.get(request_url, headers=headers, data=data or None)
        elif method == 'put':
            # NOTE(review): PUT is sent without any headers (neither HEADERS nor
            # additional_headers) and with a non-JSON payload - confirm this is
            # what the server expects before changing it.
            response = requests.put(request_url, data=data)
        else:
            # Previously fell through and failed later with an UnboundLocalError.
            raise ValueError(f'Unsupported HTTP method: {method!r}')

        # TODO: network errors raised by requests (timeouts, connection
        # failures) still propagate to the caller unchanged.
        if response.status_code == 401:
            raise HepClientUnauthorizedException(response.status_code, self._error_payload(response))
        if response.status_code != 200:
            raise HepClientException(response.status_code, self._error_payload(response))

        return response

    def fetch_admin_token(self, admin_user, password):
        """Request an admin token and return it as a string.

        The first and last character of the decoded response body are dropped
        (presumably the quotes of a JSON string - verify against the API).
        """
        response = self._call('/rest/deutsch/V1/integration/admin/token', 'post',
                              data={'username': admin_user, 'password': password})
        return response.content.decode('utf-8')[1:-1]

    def is_email_available(self, email):
        """Return the decoded JSON answer of the ``isEmailAvailable`` endpoint for ``email``."""
        response = self._call('/rest/deutsch/V1/customers/isEmailAvailable', method='post',
                              data={'customerEmail': email, 'websiteId': self.WEBSITE_ID})
        return response.json()

    def is_email_verified(self, user_data):
        """Return True when ``user_data`` has no ``'confirmation'`` entry."""
        return 'confirmation' not in user_data

    def customer_verify_email(self, confirmation_key):
        """PUT ``confirmation_key`` to the ``customers/me`` endpoint and return the decoded JSON."""
        response = self._call('/rest/V1/customers/me', method='put', data={'confirmationKey': confirmation_key})
        return response.json()

    def customer_create(self, customer_data):
        """POST ``customer_data`` to the customers endpoint and return the decoded JSON."""
        response = self._call('/rest/deutsch/V1/customers', method='post', data=customer_data)
        return response.json()

    def customer_token(self, username, password):
        """Request a customer token for the given credentials and return the decoded JSON."""
        response = self._call('/rest/deutsch/V1/integration/customer/token', 'post',
                              data={'username': username, 'password': password})
        return response.json()

    def customer_me(self, token):
        """Return the decoded JSON of ``customers/me``, authenticated with ``token``."""
        response = self._call('/rest/V1/customers/me', additional_headers=self._auth_header(token))
        return response.json()

    def customer_activate(self, confirmation_key, user_id):
        """Call the account confirmation URL and return the raw response object."""
        return self._call(f'/customer/account/confirm/?back_url=&id={user_id}&key={confirmation_key}', method='get')

    def customers_search(self, admin_token, email):
        """Search customers by ``email`` and return the first match, or ``None``.

        The email is percent-encoded before it is placed in the query string,
        so that characters such as ``+`` or ``&`` reach the server as part of
        the value instead of being interpreted as query-string syntax.
        """
        # Function-scope import keeps this block self-contained.
        from urllib.parse import quote

        encoded_email = quote(email, safe='')
        response = self._call('/rest/V1/customers/search?searchCriteria[filterGroups][0][filters][0][field]='
                              f'email&searchCriteria[filterGroups][0][filters][0][value]={encoded_email}',
                              additional_headers=self._auth_header(admin_token))

        items = response.json()['items']
        return items[0] if items else None

    def customers_by_id(self, admin_token, user_id):
        """Return the decoded JSON of the customer resource with id ``user_id``."""
        response = self._call(f'/rest/V1/customers/{user_id}',
                              additional_headers=self._auth_header(admin_token))
        return response.json()

    def _customer_orders(self, admin_token, customer_id):
        """Return the decoded JSON of the order search filtered by ``customer_id``."""
        url = ('/rest/V1/orders/?searchCriteria[filterGroups][0][filters][0]['
               f'field]=customer_id&searchCriteria[filterGroups][0][filters][0][value]={customer_id}')

        response = self._call(url, additional_headers=self._auth_header(admin_token))
        return response.json()

    def coupon_redeem(self, coupon, customer_id):
        """Redeem ``coupon`` for ``customer_id``.

        :returns: ``None`` if the request fails with ``HepClientException`` or
            if the first element of the response is ``'201'``; otherwise that
            first element.
        """
        try:
            response = self._call(f'/rest/deutsch/V1/coupon/{coupon}/customer/{customer_id}', method='put')
        except HepClientException:
            # Deliberate best-effort: a rejected request is reported as "no result".
            return None

        result = response.json()[0]
        return None if result == '201' else result

    def myskillbox_product_for_customer(self, admin_token, customer_id):
        """Return the relevant active product of the customer, or ``None``.

        Fetches the customer's orders, extracts the items listed in
        ``MYSKILLBOX_LICENSES`` and picks one via ``_get_relevant_product``.
        """
        orders = self._customer_orders(admin_token, customer_id)
        products = self._extract_myskillbox_products(orders)
        # _get_relevant_product already returns None for an empty list.
        return self._get_relevant_product(products)

    def _extract_myskillbox_products(self, orders):
        """Collect every order item whose ``'sku'`` is a key of ``MYSKILLBOX_LICENSES``.

        :param orders: dict with an ``'items'`` list of orders, each having its
            own ``'items'`` list of order items.
        :returns: list of dicts with the keys ``raw``, ``activated``,
            ``status``, ``order_id``, ``license`` and ``isbn``.
        """
        products = []

        for order in orders['items']:
            status = order.get('status', '')

            for item in order['items']:
                sku = item['sku']
                if sku not in MYSKILLBOX_LICENSES:
                    continue

                products.append({
                    'raw': item,
                    # The activation date is taken from the order, not the item.
                    'activated': self._get_item_activation(order),
                    'status': status,
                    'order_id': item.get('order_id', -1),
                    'license': MYSKILLBOX_LICENSES[sku],
                    'isbn': sku,
                })

        return products

    def _get_item_activation(self, item):
        """Parse ``item['created_at']`` (``%Y-%m-%d %H:%M:%S``); ``None`` if the key is missing."""
        if 'created_at' not in item:
            return None
        return datetime.strptime(item['created_at'], '%Y-%m-%d %H:%M:%S')

    @staticmethod
    def _is_valid_product(product):
        """Return True for a product of a 'complete' order that is currently active."""
        if product['status'] != 'complete':
            return False

        activated = product['activated']
        if activated is None:
            # No activation date available: the license period cannot be
            # computed, so the product is not considered valid.
            return False

        expiry_date = activated + timedelta(days=product['license']['duration'])
        return HepClient.is_product_active(expiry_date, product['isbn'])

    def _get_relevant_product(self, products):
        """Return one valid product out of ``products``, or ``None`` if none is valid.

        When several products are valid, a teacher edition is preferred
        (see ``_select_from_teacher_products``).
        """
        active_products = [product for product in products if self._is_valid_product(product)]

        if not active_products:
            return None
        return self._select_from_teacher_products(active_products)

    def _select_from_teacher_products(self, active_products):
        """Return the first teacher-edition product, else the first product.

        All given products are valid, so it does not matter which one of an
        edition is chosen.
        """
        for product in active_products:
            if product['license']['edition'] == TEACHER_KEY:
                return product
        return active_products[0]

    @staticmethod
    def is_product_active(expiry_date, isbn):
        """Return True if the current time lies within the license period.

        The period ends at ``expiry_date`` and starts ``duration`` days
        earlier, where ``duration`` is looked up in ``MYSKILLBOX_LICENSES``.

        NOTE(review): uses naive ``datetime.now()`` - confirm that
        ``expiry_date`` is expressed in the same timezone.
        """
        now = datetime.now()

        return expiry_date >= now >= expiry_date - timedelta(days=MYSKILLBOX_LICENSES[isbn]['duration'])
|