239 lines
8.0 KiB
Python
239 lines
8.0 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
# ITerativ GmbH
|
|
# http://www.iterativ.ch/
|
|
#
|
|
# Copyright (c) 2020 ITerativ GmbH. All rights reserved.
|
|
#
|
|
# Created on 23.01.20
|
|
# @author: chrigu <christian.cueni@iterativ.ch>
|
|
from datetime import datetime, timedelta
|
|
|
|
from django.conf import settings
|
|
import logging
|
|
import requests
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
MYSKILLBOX_TEACHER_EDITION_ISBN = "000-4-5678-9012-3"
|
|
MYSKILLBOX_STUDENT_EDITION_ISBN = "123-4-5678-9012-3"
|
|
|
|
TEACHER_EDITION_DURATION = 365
|
|
STUDENT_EDITION_DURATION = 4*365
|
|
|
|
TEACHER_KEY = 'teacher'
|
|
STUDENT_KEY = 'student'
|
|
|
|
|
|
class HepClientException(Exception):
|
|
pass
|
|
|
|
|
|
class HepClientUnauthorizedException(Exception):
|
|
pass
|
|
|
|
|
|
class HepClient:
|
|
URL = settings.HEP_URL
|
|
WEBSITE_ID = 1
|
|
HEADERS = {
|
|
'accept': 'application/json',
|
|
'content-type': 'application/json'
|
|
}
|
|
|
|
def _call(self, url, method='get', data=None, additional_headers=None):
|
|
|
|
request_url = '{}{}'.format(self.URL, url)
|
|
|
|
if additional_headers:
|
|
headers = {**additional_headers, **self.HEADERS}
|
|
else:
|
|
headers = self.HEADERS
|
|
|
|
if method == 'post':
|
|
response = requests.post(request_url, json=data, headers=headers)
|
|
elif method == 'get':
|
|
if data:
|
|
response = requests.get(request_url, headers=headers, data=data)
|
|
else:
|
|
response = requests.get(request_url, headers=headers)
|
|
elif method == 'put':
|
|
response = requests.put(request_url, data=data)
|
|
|
|
# Todo handle 401 and most important network errors
|
|
if response.status_code == 401:
|
|
raise HepClientUnauthorizedException(response.status_code, response.json())
|
|
elif response.status_code != 200:
|
|
raise HepClientException(response.status_code, response.json())
|
|
|
|
logger.info(response.json())
|
|
return response
|
|
|
|
def fetch_admin_token(self, admin_user, password):
|
|
response = self._call('/rest/deutsch/V1/integration/admin/token', 'post',
|
|
data={'username': admin_user, 'password': password})
|
|
return response.json()['token']
|
|
|
|
def is_email_available(self, email):
|
|
response = self._call('/rest/deutsch/V1/customers/isEmailAvailable', method='post',
|
|
data={'customerEmail': email, 'websiteId': self.WEBSITE_ID})
|
|
return response.json()
|
|
|
|
def is_email_verified(self, user_data):
|
|
return 'confirmation' not in user_data
|
|
|
|
def customer_verify_email(self, confirmation_key):
|
|
response = self._call('/rest/V1/customers/me', method='put', data={'confirmationKey': confirmation_key})
|
|
return response.json()
|
|
|
|
def customer_create(self, customer_data, address):
|
|
|
|
if customer_data['prefix'] == 'Herr':
|
|
customer_data['gender'] = 1
|
|
else:
|
|
customer_data['gender'] = 2
|
|
|
|
address['country_id'] = 'CH'
|
|
address['default_billing'] = True
|
|
address['default_shipping'] = True
|
|
|
|
customer_data['addresses'] = [address]
|
|
|
|
response = self._call('/rest/deutsch/V1/customers', method='post', data=customer_data)
|
|
return response.json()
|
|
|
|
def customer_token(self, username, password):
|
|
response = self._call('/rest/deutsch/V1/integration/customer/token', 'post',
|
|
data={'username': username, 'password': password})
|
|
return response.json()
|
|
|
|
def customer_me(self, token):
|
|
response = self._call('/rest/V1/customers/me', additional_headers={'authorization': 'Bearer {}'.format(token)})
|
|
return response.json()
|
|
|
|
def customer_activate(self, confirmation_key):
|
|
response = self._call('/rest/V1/customers/me/activate', method='post', data={
|
|
'confirmationKey': confirmation_key
|
|
})
|
|
return response.json()
|
|
|
|
def customers_search(self, admin_token, email):
|
|
response = self._call("/rest/V1/customers/search?searchCriteria[filterGroups][0][filters][0][field]"
|
|
"=email&searchCriteria[filterGroups][0][filters][0][value]={}".format(email), method='get',
|
|
additional_headers={'authorization': 'Bearer {}'.format(admin_token)})
|
|
|
|
json_data = response.json()
|
|
if len(json_data['items']) > 0:
|
|
return json_data['items'][0]
|
|
return None
|
|
|
|
def _customer_orders(self, admin_token, customer_id):
|
|
url = ("/rest/V1/orders/?searchCriteria[filterGroups][0][filters][0]["
|
|
"field]=customer_id&searchCriteria[filterGroups][0][filters][0][value]={}".format(customer_id))
|
|
|
|
response = self._call(url, additional_headers={'authorization': 'Bearer {}'.format(admin_token)})
|
|
return response.json()
|
|
|
|
def coupon_redeem(self, coupon, customer_id):
|
|
try:
|
|
response = self._call('/rest/deutsch/V1/coupon/{}/customer/{}'.format(coupon, customer_id), method='put')
|
|
except HepClientException as e:
|
|
if e.args[0] == 201:
|
|
return None
|
|
|
|
return response
|
|
|
|
def myskillbox_product_for_customer(self, admin_token, customer_id):
|
|
orders = self._customer_orders(admin_token, customer_id)
|
|
products = self._extract_myskillbox_products(orders)
|
|
|
|
if len(products) == 0:
|
|
return None
|
|
else:
|
|
return self._get_relevant_product(products)
|
|
|
|
def _extract_myskillbox_products(self, orders):
|
|
products = []
|
|
|
|
for order_item in orders['items']:
|
|
|
|
status = ''
|
|
if 'status' in order_item:
|
|
status = order_item['status']
|
|
|
|
for item in order_item['items']:
|
|
if item['sku'] == MYSKILLBOX_TEACHER_EDITION_ISBN or item['sku'] == MYSKILLBOX_STUDENT_EDITION_ISBN:
|
|
|
|
product = {
|
|
'raw': item,
|
|
'activated': self._get_item_activation(order_item),
|
|
'status': status
|
|
}
|
|
|
|
if item['sku'] == MYSKILLBOX_TEACHER_EDITION_ISBN:
|
|
product['edition'] = TEACHER_KEY
|
|
|
|
else:
|
|
product['edition'] = STUDENT_KEY
|
|
|
|
products.append(product)
|
|
|
|
return products
|
|
|
|
def _get_item_activation(self, item):
|
|
if 'created_at' in item:
|
|
return datetime.strptime(item['created_at'], '%Y-%m-%d %H:%M:%S')
|
|
|
|
def _get_relevant_product(self, products):
|
|
|
|
def filter_valid_products(product):
|
|
|
|
if product['status'] != 'complete':
|
|
return False
|
|
|
|
if product['edition'] == TEACHER_KEY:
|
|
expiry_delta = product['activated'] + timedelta(TEACHER_EDITION_DURATION)
|
|
else:
|
|
expiry_delta = product['activated'] + timedelta(STUDENT_EDITION_DURATION)
|
|
|
|
if HepClient.is_product_active(expiry_delta, product['edition']):
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
active_products = list(filter(filter_valid_products, products))
|
|
|
|
if len(active_products) == 0:
|
|
return None
|
|
elif len(active_products) == 1:
|
|
return active_products[0]
|
|
else:
|
|
return self._select_from_teacher_products(active_products)
|
|
|
|
def _select_from_teacher_products(self, active_products):
|
|
teacher_edition = None
|
|
|
|
# select first teacher product, as they are all valid it does not matter which one
|
|
for product in active_products:
|
|
if product['edition'] == TEACHER_KEY:
|
|
teacher_edition = product
|
|
break
|
|
|
|
# select a student product, as they are all valid it does not matter which one
|
|
if not teacher_edition:
|
|
return active_products[0]
|
|
|
|
return teacher_edition
|
|
|
|
@staticmethod
|
|
def is_product_active(expiry_date, edition):
|
|
|
|
if edition == TEACHER_KEY:
|
|
duration = TEACHER_EDITION_DURATION
|
|
else:
|
|
duration = STUDENT_EDITION_DURATION
|
|
|
|
now = datetime.now()
|
|
|
|
return expiry_date >= now >= expiry_date - timedelta(days=duration)
|