288 lines
11 KiB
Python
288 lines
11 KiB
Python
import time
|
|
from datetime import timedelta
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import requests
|
|
from authlib.integrations.django_client import DjangoOAuth2App
|
|
from django.contrib.auth.models import AnonymousUser
|
|
from django.contrib.sessions.middleware import SessionMiddleware
|
|
from django.test import TestCase, RequestFactory
|
|
from django.utils import timezone
|
|
|
|
from core.factories import UserFactory
|
|
from oauth.hep_client import HepClient
|
|
from oauth.user_signup_login_handler import (
|
|
EMAIL_NOT_VERIFIED,
|
|
NO_VALID_LICENSE,
|
|
UNKNOWN_ERROR,
|
|
)
|
|
from oauth.views import authorize, OAUTH_REDIRECT
|
|
from users.tests.mock_hep_data_factory import (
|
|
MockResponse,
|
|
ME_DATA,
|
|
VALID_STUDENT_ORDERS,
|
|
VALID_TEACHERS_ORDERS,
|
|
)
|
|
from users.factories import LicenseFactory
|
|
from users.models import Role, User, SchoolClass, License, UserData
|
|
|
|
# Expiry of the fake OAuth token: one hour after this module is imported.
IN_A_HOUR = timezone.now() + timedelta(hours=1)
# Use datetime.timestamp() rather than time.mktime(IN_A_HOUR.timetuple()):
# mktime() interprets the struct_time fields as *local* time, so for a
# timezone-aware datetime the resulting epoch value is shifted by the host's
# UTC offset (and the token may already look expired). timestamp() honours
# tzinfo on aware datetimes and treats naive ones as local time, so it is
# correct in both configurations.
IN_A_HOUR_UNIX = IN_A_HOUR.timestamp()

# Return value used for the patched DjangoOAuth2App.authorize_access_token.
TOKEN = {
    "token_type": "hep",
    "access_token": "123456",
    "refresh_token": "abcdqwer",
    "expires_at": IN_A_HOUR_UNIX,
}

# Variant of ME_DATA with a different e-mail and id, used as the patched
# HepClient.user_details return value in the "new user" scenarios.
# NOTE(review): presumably ME_DATA is a dict, in which case .copy() is shallow;
# only top-level keys are overridden here, so ME_DATA itself is not modified.
NEW_ME_DATA = ME_DATA.copy()
NEW_ME_DATA["email"] = "stiller@has.ch"
NEW_ME_DATA["id"] = 99
|
|
|
|
|
|
class LoginTests(TestCase):
    """Tests for the OAuth ``authorize`` view with the HEP client patched.

    Every test builds a GET request for ``/api/oauth/authorize?code=1234``,
    passes it to ``authorize`` and checks the redirect target (and, where
    relevant, the users, roles, licenses and school classes in the database).

    NOTE(review): several tests end with
    ``assertTrue(self.user.is_authenticated)``. That inspects the fixture
    object created in ``setUp``, not the user attached to the request, and is
    presumably always true for a model instance - confirm whether a stronger
    check was intended.
    """

    def setUp(self):
        """Create the fixture user, the default roles and a request factory."""
        # NOTE(review): the e-mail is seeded with ME_DATA["id"], not
        # ME_DATA["email"]; left unchanged because the tests below may rely on
        # it - confirm this is intentional.
        self.user = UserFactory(username=ME_DATA["id"], email=ME_DATA["id"])
        Role.objects.create_default_roles()
        self.teacher_role = Role.objects.get_default_teacher_role()

        self.factory = RequestFactory()

    def _login(self, url):
        """Call the ``authorize`` view for ``url`` and return its response.

        The request is a GET built by ``RequestFactory``; a session is
        attached via ``SessionMiddleware`` and saved, and ``request.user`` is
        set to ``AnonymousUser`` before the view is invoked.
        """
        get_response = MagicMock()
        request = self.factory.get(url)
        middleware = SessionMiddleware(get_response)
        middleware.process_request(request)
        request.session.save()
        request.user = AnonymousUser()

        return authorize(request)

    def _assert_no_user_data(self, user):
        """Fail the test if a ``UserData`` row exists for ``user``."""
        # The previous try / self.fail() / bare-except construct swallowed the
        # AssertionError raised by self.fail(), so the check could never fail.
        self.assertFalse(
            UserData.objects.filter(user=user).exists(),
            "Userdata should not exist",
        )

    @patch.object(DjangoOAuth2App, "authorize_access_token", return_value=TOKEN)
    @patch.object(HepClient, "user_details", return_value=ME_DATA)
    def test_user_data_is_synced_on_login(self, user_mock, authorize_mock):
        """An existing user's username and e-mail are updated from ME_DATA."""
        old_mail = "aschi@iterativ.ch"

        self.user.hep_id = ME_DATA["id"]
        self.user.email = old_mail
        self.user.username = old_mail
        self.user.save()

        now = timezone.now()
        expiry_date = now + timedelta(365)
        LicenseFactory(
            expire_date=expiry_date, licensee=self.user, for_role=self.teacher_role
        ).save()

        response = self._login("/api/oauth/authorize?code=1234")
        user = User.objects.get(hep_id=self.user.hep_id)

        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, f"/{OAUTH_REDIRECT}?state=success")
        self.assertEqual(user.username, ME_DATA["email"])
        self.assertEqual(user.email, ME_DATA["email"])
        self.assertTrue(self.user.is_authenticated)

    @patch.object(HepClient, "fetch_eorders", return_value=VALID_TEACHERS_ORDERS)
    @patch.object(DjangoOAuth2App, "authorize_access_token", return_value=TOKEN)
    @patch.object(HepClient, "user_details", return_value=ME_DATA)
    def test_teacher_can_login_with_valid_license(
        self, user_mock, authorize_mock, orders_mock
    ):
        """Teacher orders yield a teacher role, a teacher license and a class."""
        response = self._login("/api/oauth/authorize?code=1234")

        user = User.objects.get(email=ME_DATA["email"])

        user_role_key = user.user_roles.get(user=user).role.key
        self.assertEqual(user_role_key, Role.objects.TEACHER_KEY)

        user_license = License.objects.get(licensee=user)
        self.assertEqual(user_license.for_role.key, Role.objects.TEACHER_KEY)

        school_class = SchoolClass.objects.get(users__in=[user])
        self.assertIsNotNone(school_class)

        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, f"/{OAUTH_REDIRECT}?state=success")
        self.assertTrue(self.user.is_authenticated)

        self._assert_no_user_data(user)

    @patch.object(HepClient, "fetch_eorders", return_value=VALID_STUDENT_ORDERS)
    @patch.object(DjangoOAuth2App, "authorize_access_token", return_value=TOKEN)
    @patch.object(HepClient, "user_details", return_value=ME_DATA)
    def test_student_can_login_with_valid_license(
        self, user_mock, authorize_mock, orders_mock
    ):
        """Student orders yield a student role and a student license."""
        response = self._login("/api/oauth/authorize?code=1234")
        user = User.objects.get(email=ME_DATA["email"])

        user_role_key = user.user_roles.get(user=user).role.key
        self.assertEqual(user_role_key, Role.objects.STUDENT_KEY)

        user_license = License.objects.get(licensee=user)
        self.assertEqual(user_license.for_role.key, Role.objects.STUDENT_KEY)

        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, f"/{OAUTH_REDIRECT}?state=success")
        self.assertTrue(self.user.is_authenticated)

    @patch.object(HepClient, "is_email_verified", return_value=False)
    @patch.object(DjangoOAuth2App, "authorize_access_token", return_value=TOKEN)
    @patch.object(HepClient, "user_details", return_value=ME_DATA)
    def test_user_with_unconfirmed_email_cannot_login(
        self, user_mock, authorize_mock, verified_mock
    ):
        """An unverified e-mail redirects with the EMAIL_NOT_VERIFIED state."""
        response = self._login("/api/oauth/authorize?code=1234")

        # Acts as an existence check: get() raises if no user (or more than
        # one) with this e-mail is present after the login attempt.
        User.objects.get(email=ME_DATA["email"])
        self.assertEqual(302, response.status_code)
        self.assertEqual(f"/{OAUTH_REDIRECT}?state={EMAIL_NOT_VERIFIED}", response.url)

    # Decorators are applied bottom-up, so the injected mocks are, in order:
    # user_details, authorize_access_token, fetch_eorders. The parameter names
    # below do not reflect that; they are kept as in the original signature.
    @patch.object(HepClient, "fetch_eorders", return_value=[])
    @patch.object(DjangoOAuth2App, "authorize_access_token", return_value=TOKEN)
    @patch.object(HepClient, "user_details", return_value=ME_DATA)
    def test_user_can_login_without_license(self, me_mock, product_mock, verified_mock):
        """With no e-orders the redirect carries the NO_VALID_LICENSE state."""
        response = self._login("/api/oauth/authorize?code=1234")

        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, f"/{OAUTH_REDIRECT}?state={NO_VALID_LICENSE}")
        self.assertTrue(self.user.is_authenticated)

    # Injected mocks, in order: user_details, authorize_access_token,
    # fetch_eorders (parameter names kept as in the original signature).
    @patch.object(HepClient, "fetch_eorders", return_value=[])
    @patch.object(DjangoOAuth2App, "authorize_access_token", return_value=TOKEN)
    @patch.object(HepClient, "user_details", return_value=ME_DATA)
    def test_user_can_login_local_license_invalid(
        self, me_mock, product_mock, verified_mock
    ):
        """A local license that expired yesterday still gives NO_VALID_LICENSE."""
        now = timezone.now()
        expiry_date = now - timedelta(1)
        LicenseFactory(
            expire_date=expiry_date, licensee=self.user, for_role=self.teacher_role
        ).save()

        response = self._login("/api/oauth/authorize?code=1234")

        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, f"/{OAUTH_REDIRECT}?state={NO_VALID_LICENSE}")
        self.assertTrue(self.user.is_authenticated)

    # The patched callable is requests.get, although the parameter is named
    # post_mock (name kept as in the original signature).
    @patch.object(requests, "get", return_value=MockResponse(500))
    def test_user_gets_notified_if_server_error(self, post_mock):
        """When requests.get returns MockResponse(500) the state is UNKNOWN_ERROR."""
        response = self._login("/api/oauth/authorize?code=1234")

        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, f"/{OAUTH_REDIRECT}?state={UNKNOWN_ERROR}")

    # NOTE(review): this method has no "test" prefix, so the default unittest
    # loader does not collect it. The name is kept unchanged because enabling
    # it would start running assertions that have never been exercised -
    # confirm whether it should be renamed to test_...
    @patch.object(HepClient, "fetch_eorders", return_value=VALID_TEACHERS_ORDERS)
    @patch.object(DjangoOAuth2App, "authorize_access_token", return_value=TOKEN)
    @patch.object(HepClient, "user_details", return_value=NEW_ME_DATA)
    def new_user_is_created_in_system_after_login_with_valid_license(
        self, user_mock, authorize_mock, orders_mock
    ):
        """A user described by NEW_ME_DATA is created and set up as a teacher."""
        response = self._login("/api/oauth/authorize?code=1234")

        try:
            user = User.objects.get(email=NEW_ME_DATA["email"])
        except User.DoesNotExist:
            self.fail(
                "LoginTests.new_user_is_created_in_system_after_login_with_valid_license: User was not created"
            )

        user_role_key = user.user_roles.get(user=user).role.key
        self.assertEqual(user_role_key, Role.objects.TEACHER_KEY)

        user_license = License.objects.get(licensee=user)
        self.assertEqual(user_license.for_role.key, Role.objects.TEACHER_KEY)

        school_class = SchoolClass.objects.get(users__in=[user])
        self.assertIsNotNone(school_class)

        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, f"/{OAUTH_REDIRECT}?state=success")
        self.assertTrue(self.user.is_authenticated)

        self._assert_no_user_data(user)

    # NOTE(review): not collected by the default unittest loader (no "test"
    # prefix); name kept unchanged. With fetch_eorders returning [], the
    # teacher-role and license assertions below look unlikely to hold -
    # verify before enabling.
    @patch.object(HepClient, "fetch_eorders", return_value=[])
    @patch.object(DjangoOAuth2App, "authorize_access_token", return_value=TOKEN)
    @patch.object(HepClient, "user_details", return_value=NEW_ME_DATA)
    def new_user_is_created_in_system_after_login(
        self, user_mock, authorize_mock, orders_mock
    ):
        """A user described by NEW_ME_DATA is created even without e-orders."""
        response = self._login("/api/oauth/authorize?code=1234")

        try:
            user = User.objects.get(email=NEW_ME_DATA["email"])
        except User.DoesNotExist:
            self.fail(
                "LoginTests.new_user_is_created_in_system_after_login: User was not created"
            )

        user_role_key = user.user_roles.get(user=user).role.key
        self.assertEqual(user_role_key, Role.objects.TEACHER_KEY)

        user_license = License.objects.get(licensee=user)
        self.assertEqual(user_license.for_role.key, Role.objects.TEACHER_KEY)

        school_class = SchoolClass.objects.get(users__in=[user])
        self.assertIsNotNone(school_class)

        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, f"/{OAUTH_REDIRECT}?state={NO_VALID_LICENSE}")
        self.assertTrue(self.user.is_authenticated)

        self._assert_no_user_data(user)

    @patch.object(HepClient, "fetch_eorders", return_value=VALID_TEACHERS_ORDERS)
    @patch.object(DjangoOAuth2App, "authorize_access_token", return_value=TOKEN)
    @patch.object(HepClient, "user_details", return_value=ME_DATA)
    def test_update_of_duration_in_code_MS768(
        self, user_mock, authorize_mock, orders_mock
    ):
        """Repeated logins must not create one new license per login.

        https://iterativ.atlassian.net/browse/MS-768
        """
        # First login, so that a user and a license exist to work with.
        self._login("/api/oauth/authorize?code=1234")

        user = User.objects.get(email=ME_DATA["email"])

        # Push the newest license's expiry 700 days further out, so that it no
        # longer equals "now + duration". This simulates the duration having
        # been changed in the code after the license was created.
        current_license: License = (
            License.objects.filter(licensee=user).order_by("-expire_date").first()
        )
        # first() returns None on an empty queryset; fail with a clear message
        # instead of an AttributeError on the next line.
        self.assertIsNotNone(
            current_license, "Expected a license to exist after the first login"
        )
        current_license.expire_date = current_license.expire_date + timedelta(days=700)
        current_license.save()

        for _ in range(5):
            response = self._login("/api/oauth/authorize?code=1234")

        # Check that only one new license was created and not one for each login
        self.assertEqual(License.objects.all().count(), 2)

        self.assertEqual(response.url, f"/{OAUTH_REDIRECT}?state=success")
        self.assertTrue(self.user.is_authenticated)
|