""" Standalone worker: слушает очередь RabbitMQ 'ticket_events', генерирует PDF-билет через core.pdf_generator, загружает в MinIO, сохраняет pdf_url в PostgreSQL. """ import asyncio import json import logging import os from typing import Any import aio_pika from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from core.minio import ensure_bucket_exists, upload_pdf from core.pdf_generator import generate_qr_ticket from database.models import Ticket logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) log = logging.getLogger("worker") RABBITMQ_URL: str = os.getenv("RABBITMQ_URL", "amqp://user:password@rabbitmq/") DATABASE_URL: str = os.getenv( "DATABASE_URL", "postgresql+asyncpg://admin:your_strong_password@postgres:5432/ticket_db", ) QUEUE_NAME: str = "ticket_events" async def _handle_ticket_paid( payload: dict[str, Any], db_session: AsyncSession, ) -> None: ticket_id: int | None = payload.get("ticket_id") if ticket_id is None: log.error("Event 'ticket_paid' missing 'ticket_id': %s", payload) return result = await db_session.execute(select(Ticket).where(Ticket.id == ticket_id)) ticket: Ticket | None = result.scalar_one_or_none() if ticket is None: log.error("Ticket %s not found in DB", ticket_id) return if ticket.pdf_url: log.info("Ticket %s already has a PDF, skipping (idempotency guard)", ticket_id) return log.info("Generating PDF for ticket %s …", ticket_id) pdf_bytes = generate_qr_ticket( ticket_id=ticket.id, user_id=ticket.user_id or 0, seat_id=ticket.seat_id, ) object_name = f"tickets/ticket_{ticket_id}.pdf" pdf_url = await upload_pdf(object_name, pdf_bytes) log.info("PDF uploaded: %s", pdf_url) ticket.pdf_url = pdf_url await db_session.commit() log.info("Ticket %s updated with pdf_url", ticket_id) async def _process_message( message: aio_pika.abc.AbstractIncomingMessage, session_factory: async_sessionmaker[AsyncSession], ) -> None: async with message.process(requeue=True): try: payload: dict[str, Any] = json.loads(message.body) action: str = payload.get("action", "") if action == "ticket_paid": async with session_factory() as session: await _handle_ticket_paid(payload, session) else: log.debug("Unknown action '%s', skipping", action) except json.JSONDecodeError: log.exception("Failed to decode message body: %r", message.body) # Некорректный JSON — не возвращаем в очередь (requeue=False через reject) await message.reject(requeue=False) async def main() -> None: engine = create_async_engine(DATABASE_URL, echo=False) session_factory = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession) await ensure_bucket_exists() log.info("MinIO bucket ready") connection = await aio_pika.connect_robust(RABBITMQ_URL) async with connection: channel = await connection.channel() await channel.set_qos(prefetch_count=10) queue = await channel.declare_queue(QUEUE_NAME, durable=True) log.info("Worker started. Listening on queue '%s' …", QUEUE_NAME) async with queue.iterator() as queue_iter: async for message in queue_iter: await _process_message(message, session_factory) if __name__ == "__main__": asyncio.run(main())