import time import uuid import structlog from ipware import get_client_ip from structlog.threadlocal import bind_threadlocal, clear_threadlocal from ua_parser import user_agent_parser from vbv_lernwelt.core.models import SecurityRequestResponseLog from vbv_lernwelt.importer.utils import try_parse_int logger = structlog.get_logger(__name__) class GetIpBehindReverseProxyMiddleWare: def __init__(self, get_response): self.get_response = get_response def __call__(self, request): client_ip, _is_routable = get_client_ip(request) request.META["REMOTE_ADDR"] = client_ip response = self.get_response(request) return response class SecurityRequestResponseLoggingMiddleware: def __init__(self, get_response): self.get_response = get_response def create_logging_threadlocalbind(self, request): request_username = request.user.username if hasattr(request, "user") else "" request_trace_id = uuid.uuid4().hex bind_threadlocal( request_method=request.method, request_full_path=request.get_full_path(), request_username=request_username, request_client_ip=request.META.get("REMOTE_ADDR"), request_trace_id=request_trace_id, ) return request_trace_id def create_database_security_request_response_log( self, request, response, elapsed_time, request_trace_id="" ): try: entry = SecurityRequestResponseLog() entry.label = getattr(request, "security_request_logging", "") entry.type = getattr(request, "type", "") entry.request_method = request.method entry.request_full_path = request.get_full_path()[0:250] entry.request_username = ( request.user.username if hasattr(request, "user") else "" ) entry.request_client_ip = request.META.get("REMOTE_ADDR") entry.response_status_code = response.status_code entry.request_trace_id = request_trace_id entry.request_elapse_time = elapsed_time entry.save() # save predefined fields additional_json_data = getattr(request, "log_additional_json_data", {}) entry.category = additional_json_data.pop("category", "")[0:255].strip() entry.action = additional_json_data.pop("action", "")[0:255].strip() entry.name = additional_json_data.pop("name", "")[0:255].strip() entry.ref = additional_json_data.pop("ref", "")[0:255].strip() entry.local_url = additional_json_data.pop("local_url", "")[0:255].strip() entry.session_key = additional_json_data.pop("session_key", "")[ 0:255 ].strip() _, value = try_parse_int(additional_json_data.pop("value", 0), 0) entry.value = value entry.save() user_agent = request.headers.get("User-Agent", "") entry.user_agent = user_agent[0:4096].strip() try: ua_parsed = user_agent_parser.Parse(user_agent) entry.user_agent_parsed = user_agent_parser.Parse(user_agent) entry.user_agent_os = ua_parsed.get("os").get("family") entry.user_agent_browser = ua_parsed.get("user_agent").get("family") # pylint: disable=broad-except except Exception as e: logger.warning( "error while parsing user agent", label="analytics", user_agent=user_agent, error=str(e), ) entry.additional_json_data = additional_json_data entry.save() # pylint: disable=broad-except except Exception: logger.warn("could not create db entry", label="security", exc_info=True) def log_request_response(self, request): clear_threadlocal() request_trace_id = self.create_logging_threadlocalbind(request) request.request_trace_id = request_trace_id logger.info( "url access initialized", label="security", ) start_time = time.time() response = self.get_response(request) elapsed_time = round(time.time() - start_time, 3) try: security_request_logging = getattr( request, "security_request_logging", None ) if security_request_logging: self.create_database_security_request_response_log( request, response, elapsed_time=elapsed_time, request_trace_id=request_trace_id, ) # pylint: disable=broad-except except Exception: logger.warn("could not create db entry", label="security", exc_info=True) logger.info( "url access finished", label="security", response_status_code=response.status_code, request_ratelimited=getattr(request, "limited", False), request_finished=True, elapsed_time=elapsed_time, ) clear_threadlocal() return response def __call__(self, request): return self.log_request_response(request)