This commit is contained in:
120
backend/core/middleware.py
Normal file
120
backend/core/middleware.py
Normal file
@@ -0,0 +1,120 @@
|
||||
"""
|
||||
AuditLogMiddleware
|
||||
──────────────────
|
||||
Перехватывает мутирующие HTTP-запросы (POST / PUT / PATCH / DELETE),
|
||||
пишет запись в таблицу action_logs после того как ответ уже отправлен клиенту.
|
||||
Таким образом, запись в БД не блокирует основной поток ответа.
|
||||
|
||||
Что логируется:
|
||||
• Метод + путь → action ("POST /api/tickets/book")
|
||||
• IP-адрес → ip_address (X-Forwarded-For → client.host)
|
||||
• user_id из JWT-токена → user_id (None для анонимных запросов)
|
||||
• HTTP-статус ответа → details["status_code"]
|
||||
|
||||
Что пропускается:
|
||||
• GET, HEAD, OPTIONS
|
||||
• /docs, /redoc, /openapi.json, /metrics, /api/health
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import jwt
|
||||
from starlette.middleware.base import BaseHTTPMiddleware
|
||||
from starlette.requests import Request
|
||||
from starlette.responses import Response
|
||||
|
||||
from core.security import SECRET_KEY, ALGORITHM
|
||||
from database.models import ActionLog
|
||||
from database.session import async_session
|
||||
|
||||
log = logging.getLogger("audit")
|
||||
|
||||
# ─── Paths that are never interesting to audit ────────────────────────────────
|
||||
|
||||
_SKIP_PREFIXES: tuple[str, ...] = (
|
||||
"/docs",
|
||||
"/redoc",
|
||||
"/openapi.json",
|
||||
"/metrics",
|
||||
"/api/health",
|
||||
)
|
||||
|
||||
# Only log requests that can change state
|
||||
_AUDIT_METHODS: frozenset[str] = frozenset({"POST", "PUT", "PATCH", "DELETE"})
|
||||
|
||||
|
||||
# ─── Helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def _extract_user_id(request: Request) -> int | None:
|
||||
"""
|
||||
Try to decode the Bearer JWT and return the subject as int.
|
||||
Returns None if the header is absent, malformed, or expired.
|
||||
"""
|
||||
auth_header = request.headers.get("Authorization", "")
|
||||
if not auth_header.startswith("Bearer "):
|
||||
return None
|
||||
token = auth_header.removeprefix("Bearer ").strip()
|
||||
try:
|
||||
payload: dict[str, Any] = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
||||
sub = payload.get("sub")
|
||||
return int(sub) if sub is not None else None
|
||||
except (jwt.PyJWTError, ValueError, TypeError):
|
||||
return None
|
||||
|
||||
|
||||
def _extract_ip(request: Request) -> str | None:
|
||||
"""
|
||||
Prefer X-Forwarded-For (set by Traefik / Nginx reverse proxy).
|
||||
Fall back to the direct TCP peer address.
|
||||
"""
|
||||
forwarded_for = request.headers.get("X-Forwarded-For")
|
||||
if forwarded_for:
|
||||
# X-Forwarded-For can be a comma-separated list; the first entry is the client IP
|
||||
return forwarded_for.split(",")[0].strip()
|
||||
if request.client:
|
||||
return request.client.host
|
||||
return None
|
||||
|
||||
|
||||
# ─── Middleware ───────────────────────────────────────────────────────────────
|
||||
|
||||
class AuditLogMiddleware(BaseHTTPMiddleware):
|
||||
async def dispatch(self, request: Request, call_next: Any) -> Response:
|
||||
# 1. Let the request go through and collect the response first
|
||||
response: Response = await call_next(request)
|
||||
|
||||
# 2. Decide whether to log
|
||||
method = request.method.upper()
|
||||
path = request.url.path
|
||||
|
||||
if method not in _AUDIT_METHODS:
|
||||
return response
|
||||
|
||||
if any(path.startswith(prefix) for prefix in _SKIP_PREFIXES):
|
||||
return response
|
||||
|
||||
# 3. Extract metadata (cheap, no DB involved)
|
||||
user_id = _extract_user_id(request)
|
||||
ip_address = _extract_ip(request)
|
||||
action = f"{method} {path}"
|
||||
details: dict[str, Any] = {"status_code": response.status_code}
|
||||
|
||||
# 4. Write the audit record asynchronously — after the response is ready,
|
||||
# so latency is not affected. Errors here must not propagate to the client.
|
||||
try:
|
||||
async with async_session() as session:
|
||||
session.add(
|
||||
ActionLog(
|
||||
user_id=user_id,
|
||||
action=action,
|
||||
ip_address=ip_address,
|
||||
details=details,
|
||||
)
|
||||
)
|
||||
await session.commit()
|
||||
except Exception:
|
||||
# Audit failure must never break the API
|
||||
log.exception("Failed to write audit log for %s %s", method, path)
|
||||
|
||||
return response
|
||||
@@ -5,7 +5,7 @@ import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
QUEUE_NAME = "pdf_generation_queue"
|
||||
QUEUE_NAME = "ticket_events"
|
||||
|
||||
async def publish_ticket_task(ticket_id: int):
|
||||
"""Отправляет ID билета в RabbitMQ."""
|
||||
|
||||
@@ -2,6 +2,7 @@ import enum
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
from sqlalchemy import String, Integer, ForeignKey, DateTime, Enum, Boolean
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
|
||||
|
||||
class Base(DeclarativeBase):
|
||||
@@ -75,4 +76,30 @@ class Ticket(Base):
|
||||
)
|
||||
|
||||
seat: Mapped["Seat"] = relationship(back_populates="ticket")
|
||||
user: Mapped["User"] = relationship(back_populates="tickets")
|
||||
user: Mapped["User"] = relationship(back_populates="tickets")
|
||||
|
||||
|
||||
class ActionLog(Base):
|
||||
"""Audit trail: every mutating request is recorded here by AuditLogMiddleware."""
|
||||
|
||||
__tablename__ = "action_logs"
|
||||
|
||||
id: Mapped[int] = mapped_column(Integer, primary_key=True)
|
||||
|
||||
# nullable — anonymous / unauthenticated requests (e.g. /api/auth/register)
|
||||
user_id: Mapped[int | None] = mapped_column(Integer, nullable=True, index=True)
|
||||
|
||||
# "POST /api/tickets/book", "DELETE /api/seats/42", etc.
|
||||
action: Mapped[str] = mapped_column(String, nullable=False, index=True)
|
||||
|
||||
# request.client.host or X-Forwarded-For (behind Traefik)
|
||||
ip_address: Mapped[str | None] = mapped_column(String, nullable=True)
|
||||
|
||||
# Optional structured payload (response body excerpt, error detail, …)
|
||||
details: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
|
||||
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True),
|
||||
default=lambda: datetime.now(timezone.utc),
|
||||
index=True,
|
||||
)
|
||||
@@ -9,11 +9,16 @@ from api.routers.auth import router as auth_router
|
||||
from api.routers.webhooks import router as webhooks_router
|
||||
from api.routers.tickets import router as tickets_router
|
||||
from api.routers.tournaments import router as tournaments_router
|
||||
from core.middleware import AuditLogMiddleware
|
||||
|
||||
app = FastAPI(title="Ticketing System API")
|
||||
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
# NOTE: middlewares are applied in reverse registration order (last registered = outermost).
|
||||
# AuditLog is registered first so it wraps everything, including CORS.
|
||||
app.add_middleware(AuditLogMiddleware)
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"], # Для локальной песочницы оставляем открытым
|
||||
|
||||
46
backend/migrations/versions/ea17bfd32885_add_action_logs.py
Normal file
46
backend/migrations/versions/ea17bfd32885_add_action_logs.py
Normal file
@@ -0,0 +1,46 @@
|
||||
"""add action_logs
|
||||
|
||||
Revision ID: ea17bfd32885
|
||||
Revises: b2e071ae215a
|
||||
Create Date: 2026-03-12 14:17:22.483024
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import postgresql
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = 'ea17bfd32885'
|
||||
down_revision: Union[str, Sequence[str], None] = 'b2e071ae215a'
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
"""Upgrade schema."""
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table('action_logs',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('user_id', sa.Integer(), nullable=True),
|
||||
sa.Column('action', sa.String(), nullable=False),
|
||||
sa.Column('ip_address', sa.String(), nullable=True),
|
||||
sa.Column('details', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
|
||||
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
|
||||
sa.PrimaryKeyConstraint('id')
|
||||
)
|
||||
op.create_index(op.f('ix_action_logs_action'), 'action_logs', ['action'], unique=False)
|
||||
op.create_index(op.f('ix_action_logs_created_at'), 'action_logs', ['created_at'], unique=False)
|
||||
op.create_index(op.f('ix_action_logs_user_id'), 'action_logs', ['user_id'], unique=False)
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
"""Downgrade schema."""
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_index(op.f('ix_action_logs_user_id'), table_name='action_logs')
|
||||
op.drop_index(op.f('ix_action_logs_created_at'), table_name='action_logs')
|
||||
op.drop_index(op.f('ix_action_logs_action'), table_name='action_logs')
|
||||
op.drop_table('action_logs')
|
||||
# ### end Alembic commands ###
|
||||
@@ -90,18 +90,21 @@ async def _process_message(
|
||||
async with message.process(requeue=True):
|
||||
try:
|
||||
payload: dict[str, Any] = json.loads(message.body)
|
||||
action: str = payload.get("action", "")
|
||||
|
||||
if action == "ticket_paid":
|
||||
|
||||
# Если есть ticket_id — сразу в работу. Никаких проверок action.
|
||||
if "ticket_id" in payload:
|
||||
async with session_factory() as session:
|
||||
await _handle_ticket_paid(payload, session)
|
||||
else:
|
||||
log.debug("Unknown action '%s', skipping", action)
|
||||
log.warning("Кривой payload, нет ticket_id: %s", payload)
|
||||
|
||||
except json.JSONDecodeError:
|
||||
log.exception("Failed to decode message body: %r", message.body)
|
||||
# Некорректный JSON — не возвращаем в очередь (requeue=False через reject)
|
||||
await message.reject(requeue=False)
|
||||
except Exception as e:
|
||||
log.exception("Ошибка при обработке сообщения: %s", e)
|
||||
# Исключение заставит aio_pika вернуть задачу в очередь (requeue=True)
|
||||
raise
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
|
||||
Reference in New Issue
Block a user