# vbv/server/vbv_lernwelt/core/middleware/security.py
#
# 108 lines
# 3.5 KiB
# Python

import uuid
import structlog
from django.core.exceptions import PermissionDenied
from django.http import Http404
from ipware import get_client_ip
from structlog.threadlocal import bind_threadlocal, clear_threadlocal
from vbv_lernwelt.core.models import SecurityRequestResponseLog
# Module-level structlog logger; SecurityRequestResponseLoggingMiddleware below
# emits all of its "security"-labelled events through it.
logger = structlog.get_logger(__name__)
class GetIpBehindReverseProxyMiddleWare:
    """Overwrite ``REMOTE_ADDR`` with the client address resolved by ipware.

    Every request is passed through ``ipware.get_client_ip`` and the address it
    reports is written to ``request.META["REMOTE_ADDR"]`` before the rest of
    the middleware chain runs, so later code reading ``REMOTE_ADDR`` sees that
    value.
    """

    def __init__(self, get_response):
        """Remember the next callable in the middleware chain."""
        self.get_response = get_response

    def __call__(self, request):
        """Patch ``REMOTE_ADDR`` on *request* and hand it to the next handler.

        Returns whatever the next handler returns, unmodified.
        """
        # get_client_ip yields an (address, is_routable) pair; only the
        # address is used here.
        address, _routable = get_client_ip(request)
        request.META["REMOTE_ADDR"] = address
        return self.get_response(request)
class SecurityRequestResponseLoggingMiddleware:
    """Emit structured "security" log events around every request.

    For each request the middleware:

    * binds request metadata (method, path, username, client IP and a fresh
      trace id) to structlog's thread-local context,
    * logs ``"url access initialized"`` before and ``"url access finished"``
      after the rest of the chain has produced a response,
    * additionally stores a ``SecurityRequestResponseLog`` row when the request
      carries a truthy ``security_request_logging`` attribute.
    """

    def __init__(self, get_response):
        """Remember the next callable in the middleware chain."""
        self.get_response = get_response

    @staticmethod
    def _request_username(request):
        """Return ``request.user.username``, or ``""`` if *request* has no ``user``."""
        return request.user.username if hasattr(request, "user") else ""

    def create_logging_threadlocalbind(self, request):
        """Bind the request's metadata to structlog's thread-local context.

        Every log event emitted on this thread afterwards carries these keys
        until ``clear_threadlocal()`` is called.
        """
        request_username = self._request_username(request)
        bind_threadlocal(
            request_method=request.method,
            request_full_path=request.get_full_path(),
            request_username=request_username,
            request_client_ip=request.META.get("REMOTE_ADDR"),
            # Random id that lets all events of one request be correlated.
            request_trace_id=uuid.uuid4().hex,
        )

    def create_database_security_request_response_log(self, request, response):
        """Persist one ``SecurityRequestResponseLog`` row for this exchange.

        This is best effort: any failure while building or saving the row is
        logged as a warning and swallowed so that it never breaks the response.
        """
        try:
            entry = SecurityRequestResponseLog()
            entry.label = getattr(request, "security_request_logging", "")
            entry.request_method = request.method
            # Only the first 255 characters of the path are stored.
            entry.request_full_path = request.get_full_path()[:255]
            entry.request_username = self._request_username(request)
            entry.request_client_ip = request.META.get("REMOTE_ADDR")
            entry.request_scn = getattr(request, "scn", "")
            entry.response_status_code = response.status_code
            entry.additional_json_data = getattr(
                request, "log_additional_json_data", {}
            )
            entry.save()
        # pylint: disable=broad-except
        except Exception:
            # ``warning`` instead of the deprecated ``warn`` alias.
            logger.warning(
                "could not create db entry", label="security", exc_info=True
            )

    def log_request_response(self, request):
        """Run the rest of the chain for *request* with security logging.

        Returns the response produced by ``self.get_response``. The
        thread-local logging context is cleared before returning and also when
        an exception propagates, so request data never stays bound to the
        worker thread.
        """
        # Drop anything that is still bound on this thread.
        clear_threadlocal()
        try:
            self.create_logging_threadlocalbind(request)
            logger.info(
                "url access initialized",
                label="security",
            )

            response = self.get_response(request)

            # Opt-in flag: the database row is only written when the request
            # carries a truthy ``security_request_logging`` attribute.
            if getattr(request, "security_request_logging", None):
                self.create_database_security_request_response_log(
                    request, response
                )

            logger.info(
                "url access finished",
                label="security",
                response_status_code=response.status_code,
                request_ratelimited=getattr(request, "limited", False),
                request_finished=True,
            )
            return response
        finally:
            clear_threadlocal()

    def __call__(self, request):
        """Middleware entry point; delegates to ``log_request_response``."""
        return self.log_request_response(request)

    def process_exception(self, request, exception):
        """Log unexpected exceptions as ``"request_failed"`` with status 500.

        ``Http404`` and ``PermissionDenied`` are skipped. Always returns
        ``None``, so the exception is never turned into a response here.
        """
        if isinstance(exception, (Http404, PermissionDenied)):
            # We don't log an exception here, and we don't set that we handled
            # an error as we want the standard `request_finished` log message
            # to be emitted.
            return

        # NOTE(review): nothing in this class reads or resets this flag, and it
        # lives on the middleware instance rather than on the request. Kept for
        # backward compatibility; confirm whether it is still needed.
        self._raised_exception = True
        logger.exception(
            "request_failed",
            label="security",
            response_status_code=500,
        )