Refactor user creation code for sso and import

This commit is contained in:
Daniel Egger 2023-05-31 15:19:04 +02:00
parent 987842861c
commit 61ce0897cf
14 changed files with 160 additions and 36 deletions

View File

@ -1,25 +0,0 @@
from django.contrib.auth.base_user import BaseUserManager
from django.contrib.auth.models import AbstractUser
class UserManager(BaseUserManager):
def create_or_update_by_email(self, user_dict: dict) -> tuple[AbstractUser, bool]:
# create or sync user with OpenID Data
user, created = self.model.objects.get_or_create(
sso_id=user_dict["oid"],
defaults={
"email": user_dict["email"],
"username": user_dict["email"],
"first_name": user_dict["first_name"],
"last_name": user_dict["last_name"],
},
)
if not created:
user.email = user_dict["email"]
user.username = user_dict["email"]
user.first_name = user_dict["first_name"]
user.last_name = user_dict["last_name"]
user.save()
return user, created

View File

@ -27,8 +27,6 @@ class User(AbstractUser):
additional_json_data = JSONField(default=dict, blank=True)
language = models.CharField(max_length=2, choices=LANGUAGE_CHOICES, default="de")
objects = UserManager()
class SecurityRequestResponseLog(models.Model):
label = models.CharField(max_length=255, blank=True, default="")

View File

View File

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

View File

@ -0,0 +1,6 @@
from django.apps import AppConfig
class SsoConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "vbv_lernwelt.importer"

View File

View File

@ -0,0 +1,30 @@
from vbv_lernwelt.core.models import User
def create_or_update_user(
email: str, first_name: str = "", last_name: str = "", sso_id: str = None
):
user = None
if sso_id:
user_qs = User.objects.filter(sso_id=sso_id)
if user_qs.exists():
user = user_qs.first()
if not user:
user_qs = User.objects.filter(email=email)
if user_qs.exists():
user = user_qs.first()
if not user:
# create user
user = User(sso_id=sso_id, email=email, username=email)
user.email = email
user.sso_id = sso_id
user.username = email
user.first_name = first_name
user.last_name = last_name
user.set_unusable_password()
user.save()
return user

View File

@ -0,0 +1,96 @@
from django.test import TestCase
from vbv_lernwelt.core.models import User
from vbv_lernwelt.importer.services import create_or_update_user
class CreateOrUpdateUserTestCase(TestCase):
def test_create_user(self):
u = create_or_update_user(
email="daniel@example.com",
first_name="Daniel",
last_name="Egger",
sso_id="12229620-81ea-483d-8d96-6ba8be5f9eb7",
)
saved_user = User.objects.get(id=u.id)
self.assertEqual(saved_user.email, "daniel@example.com")
self.assertEqual(saved_user.username, "daniel@example.com")
self.assertEqual(saved_user.first_name, "Daniel")
self.assertEqual(saved_user.last_name, "Egger")
self.assertEqual(str(saved_user.sso_id), "12229620-81ea-483d-8d96-6ba8be5f9eb7")
def test_update_existing_user_with_oid(self):
User.objects.create(
email="daniel@example.com",
username="daniel@example.com",
first_name="Daniel",
last_name="Egger",
)
create_or_update_user(
email="daniel@example.com",
first_name="Daniel",
last_name="Egger",
sso_id="12229620-81ea-483d-8d96-6ba8be5f9eb7",
)
self.assertEqual(1, User.objects.count())
user = User.objects.first()
self.assertEqual(user.email, "daniel@example.com")
self.assertEqual(user.username, "daniel@example.com")
self.assertEqual(user.first_name, "Daniel")
self.assertEqual(user.last_name, "Egger")
self.assertEqual(str(user.sso_id), "12229620-81ea-483d-8d96-6ba8be5f9eb7")
def test_update_existing_user_with_new_last_name(self):
User.objects.create(
email="daniel@example.com",
username="daniel@example.com",
first_name="Daniel",
last_name="Egger",
sso_id="12229620-81ea-483d-8d96-6ba8be5f9eb7",
)
create_or_update_user(
email="daniel@example.com",
first_name="Daniel",
last_name="Marro",
sso_id="12229620-81ea-483d-8d96-6ba8be5f9eb7",
)
self.assertEqual(1, User.objects.count())
user = User.objects.first()
self.assertEqual(user.email, "daniel@example.com")
self.assertEqual(user.username, "daniel@example.com")
self.assertEqual(user.first_name, "Daniel")
self.assertEqual(user.last_name, "Marro")
self.assertEqual(str(user.sso_id), "12229620-81ea-483d-8d96-6ba8be5f9eb7")
def test_update_existing_user_with_new_email(self):
User.objects.create(
email="daniel@example.com",
username="daniel@example.com",
first_name="Daniel",
last_name="Egger",
sso_id="12229620-81ea-483d-8d96-6ba8be5f9eb7",
)
create_or_update_user(
email="danu@example.com",
first_name="Daniel",
last_name="Egger",
sso_id="12229620-81ea-483d-8d96-6ba8be5f9eb7",
)
self.assertEqual(1, User.objects.count())
user = User.objects.first()
self.assertEqual(user.email, "danu@example.com")
self.assertEqual(user.username, "danu@example.com")
self.assertEqual(user.first_name, "Daniel")
self.assertEqual(user.last_name, "Egger")
self.assertEqual(str(user.sso_id), "12229620-81ea-483d-8d96-6ba8be5f9eb7")

View File

View File

View File

@ -1,3 +0,0 @@
from django.test import TestCase
# Create your tests here.

View File

@ -0,0 +1,17 @@
{
"ver": "1.0",
"iss": "https://vbvtst.b2clogin.com/6967b19e-ec5c-4a46-bb16-01b0983da41b/v2.0/",
"sub": "f8c8e526-9fb1-4983-a5b7-4c069a83e317",
"aud": "8d32c131-0d60-4588-b01a-ae3435d44c23",
"exp": 1685538794,
"nonce": "mABq9hjYOMF34fCEi3VL",
"iat": 1685535194,
"auth_time": 1685535194,
"oid": "f8c8e526-9fb1-4983-a5b7-4c069a83e317",
"given_name": "Daniel",
"family_name": "Egger",
"name": "unknown",
"emails": ["daniel.egger+vbv-stage@gmail.com"],
"tfp": "B2C_1_SignUpAndSignIn_v3",
"nbf": 1685535194
}

View File

@ -1,10 +1,11 @@
import structlog as structlog
from authlib.integrations.base_client import OAuthError
from django.conf import settings
from django.contrib.auth import get_user_model, login as dj_login
from django.contrib.auth import login as dj_login
from django.shortcuts import redirect
from sentry_sdk import capture_exception
from vbv_lernwelt.importer.services import create_or_update_user
from vbv_lernwelt.sso.client import oauth
from vbv_lernwelt.sso.jwt import decode_jwt
@ -22,19 +23,20 @@ def login(request):
def authorize(request):
try:
logger.debug(request)
logger.debug(request, label="sso")
token = getattr(oauth, settings.OAUTH["client_name"]).authorize_access_token(
request
)
deocded_token = decode_jwt(token["id_token"])
decoded_token = decode_jwt(token["id_token"])
# logger.debug(label="sso", decoded_token=decoded_token)
except OAuthError as e:
logger.error(f"OAuth error: {e}")
logger.error(e, exc_info=True, label="sso")
if not settings.DEBUG:
capture_exception(e)
return redirect(f"/{OAUTH_FAIL_REDIRECT}?state=someerror") # to be defined
user_data = _user_data_from_token_data(deocded_token)
user, created = get_user_model().objects.create_or_update_by_email(user_data)
user_data = _user_data_from_token_data(decoded_token)
user = create_or_update_user(**user_data)
dj_login(request, user)
return redirect(f"/")