From 50221c57e1b869d6046f363452386003a9f8d934 Mon Sep 17 00:00:00 2001 From: openit Date: Fri, 6 Mar 2026 10:46:04 +0000 Subject: [PATCH] Update project 3 iteration feat: core booking pipeline, webhook and async pdf worker CORE COPLETE --- backend/api/routers/webhooks.py | 43 +++++++++++++++++++ backend/core/rabbitmq.py | 34 +++++++++++++++ backend/main.py | 2 + .../a55d80c4b300_add_pdf_url_to_ticket.py | 32 ++++++++++++++ backend/schemas/payment.py | 14 ++++++ 5 files changed, 125 insertions(+) create mode 100644 backend/api/routers/webhooks.py create mode 100644 backend/core/rabbitmq.py create mode 100644 backend/migrations/versions/a55d80c4b300_add_pdf_url_to_ticket.py create mode 100644 backend/schemas/payment.py diff --git a/backend/api/routers/webhooks.py b/backend/api/routers/webhooks.py new file mode 100644 index 0000000..5bba3f6 --- /dev/null +++ b/backend/api/routers/webhooks.py @@ -0,0 +1,43 @@ +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from core.rabbitmq import publish_ticket_paid_event +from database.models import Ticket, TicketStatus +from database.session import get_db +from schemas.payment import PaymentWebhookRequest + +router = APIRouter(prefix="/api/webhooks", tags=["webhooks"]) + + +@router.post("/payment", status_code=status.HTTP_200_OK) +async def payment_webhook( + body: PaymentWebhookRequest, + db: AsyncSession = Depends(get_db), +) -> dict[str, str]: + result = await db.execute(select(Ticket).where(Ticket.id == body.ticket_id)) + ticket: Ticket | None = result.scalar_one_or_none() + + if ticket is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Ticket {body.ticket_id} not found", + ) + + # Idempotency guard: повторный запрос с тем же ключом — возвращаем 200 без действий + if ticket.idempotency_key == body.idempotency_key: + return {"detail": "Already processed"} + + if ticket.status != TicketStatus.LOCKED: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Ticket is in '{ticket.status}' state, expected 'LOCKED'", + ) + + ticket.status = TicketStatus.PAID + ticket.idempotency_key = body.idempotency_key + await db.commit() + + await publish_ticket_paid_event(ticket_id=ticket.id, user_id=ticket.user_id or 0) + + return {"detail": "Payment processed"} diff --git a/backend/core/rabbitmq.py b/backend/core/rabbitmq.py new file mode 100644 index 0000000..cea4c24 --- /dev/null +++ b/backend/core/rabbitmq.py @@ -0,0 +1,34 @@ +import json +import os + +import aio_pika + +RABBITMQ_URL: str = os.getenv("RABBITMQ_URL", "amqp://user:password@rabbitmq/") +QUEUE_NAME: str = "ticket_events" + + +async def publish_ticket_paid_event(ticket_id: int, user_id: int) -> None: + """Публикует событие ticket_paid в очередь RabbitMQ. + + Формат совпадает с тем, что ожидает worker.py: + {"action": "ticket_paid", "ticket_id": , "user_id": } + """ + connection = await aio_pika.connect_robust(RABBITMQ_URL) + async with connection: + channel = await connection.channel() + + # durable=True — очередь переживёт перезапуск брокера + queue = await channel.declare_queue(QUEUE_NAME, durable=True) + + body = json.dumps( + {"action": "ticket_paid", "ticket_id": ticket_id, "user_id": user_id} + ).encode() + + await channel.default_exchange.publish( + aio_pika.Message( + body=body, + delivery_mode=aio_pika.DeliveryMode.PERSISTENT, + content_type="application/json", + ), + routing_key=queue.name, + ) diff --git a/backend/main.py b/backend/main.py index df7ef7a..fd427cb 100644 --- a/backend/main.py +++ b/backend/main.py @@ -6,10 +6,12 @@ from database.session import get_db from database.models import Ticket, TicketStatus from core.redis import acquire_seat_lock, release_seat_lock from api.routers.auth import router as auth_router +from api.routers.webhooks import router as webhooks_router app = FastAPI(title="Ticketing System API") app.include_router(auth_router) +app.include_router(webhooks_router) @app.post("/api/seats/{seat_id}/lock", status_code=status.HTTP_200_OK) async def lock_seat(seat_id: int, user_id: int, db: AsyncSession = Depends(get_db)): diff --git a/backend/migrations/versions/a55d80c4b300_add_pdf_url_to_ticket.py b/backend/migrations/versions/a55d80c4b300_add_pdf_url_to_ticket.py new file mode 100644 index 0000000..51ef52c --- /dev/null +++ b/backend/migrations/versions/a55d80c4b300_add_pdf_url_to_ticket.py @@ -0,0 +1,32 @@ +"""add pdf_url to ticket + +Revision ID: a55d80c4b300 +Revises: 762b863b233b +Create Date: 2026-03-06 10:24:14.795359 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'a55d80c4b300' +down_revision: Union[str, Sequence[str], None] = '762b863b233b' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('tickets', sa.Column('pdf_url', sa.String(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('tickets', 'pdf_url') + # ### end Alembic commands ### diff --git a/backend/schemas/payment.py b/backend/schemas/payment.py new file mode 100644 index 0000000..e8b756e --- /dev/null +++ b/backend/schemas/payment.py @@ -0,0 +1,14 @@ +from pydantic import BaseModel, field_validator + + +class PaymentWebhookRequest(BaseModel): + ticket_id: int + idempotency_key: str + status: str + + @field_validator("status") + @classmethod + def status_must_be_success(cls, v: str) -> str: + if v != "success": + raise ValueError("Only 'success' status is accepted") + return v