skillbox/server/users/mutations_public.py

77 lines
2.3 KiB
Python

import graphene
from django.conf import settings
from django.contrib.auth import authenticate, login
from graphene import relay
from core.hep_client import HepClient, HepClientUnauthorizedException, HepClientException
from users.user_signup_login_handler import handle_user_and_verify_products, UNKNOWN_ERROR, EMAIL_NOT_VERIFIED
class BetaLogin(relay.ClientIDMutation):
class Input:
username_input = graphene.String()
password_input = graphene.String()
success = graphene.Boolean()
message = graphene.String()
@classmethod
def mutate_and_get_payload(cls, root, info, **kwargs):
if settings.ALLOW_BETA_LOGIN:
password = kwargs.get('password_input')
username = kwargs.get('username_input')
user = authenticate(username=username, password=password)
if user is None:
raise Exception('invalid_credentials')
login(info.context, user)
return cls(success=True, message='')
raise Exception('not_implemented')
class Login(relay.ClientIDMutation):
class Input:
token_input = graphene.String()
success = graphene.Boolean()
message = graphene.String()
@classmethod
def mutate_and_get_payload(cls, root, info, **kwargs):
hep_client = HepClient()
token = kwargs.get('token_input')
try:
user_data = hep_client.customer_me(token)
except HepClientUnauthorizedException:
return cls.return_login_message('invalid_credentials')
except HepClientException:
return cls.return_login_message(UNKNOWN_ERROR)
user, status_msg = handle_user_and_verify_products(user_data)
user.sync_with_hep_data(user_data)
if user and status_msg != EMAIL_NOT_VERIFIED:
login(info.context, user)
if status_msg:
return cls.return_login_message(status_msg)
return cls(success=True, message='success')
@classmethod
def return_login_message(cls, message):
if message == EMAIL_NOT_VERIFIED or message == UNKNOWN_ERROR or message == 'invalid_credentials':
raise Exception(message)
return cls(success=True, message=message)
class UserMutations:
login = Login.Field()
beta_login = BetaLogin.Field()