132 lines
4.4 KiB
Python
132 lines
4.4 KiB
Python
"""
|
|
Standalone worker: слушает очередь RabbitMQ 'ticket_events',
|
|
генерирует PDF-билет через core.pdf_generator, загружает в MinIO,
|
|
сохраняет pdf_url в PostgreSQL.
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
from typing import Any
|
|
|
|
import aio_pika
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
|
|
|
from core.minio import ensure_bucket_exists, upload_pdf
|
|
from core.pdf_generator import generate_qr_ticket
|
|
from database.models import Seat, Ticket, Tournament
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
|
|
)
|
|
log = logging.getLogger("worker")
|
|
|
|
RABBITMQ_URL: str = os.getenv("RABBITMQ_URL", "amqp://user:password@rabbitmq/")
|
|
DATABASE_URL: str = os.getenv(
|
|
"DATABASE_URL",
|
|
"postgresql+asyncpg://admin:your_strong_password@postgres:5432/ticket_db",
|
|
)
|
|
QUEUE_NAME: str = "ticket_events"
|
|
|
|
|
|
async def _handle_ticket_paid(
|
|
payload: dict[str, Any],
|
|
db_session: AsyncSession,
|
|
) -> None:
|
|
ticket_id: int | None = payload.get("ticket_id")
|
|
if ticket_id is None:
|
|
log.error("Event 'ticket_paid' missing 'ticket_id': %s", payload)
|
|
return
|
|
|
|
# JOIN: Ticket → Seat → Tournament (one query, no N+1)
|
|
stmt = (
|
|
select(Ticket, Seat, Tournament)
|
|
.join(Seat, Ticket.seat_id == Seat.id)
|
|
.join(Tournament, Seat.tournament_id == Tournament.id)
|
|
.where(Ticket.id == ticket_id)
|
|
)
|
|
row = (await db_session.execute(stmt)).first()
|
|
|
|
if row is None:
|
|
log.error("Ticket %s (or related Seat/Tournament) not found in DB", ticket_id)
|
|
return
|
|
|
|
ticket, seat, tournament = row
|
|
|
|
if ticket.pdf_url:
|
|
log.info("Ticket %s already has a PDF, skipping (idempotency guard)", ticket_id)
|
|
return
|
|
|
|
# Format event date: "DD.MM.YYYY HH:MM"
|
|
date_str = tournament.event_date.strftime("%d.%m.%Y %H:%M")
|
|
|
|
log.info("Generating PDF for ticket %s …", ticket_id)
|
|
pdf_bytes = generate_qr_ticket(
|
|
ticket_id=ticket.id,
|
|
title=tournament.title,
|
|
date_str=date_str,
|
|
sector=seat.sector,
|
|
row=seat.row,
|
|
number=seat.number,
|
|
price=seat.price,
|
|
secret_token=str(ticket.secret_token),
|
|
)
|
|
|
|
object_name = f"tickets/ticket_{ticket_id}.pdf"
|
|
pdf_url = await upload_pdf(object_name, pdf_bytes)
|
|
log.info("PDF uploaded: %s", pdf_url)
|
|
|
|
ticket.pdf_url = pdf_url
|
|
await db_session.commit()
|
|
log.info("Ticket %s updated with pdf_url", ticket_id)
|
|
|
|
|
|
async def _process_message(
|
|
message: aio_pika.abc.AbstractIncomingMessage,
|
|
session_factory: async_sessionmaker[AsyncSession],
|
|
) -> None:
|
|
async with message.process(requeue=True):
|
|
try:
|
|
payload: dict[str, Any] = json.loads(message.body)
|
|
|
|
# Если есть ticket_id — сразу в работу. Никаких проверок action.
|
|
if "ticket_id" in payload:
|
|
async with session_factory() as session:
|
|
await _handle_ticket_paid(payload, session)
|
|
else:
|
|
log.warning("Кривой payload, нет ticket_id: %s", payload)
|
|
|
|
except json.JSONDecodeError:
|
|
log.exception("Failed to decode message body: %r", message.body)
|
|
await message.reject(requeue=False)
|
|
except Exception as e:
|
|
log.exception("Ошибка при обработке сообщения: %s", e)
|
|
# Исключение заставит aio_pika вернуть задачу в очередь (requeue=True)
|
|
raise
|
|
|
|
|
|
async def main() -> None:
|
|
engine = create_async_engine(DATABASE_URL, echo=False)
|
|
session_factory = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
|
|
|
|
await ensure_bucket_exists()
|
|
log.info("MinIO bucket ready")
|
|
|
|
connection = await aio_pika.connect_robust(RABBITMQ_URL)
|
|
async with connection:
|
|
channel = await connection.channel()
|
|
await channel.set_qos(prefetch_count=10)
|
|
|
|
queue = await channel.declare_queue(QUEUE_NAME, durable=True)
|
|
log.info("Worker started. Listening on queue '%s' …", QUEUE_NAME)
|
|
|
|
async with queue.iterator() as queue_iter:
|
|
async for message in queue_iter:
|
|
await _process_message(message, session_factory)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|