121 lines
4.6 KiB
Python
121 lines
4.6 KiB
Python
"""
|
|
AuditLogMiddleware
|
|
──────────────────
|
|
Перехватывает мутирующие HTTP-запросы (POST / PUT / PATCH / DELETE),
|
|
пишет запись в таблицу action_logs после того как ответ уже отправлен клиенту.
|
|
Таким образом, запись в БД не блокирует основной поток ответа.
|
|
|
|
Что логируется:
|
|
• Метод + путь → action ("POST /api/tickets/book")
|
|
• IP-адрес → ip_address (X-Forwarded-For → client.host)
|
|
• user_id из JWT-токена → user_id (None для анонимных запросов)
|
|
• HTTP-статус ответа → details["status_code"]
|
|
|
|
Что пропускается:
|
|
• GET, HEAD, OPTIONS
|
|
• /docs, /redoc, /openapi.json, /metrics, /api/health
|
|
"""
|
|
|
|
import logging
from typing import Any

import jwt
from starlette.background import BackgroundTask
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response

from core.security import SECRET_KEY, ALGORITHM
from database.models import ActionLog
from database.session import async_session
|
|
|
|
# Module logger, registered under the fixed name "audit".
log = logging.getLogger("audit")

# URL path prefixes that are excluded from auditing
# (interactive docs, OpenAPI schema, metrics, health check).
_SKIP_PREFIXES: tuple[str, ...] = (
    "/docs", "/redoc", "/openapi.json", "/metrics", "/api/health",
)

# HTTP methods that can change state; requests with any other method are
# not audited.
_AUDIT_METHODS: frozenset[str] = frozenset(("POST", "PUT", "PATCH", "DELETE"))
|
|
|
|
|
|
# ─── Helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
def _extract_user_id(request: Request) -> int | None:
    """Return the JWT subject of the request's Bearer token as an int.

    The ``Authorization`` header is parsed as ``<scheme> <credentials>``.
    The scheme is compared case-insensitively, so ``Bearer``, ``bearer`` and
    ``BEARER`` are all accepted.

    Args:
        request: Incoming request whose headers are inspected.

    Returns:
        The integer value of the token's ``sub`` claim, or ``None`` when the
        header is absent, uses another scheme, carries no token, the token
        fails to decode/verify, the claim is missing, or the claim cannot be
        converted to ``int``.
    """
    auth_header = request.headers.get("Authorization", "")

    # Split on the first space only: everything after the scheme is the token.
    scheme, _, credentials = auth_header.strip().partition(" ")
    token = credentials.strip()

    # HTTP auth scheme names are case-insensitive; an empty token can never
    # decode, so skip the decoder entirely in that case.
    if scheme.lower() != "bearer" or not token:
        return None

    try:
        payload: dict[str, Any] = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        sub = payload.get("sub")
        return int(sub) if sub is not None else None
    except (jwt.PyJWTError, ValueError, TypeError):
        # Invalid/expired token or a non-numeric subject: treat as anonymous.
        return None
|
|
|
|
|
|
def _extract_ip(request: Request) -> str | None:
    """Return the best available client address for the request.

    The ``X-Forwarded-For`` header is preferred; it may hold a comma-separated
    list, of which the first non-empty entry is used. If the header is absent
    or contains no non-empty entry, the direct peer address from
    ``request.client`` is used instead.

    Args:
        request: Incoming request to inspect.

    Returns:
        The address as a string, or ``None`` when neither source yields one.
    """
    # NOTE(review): the header value is taken as-is and is not validated as an
    # IP address; presumably a trusted reverse proxy sets it — confirm.
    forwarded_for = request.headers.get("X-Forwarded-For")
    if forwarded_for:
        for entry in forwarded_for.split(","):
            candidate = entry.strip()
            if candidate:
                return candidate
        # Header was present but held only blanks/commas: fall through to the
        # peer address rather than returning an empty string.

    if request.client:
        return request.client.host
    return None
|
|
|
|
|
|
# ─── Middleware ───────────────────────────────────────────────────────────────
|
|
|
|
class AuditLogMiddleware(BaseHTTPMiddleware):
    """Write an ``ActionLog`` row for every state-changing HTTP request.

    Only methods in ``_AUDIT_METHODS`` are audited, and paths starting with
    any of ``_SKIP_PREFIXES`` are ignored. Each record stores the user id
    taken from the Bearer JWT (``None`` for anonymous requests), the action
    string ``"<METHOD> <path>"``, the client IP address and the response
    status code.

    The database write is attached to the response as a background task, so
    it runs after the response has been sent and does not add latency to the
    request. Failures while writing are logged and never reach the client.
    """

    async def dispatch(self, request: Request, call_next: Any) -> Response:
        """Run the downstream app, then schedule the audit write.

        Args:
            request: Incoming request.
            call_next: Callable that forwards the request down the stack and
                returns its response.

        Returns:
            The downstream response, with the audit write attached as a
            background task when the request qualifies for auditing.
        """
        # The status code is part of the record, so the response is needed
        # before anything can be logged.
        response: Response = await call_next(request)

        method = request.method.upper()
        path = request.url.path

        # str.startswith accepts a tuple, so one call covers every prefix.
        if method not in _AUDIT_METHODS or path.startswith(_SKIP_PREFIXES):
            return response

        # Cheap metadata extraction; no database access happens here.
        user_id = _extract_user_id(request)
        ip_address = _extract_ip(request)
        status_code = response.status_code

        # Keep any background work already attached to the response.
        pending = getattr(response, "background", None)

        async def _after_response() -> None:
            try:
                if pending is not None:
                    await pending()
            finally:
                # Record the request even if the earlier task failed.
                await self._write_audit_record(
                    method=method,
                    path=path,
                    user_id=user_id,
                    ip_address=ip_address,
                    status_code=status_code,
                )

        # NOTE(review): relies on the response object returned by call_next
        # running its ``background`` task after the body is sent — confirm
        # for the pinned Starlette version.
        response.background = BackgroundTask(_after_response)
        return response

    @staticmethod
    async def _write_audit_record(
        *,
        method: str,
        path: str,
        user_id: int | None,
        ip_address: str | None,
        status_code: int,
    ) -> None:
        """Persist one audit row; log and swallow any failure."""
        try:
            async with async_session() as session:
                session.add(
                    ActionLog(
                        user_id=user_id,
                        action=f"{method} {path}",
                        ip_address=ip_address,
                        details={"status_code": status_code},
                    )
                )
                await session.commit()
        except Exception:
            # Audit failure must never break the API.
            log.exception("Failed to write audit log for %s %s", method, path)
|