Run black code formatter

This commit is contained in:
Daniel Egger 2022-02-03 16:53:33 +01:00
parent 8e9abdd7fb
commit 92f88f2d3d
7 changed files with 180 additions and 58 deletions

View File

@ -23,7 +23,7 @@ def _update_or_create_site_with_sequence(site_model, connection, domain, name):
# site is created.
# To avoid this, we need to manually update DB sequence and make sure it's
# greater than the maximum value.
max_id = site_model.objects.order_by('-id').first().id
max_id = site_model.objects.order_by("-id").first().id
with connection.cursor() as cursor:
cursor.execute("SELECT last_value from django_site_id_seq")
(current_id,) = cursor.fetchone()

View File

@ -2,8 +2,8 @@ from django.apps import AppConfig
class CoreConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'vbv_lernwelt.core'
default_auto_field = "django.db.models.BigAutoField"
name = "vbv_lernwelt.core"
def ready(self):
try:

View File

@ -10,7 +10,7 @@ logger = structlog.get_logger(__name__)
class AuthenticationRequiredMiddleware(MiddlewareMixin):
def process_view(self, request, callback, callback_args, callback_kwargs):
if getattr(callback, 'authentication_exempt', False):
if getattr(callback, "authentication_exempt", False):
return None
if not request.user.is_authenticated:
@ -30,7 +30,9 @@ def django_view_authentication_exempt(view_func):
class DjangoViewAuthenticationExemptDRFViewMixin:
@classmethod
def as_view(cls, **initkwargs):
view = super(DjangoViewAuthenticationExemptDRFViewMixin, cls).as_view(**initkwargs)
view = super(DjangoViewAuthenticationExemptDRFViewMixin, cls).as_view(
**initkwargs
)
view.authentication_exempt = True
return view
@ -38,6 +40,8 @@ class DjangoViewAuthenticationExemptDRFViewMixin:
class DjangoViewAuthenticationExemptDRFViewSetMixin:
@classmethod
def as_view(cls, actions=None, **initkwargs):
view = super(DjangoViewAuthenticationExemptDRFViewSetMixin, cls).as_view(actions=actions, **initkwargs)
view = super(DjangoViewAuthenticationExemptDRFViewSetMixin, cls).as_view(
actions=actions, **initkwargs
)
view.authentication_exempt = True
return view

View File

@ -13,55 +13,59 @@ class SecurityRequestResponseLoggingMiddleware:
self.get_response = get_response
def create_logging_threadlocalbind(self, request):
request_username = request.user.username if hasattr(request, 'user') else ''
request_username = request.user.username if hasattr(request, "user") else ""
bind_threadlocal(
request_method=request.method,
request_full_path=request.get_full_path(),
request_username=request_username,
request_client_ip=request.META.get('REMOTE_ADDR'),
request_client_ip=request.META.get("REMOTE_ADDR"),
request_trace_id=uuid.uuid4().hex,
)
def create_database_security_request_response_log(self, request, response):
try:
entry = SecurityRequestResponseLog()
entry.label = getattr(request, 'security_request_logging', '')
entry.label = getattr(request, "security_request_logging", "")
entry.request_method = request.method
entry.request_full_path = request.get_full_path()[:255]
entry.request_username = request.user.username if hasattr(request, 'user') else ''
entry.request_client_ip = request.META.get('REMOTE_ADDR')
entry.request_scn = getattr(request, 'scn', '')
entry.request_username = (
request.user.username if hasattr(request, "user") else ""
)
entry.request_client_ip = request.META.get("REMOTE_ADDR")
entry.request_scn = getattr(request, "scn", "")
entry.response_status_code = response.status_code
entry.additional_json_data = getattr(request, 'log_additional_json_data', {})
entry.additional_json_data = getattr(
request, "log_additional_json_data", {}
)
entry.save()
# pylint: disable=broad-except
except Exception:
logger.warn('could not create db entry', label='security', exc_info=True)
logger.warn("could not create db entry", label="security", exc_info=True)
def log_request_response(self, request):
clear_threadlocal()
self.create_logging_threadlocalbind(request)
logger.info(
'url access initialized',
label='security',
"url access initialized",
label="security",
)
response = self.get_response(request)
security_request_logging = getattr(request, 'security_request_logging', None)
security_request_logging = getattr(request, "security_request_logging", None)
if security_request_logging:
self.create_database_security_request_response_log(request, response)
logger.info(
'url access finished',
label='security',
"url access finished",
label="security",
response_status_code=response.status_code,
request_ratelimited=getattr(request, 'limited', False),
request_finished=True
request_ratelimited=getattr(request, "limited", False),
request_finished=True,
)
clear_threadlocal()

View File

@ -11,47 +11,158 @@ class Migration(migrations.Migration):
initial = True
dependencies = [
('auth', '0012_alter_user_first_name_max_length'),
("auth", "0012_alter_user_first_name_max_length"),
]
operations = [
migrations.CreateModel(
name='SecurityRequestResponseLog',
name="SecurityRequestResponseLog",
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('label', models.CharField(blank=True, default='', max_length=255)),
('request_method', models.CharField(blank=True, default='', max_length=255)),
('request_full_path', models.CharField(blank=True, default='', max_length=255)),
('request_username', models.CharField(blank=True, default='', max_length=255)),
('request_client_ip', models.CharField(blank=True, default='', max_length=255)),
('response_status_code', models.CharField(blank=True, default='', max_length=255)),
('additional_json_data', models.JSONField(blank=True, default=dict)),
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("label", models.CharField(blank=True, default="", max_length=255)),
(
"request_method",
models.CharField(blank=True, default="", max_length=255),
),
(
"request_full_path",
models.CharField(blank=True, default="", max_length=255),
),
(
"request_username",
models.CharField(blank=True, default="", max_length=255),
),
(
"request_client_ip",
models.CharField(blank=True, default="", max_length=255),
),
(
"response_status_code",
models.CharField(blank=True, default="", max_length=255),
),
("additional_json_data", models.JSONField(blank=True, default=dict)),
],
),
migrations.CreateModel(
name='User',
name="User",
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('password', models.CharField(max_length=128, verbose_name='password')),
('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')),
('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')),
('username', models.CharField(error_messages={'unique': 'A user with that username already exists.'}, help_text='Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.', max_length=150, unique=True, validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name='username')),
('first_name', models.CharField(blank=True, max_length=150, verbose_name='first name')),
('last_name', models.CharField(blank=True, max_length=150, verbose_name='last name')),
('email', models.EmailField(blank=True, max_length=254, verbose_name='email address')),
('is_staff', models.BooleanField(default=False, help_text='Designates whether the user can log into this admin site.', verbose_name='staff status')),
('is_active', models.BooleanField(default=True, help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')),
('date_joined', models.DateTimeField(default=django.utils.timezone.now, verbose_name='date joined')),
('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.Group', verbose_name='groups')),
('user_permissions', models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.Permission', verbose_name='user permissions')),
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("password", models.CharField(max_length=128, verbose_name="password")),
(
"last_login",
models.DateTimeField(
blank=True, null=True, verbose_name="last login"
),
),
(
"is_superuser",
models.BooleanField(
default=False,
help_text="Designates that this user has all permissions without explicitly assigning them.",
verbose_name="superuser status",
),
),
(
"username",
models.CharField(
error_messages={
"unique": "A user with that username already exists."
},
help_text="Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.",
max_length=150,
unique=True,
validators=[
django.contrib.auth.validators.UnicodeUsernameValidator()
],
verbose_name="username",
),
),
(
"first_name",
models.CharField(
blank=True, max_length=150, verbose_name="first name"
),
),
(
"last_name",
models.CharField(
blank=True, max_length=150, verbose_name="last name"
),
),
(
"email",
models.EmailField(
blank=True, max_length=254, verbose_name="email address"
),
),
(
"is_staff",
models.BooleanField(
default=False,
help_text="Designates whether the user can log into this admin site.",
verbose_name="staff status",
),
),
(
"is_active",
models.BooleanField(
default=True,
help_text="Designates whether this user should be treated as active. Unselect this instead of deleting accounts.",
verbose_name="active",
),
),
(
"date_joined",
models.DateTimeField(
default=django.utils.timezone.now, verbose_name="date joined"
),
),
(
"groups",
models.ManyToManyField(
blank=True,
help_text="The groups this user belongs to. A user will get all permissions granted to each of their groups.",
related_name="user_set",
related_query_name="user",
to="auth.Group",
verbose_name="groups",
),
),
(
"user_permissions",
models.ManyToManyField(
blank=True,
help_text="Specific permissions for this user.",
related_name="user_set",
related_query_name="user",
to="auth.Permission",
verbose_name="user permissions",
),
),
],
options={
'verbose_name': 'user',
'verbose_name_plural': 'users',
'abstract': False,
"verbose_name": "user",
"verbose_name_plural": "users",
"abstract": False,
},
managers=[
('objects', django.contrib.auth.models.UserManager()),
("objects", django.contrib.auth.models.UserManager()),
],
),
]

View File

@ -4,20 +4,22 @@ from vbv_lernwelt.core.models import User
def create_iterativ_users(apps, schema_editor):
for username in ['info@iterativ.ch', ]:
for username in [
"info@iterativ.ch",
]:
user = User.objects.create(
username=username,
email=username,
is_superuser=True,
is_staff=True,
)
user.set_password('ACEEs0DCmNaPxdoNV8vhccuCTRl9b')
user.set_password("ACEEs0DCmNaPxdoNV8vhccuCTRl9b")
user.save()
class Migration(migrations.Migration):
dependencies = [
('core', '0001_initial'),
("core", "0001_initial"),
]
operations = [

View File

@ -8,17 +8,18 @@ class User(AbstractUser):
Default custom user model for VBV Lernwelt.
If adding fields that need to be filled at user signup,
"""
pass
class SecurityRequestResponseLog(models.Model):
label = models.CharField(max_length=255, blank=True, default='')
label = models.CharField(max_length=255, blank=True, default="")
request_method = models.CharField(max_length=255, blank=True, default='')
request_full_path = models.CharField(max_length=255, blank=True, default='')
request_username = models.CharField(max_length=255, blank=True, default='')
request_client_ip = models.CharField(max_length=255, blank=True, default='')
request_method = models.CharField(max_length=255, blank=True, default="")
request_full_path = models.CharField(max_length=255, blank=True, default="")
request_username = models.CharField(max_length=255, blank=True, default="")
request_client_ip = models.CharField(max_length=255, blank=True, default="")
response_status_code = models.CharField(max_length=255, blank=True, default='')
response_status_code = models.CharField(max_length=255, blank=True, default="")
additional_json_data = JSONField(default=dict, blank=True)