From 5cc5efd2e08e2d89ee6f5f88bd3f63ac9886bf31 Mon Sep 17 00:00:00 2001 From: openit Date: Thu, 12 Mar 2026 14:20:45 +0000 Subject: [PATCH] add logging --- backend/core/middleware.py | 120 ++++++++++++++++++ backend/core/rabbitmq.py | 2 +- backend/database/models.py | 29 ++++- backend/main.py | 5 + .../versions/ea17bfd32885_add_action_logs.py | 46 +++++++ backend/worker.py | 13 +- 6 files changed, 208 insertions(+), 7 deletions(-) create mode 100644 backend/core/middleware.py create mode 100644 backend/migrations/versions/ea17bfd32885_add_action_logs.py diff --git a/backend/core/middleware.py b/backend/core/middleware.py new file mode 100644 index 0000000..9a9ac5e --- /dev/null +++ b/backend/core/middleware.py @@ -0,0 +1,120 @@ +""" +AuditLogMiddleware +────────────────── +Перехватывает мутирующие HTTP-запросы (POST / PUT / PATCH / DELETE), +пишет запись в таблицу action_logs после того как ответ уже отправлен клиенту. +Таким образом, запись в БД не блокирует основной поток ответа. + +Что логируется: + • Метод + путь → action ("POST /api/tickets/book") + • IP-адрес → ip_address (X-Forwarded-For → client.host) + • user_id из JWT-токена → user_id (None для анонимных запросов) + • HTTP-статус ответа → details["status_code"] + +Что пропускается: + • GET, HEAD, OPTIONS + • /docs, /redoc, /openapi.json, /metrics, /api/health +""" + +import logging +from typing import Any + +import jwt +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.requests import Request +from starlette.responses import Response + +from core.security import SECRET_KEY, ALGORITHM +from database.models import ActionLog +from database.session import async_session + +log = logging.getLogger("audit") + +# ─── Paths that are never interesting to audit ──────────────────────────────── + +_SKIP_PREFIXES: tuple[str, ...] = ( + "/docs", + "/redoc", + "/openapi.json", + "/metrics", + "/api/health", +) + +# Only log requests that can change state +_AUDIT_METHODS: frozenset[str] = frozenset({"POST", "PUT", "PATCH", "DELETE"}) + + +# ─── Helpers ────────────────────────────────────────────────────────────────── + +def _extract_user_id(request: Request) -> int | None: + """ + Try to decode the Bearer JWT and return the subject as int. + Returns None if the header is absent, malformed, or expired. + """ + auth_header = request.headers.get("Authorization", "") + if not auth_header.startswith("Bearer "): + return None + token = auth_header.removeprefix("Bearer ").strip() + try: + payload: dict[str, Any] = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + sub = payload.get("sub") + return int(sub) if sub is not None else None + except (jwt.PyJWTError, ValueError, TypeError): + return None + + +def _extract_ip(request: Request) -> str | None: + """ + Prefer X-Forwarded-For (set by Traefik / Nginx reverse proxy). + Fall back to the direct TCP peer address. + """ + forwarded_for = request.headers.get("X-Forwarded-For") + if forwarded_for: + # X-Forwarded-For can be a comma-separated list; the first entry is the client IP + return forwarded_for.split(",")[0].strip() + if request.client: + return request.client.host + return None + + +# ─── Middleware ─────────────────────────────────────────────────────────────── + +class AuditLogMiddleware(BaseHTTPMiddleware): + async def dispatch(self, request: Request, call_next: Any) -> Response: + # 1. Let the request go through and collect the response first + response: Response = await call_next(request) + + # 2. Decide whether to log + method = request.method.upper() + path = request.url.path + + if method not in _AUDIT_METHODS: + return response + + if any(path.startswith(prefix) for prefix in _SKIP_PREFIXES): + return response + + # 3. Extract metadata (cheap, no DB involved) + user_id = _extract_user_id(request) + ip_address = _extract_ip(request) + action = f"{method} {path}" + details: dict[str, Any] = {"status_code": response.status_code} + + # 4. Write the audit record asynchronously — after the response is ready, + # so latency is not affected. Errors here must not propagate to the client. + try: + async with async_session() as session: + session.add( + ActionLog( + user_id=user_id, + action=action, + ip_address=ip_address, + details=details, + ) + ) + await session.commit() + except Exception: + # Audit failure must never break the API + log.exception("Failed to write audit log for %s %s", method, path) + + return response diff --git a/backend/core/rabbitmq.py b/backend/core/rabbitmq.py index ffd2386..016cf9b 100644 --- a/backend/core/rabbitmq.py +++ b/backend/core/rabbitmq.py @@ -5,7 +5,7 @@ import logging logger = logging.getLogger(__name__) -QUEUE_NAME = "pdf_generation_queue" +QUEUE_NAME = "ticket_events" async def publish_ticket_task(ticket_id: int): """Отправляет ID билета в RabbitMQ.""" diff --git a/backend/database/models.py b/backend/database/models.py index 8b27899..0cb2754 100644 --- a/backend/database/models.py +++ b/backend/database/models.py @@ -2,6 +2,7 @@ import enum import uuid from datetime import datetime, timezone from sqlalchemy import String, Integer, ForeignKey, DateTime, Enum, Boolean +from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship class Base(DeclarativeBase): @@ -75,4 +76,30 @@ class Ticket(Base): ) seat: Mapped["Seat"] = relationship(back_populates="ticket") - user: Mapped["User"] = relationship(back_populates="tickets") \ No newline at end of file + user: Mapped["User"] = relationship(back_populates="tickets") + + +class ActionLog(Base): + """Audit trail: every mutating request is recorded here by AuditLogMiddleware.""" + + __tablename__ = "action_logs" + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + + # nullable — anonymous / unauthenticated requests (e.g. /api/auth/register) + user_id: Mapped[int | None] = mapped_column(Integer, nullable=True, index=True) + + # "POST /api/tickets/book", "DELETE /api/seats/42", etc. + action: Mapped[str] = mapped_column(String, nullable=False, index=True) + + # request.client.host or X-Forwarded-For (behind Traefik) + ip_address: Mapped[str | None] = mapped_column(String, nullable=True) + + # Optional structured payload (response body excerpt, error detail, …) + details: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + default=lambda: datetime.now(timezone.utc), + index=True, + ) \ No newline at end of file diff --git a/backend/main.py b/backend/main.py index 05b7c4b..4eac9ca 100644 --- a/backend/main.py +++ b/backend/main.py @@ -9,11 +9,16 @@ from api.routers.auth import router as auth_router from api.routers.webhooks import router as webhooks_router from api.routers.tickets import router as tickets_router from api.routers.tournaments import router as tournaments_router +from core.middleware import AuditLogMiddleware app = FastAPI(title="Ticketing System API") from fastapi.middleware.cors import CORSMiddleware +# NOTE: middlewares are applied in reverse registration order (last registered = outermost). +# AuditLog is registered first so it wraps everything, including CORS. +app.add_middleware(AuditLogMiddleware) + app.add_middleware( CORSMiddleware, allow_origins=["*"], # Для локальной песочницы оставляем открытым diff --git a/backend/migrations/versions/ea17bfd32885_add_action_logs.py b/backend/migrations/versions/ea17bfd32885_add_action_logs.py new file mode 100644 index 0000000..ab42399 --- /dev/null +++ b/backend/migrations/versions/ea17bfd32885_add_action_logs.py @@ -0,0 +1,46 @@ +"""add action_logs + +Revision ID: ea17bfd32885 +Revises: b2e071ae215a +Create Date: 2026-03-12 14:17:22.483024 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = 'ea17bfd32885' +down_revision: Union[str, Sequence[str], None] = 'b2e071ae215a' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('action_logs', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('user_id', sa.Integer(), nullable=True), + sa.Column('action', sa.String(), nullable=False), + sa.Column('ip_address', sa.String(), nullable=True), + sa.Column('details', postgresql.JSONB(astext_type=sa.Text()), nullable=True), + sa.Column('created_at', sa.DateTime(timezone=True), nullable=False), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_action_logs_action'), 'action_logs', ['action'], unique=False) + op.create_index(op.f('ix_action_logs_created_at'), 'action_logs', ['created_at'], unique=False) + op.create_index(op.f('ix_action_logs_user_id'), 'action_logs', ['user_id'], unique=False) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_action_logs_user_id'), table_name='action_logs') + op.drop_index(op.f('ix_action_logs_created_at'), table_name='action_logs') + op.drop_index(op.f('ix_action_logs_action'), table_name='action_logs') + op.drop_table('action_logs') + # ### end Alembic commands ### diff --git a/backend/worker.py b/backend/worker.py index 57c6bcd..b08180b 100644 --- a/backend/worker.py +++ b/backend/worker.py @@ -90,18 +90,21 @@ async def _process_message( async with message.process(requeue=True): try: payload: dict[str, Any] = json.loads(message.body) - action: str = payload.get("action", "") - - if action == "ticket_paid": + + # Если есть ticket_id — сразу в работу. Никаких проверок action. + if "ticket_id" in payload: async with session_factory() as session: await _handle_ticket_paid(payload, session) else: - log.debug("Unknown action '%s', skipping", action) + log.warning("Кривой payload, нет ticket_id: %s", payload) except json.JSONDecodeError: log.exception("Failed to decode message body: %r", message.body) - # Некорректный JSON — не возвращаем в очередь (requeue=False через reject) await message.reject(requeue=False) + except Exception as e: + log.exception("Ошибка при обработке сообщения: %s", e) + # Исключение заставит aio_pika вернуть задачу в очередь (requeue=True) + raise async def main() -> None: