""" Standalone worker: слушает очередь RabbitMQ 'ticket_events', генерирует PDF-билет через core.pdf_generator, загружает в MinIO, сохраняет pdf_url в PostgreSQL. """ import asyncio import json import logging import os from typing import Any import aio_pika from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from core.minio import ensure_bucket_exists, upload_pdf from core.pdf_generator import generate_qr_ticket from database.models import Seat, Ticket, Tournament logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) log = logging.getLogger("worker") RABBITMQ_URL: str = os.getenv("RABBITMQ_URL", "amqp://user:password@rabbitmq/") DATABASE_URL: str = os.getenv( "DATABASE_URL", "postgresql+asyncpg://admin:your_strong_password@postgres:5432/ticket_db", ) QUEUE_NAME: str = "ticket_events" async def _handle_ticket_paid( payload: dict[str, Any], db_session: AsyncSession, ) -> None: ticket_id: int | None = payload.get("ticket_id") if ticket_id is None: log.error("Event 'ticket_paid' missing 'ticket_id': %s", payload) return # JOIN: Ticket → Seat → Tournament (one query, no N+1) stmt = ( select(Ticket, Seat, Tournament) .join(Seat, Ticket.seat_id == Seat.id) .join(Tournament, Seat.tournament_id == Tournament.id) .where(Ticket.id == ticket_id) ) row = (await db_session.execute(stmt)).first() if row is None: log.error("Ticket %s (or related Seat/Tournament) not found in DB", ticket_id) return ticket, seat, tournament = row if ticket.pdf_url: log.info("Ticket %s already has a PDF, skipping (idempotency guard)", ticket_id) return # Format event date: "DD.MM.YYYY HH:MM" date_str = tournament.event_date.strftime("%d.%m.%Y %H:%M") log.info("Generating PDF for ticket %s …", ticket_id) pdf_bytes = generate_qr_ticket( ticket_id=ticket.id, title=tournament.title, date_str=date_str, sector=seat.sector, row=seat.row, number=seat.number, price=seat.price, secret_token=str(ticket.secret_token), ) object_name = f"tickets/ticket_{ticket_id}.pdf" pdf_url = await upload_pdf(object_name, pdf_bytes) log.info("PDF uploaded: %s", pdf_url) ticket.pdf_url = pdf_url await db_session.commit() log.info("Ticket %s updated with pdf_url", ticket_id) async def _process_message( message: aio_pika.abc.AbstractIncomingMessage, session_factory: async_sessionmaker[AsyncSession], ) -> None: async with message.process(requeue=True): try: payload: dict[str, Any] = json.loads(message.body) action: str = payload.get("action", "") if action == "ticket_paid": async with session_factory() as session: await _handle_ticket_paid(payload, session) else: log.debug("Unknown action '%s', skipping", action) except json.JSONDecodeError: log.exception("Failed to decode message body: %r", message.body) # Некорректный JSON — не возвращаем в очередь (requeue=False через reject) await message.reject(requeue=False) async def main() -> None: engine = create_async_engine(DATABASE_URL, echo=False) session_factory = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession) await ensure_bucket_exists() log.info("MinIO bucket ready") connection = await aio_pika.connect_robust(RABBITMQ_URL) async with connection: channel = await connection.channel() await channel.set_qos(prefetch_count=10) queue = await channel.declare_queue(QUEUE_NAME, durable=True) log.info("Worker started. Listening on queue '%s' …", QUEUE_NAME) async with queue.iterator() as queue_iter: async for message in queue_iter: await _process_message(message, session_factory) if __name__ == "__main__": asyncio.run(main())