Update project 2nd iteration
This commit is contained in:
50
backend/core/minio.py
Normal file
50
backend/core/minio.py
Normal file
@@ -0,0 +1,50 @@
|
||||
import os
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import AsyncIterator
|
||||
|
||||
import aioboto3
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
MINIO_ENDPOINT: str = os.getenv("MINIO_ENDPOINT", "http://minio:9000")
|
||||
MINIO_ACCESS_KEY: str = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
|
||||
MINIO_SECRET_KEY: str = os.getenv("MINIO_SECRET_KEY", "minioadminpassword")
|
||||
MINIO_BUCKET: str = os.getenv("MINIO_BUCKET", "tickets-media")
|
||||
|
||||
_session = aioboto3.Session()
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def _s3_client() -> AsyncIterator:
|
||||
async with _session.client(
|
||||
"s3",
|
||||
endpoint_url=MINIO_ENDPOINT,
|
||||
aws_access_key_id=MINIO_ACCESS_KEY,
|
||||
aws_secret_access_key=MINIO_SECRET_KEY,
|
||||
region_name="us-east-1", # MinIO не требует региона, но boto3 обязывает передать
|
||||
) as client:
|
||||
yield client
|
||||
|
||||
|
||||
async def ensure_bucket_exists() -> None:
|
||||
"""Создаёт бакет при старте воркера, если он ещё не существует."""
|
||||
async with _s3_client() as client:
|
||||
try:
|
||||
await client.head_bucket(Bucket=MINIO_BUCKET)
|
||||
except ClientError as exc:
|
||||
error_code = exc.response["Error"]["Code"]
|
||||
if error_code in ("404", "NoSuchBucket"):
|
||||
await client.create_bucket(Bucket=MINIO_BUCKET)
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
async def upload_pdf(object_name: str, pdf_bytes: bytes) -> str:
|
||||
"""Загружает PDF в MinIO и возвращает публичный URL объекта."""
|
||||
async with _s3_client() as client:
|
||||
await client.put_object(
|
||||
Bucket=MINIO_BUCKET,
|
||||
Key=object_name,
|
||||
Body=pdf_bytes,
|
||||
ContentType="application/pdf",
|
||||
)
|
||||
return f"{MINIO_ENDPOINT}/{MINIO_BUCKET}/{object_name}"
|
||||
@@ -52,6 +52,7 @@ class Ticket(Base):
|
||||
index=True
|
||||
)
|
||||
idempotency_key: Mapped[str] = mapped_column(String, unique=True, nullable=True)
|
||||
pdf_url: Mapped[str | None] = mapped_column(String, nullable=True)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True),
|
||||
|
||||
@@ -7,4 +7,7 @@ psycopg2-binary
|
||||
redis
|
||||
passlib[bcrypt]
|
||||
PyJWT
|
||||
aio-pika
|
||||
reportlab
|
||||
aioboto3
|
||||
pydantic[email]>=2.5.0
|
||||
|
||||
135
backend/worker.py
Normal file
135
backend/worker.py
Normal file
@@ -0,0 +1,135 @@
|
||||
"""
|
||||
Standalone worker: слушает очередь RabbitMQ 'ticket_events',
|
||||
генерирует PDF-билет через reportlab, загружает в MinIO,
|
||||
сохраняет pdf_url в PostgreSQL.
|
||||
"""
|
||||
import asyncio
|
||||
import io
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
import aio_pika
|
||||
from reportlab.lib.pagesizes import A4
|
||||
from reportlab.pdfgen import canvas
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
|
||||
from core.minio import ensure_bucket_exists, upload_pdf
|
||||
from database.models import Ticket
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
|
||||
)
|
||||
log = logging.getLogger("worker")
|
||||
|
||||
RABBITMQ_URL: str = os.getenv("RABBITMQ_URL", "amqp://user:password@rabbitmq/")
|
||||
DATABASE_URL: str = os.getenv(
|
||||
"DATABASE_URL",
|
||||
"postgresql+asyncpg://admin:your_strong_password@postgres:5432/ticket_db",
|
||||
)
|
||||
QUEUE_NAME: str = "ticket_events"
|
||||
|
||||
|
||||
def _build_pdf(ticket_id: int, seat_id: int, user_id: int) -> bytes:
|
||||
"""Генерирует PDF-билет в памяти и возвращает байты."""
|
||||
buffer = io.BytesIO()
|
||||
pdf = canvas.Canvas(buffer, pagesize=A4)
|
||||
width, height = A4
|
||||
|
||||
pdf.setFont("Helvetica-Bold", 24)
|
||||
pdf.drawCentredString(width / 2, height - 80, "TICKET")
|
||||
|
||||
pdf.setFont("Helvetica", 14)
|
||||
pdf.drawCentredString(width / 2, height - 130, f"Ticket ID: {ticket_id}")
|
||||
pdf.drawCentredString(width / 2, height - 160, f"Seat ID: {seat_id}")
|
||||
pdf.drawCentredString(width / 2, height - 190, f"User ID: {user_id}")
|
||||
|
||||
pdf.setFont("Helvetica-Oblique", 10)
|
||||
pdf.drawCentredString(width / 2, 40, "Thank you for your purchase!")
|
||||
|
||||
pdf.save()
|
||||
return buffer.getvalue()
|
||||
|
||||
|
||||
async def _handle_ticket_paid(
|
||||
payload: dict[str, Any],
|
||||
db_session: AsyncSession,
|
||||
) -> None:
|
||||
ticket_id: int | None = payload.get("ticket_id")
|
||||
if ticket_id is None:
|
||||
log.error("Event 'ticket_paid' missing 'ticket_id': %s", payload)
|
||||
return
|
||||
|
||||
result = await db_session.execute(select(Ticket).where(Ticket.id == ticket_id))
|
||||
ticket: Ticket | None = result.scalar_one_or_none()
|
||||
|
||||
if ticket is None:
|
||||
log.error("Ticket %s not found in DB", ticket_id)
|
||||
return
|
||||
|
||||
if ticket.pdf_url:
|
||||
log.info("Ticket %s already has a PDF, skipping (idempotency guard)", ticket_id)
|
||||
return
|
||||
|
||||
log.info("Generating PDF for ticket %s …", ticket_id)
|
||||
pdf_bytes = _build_pdf(
|
||||
ticket_id=ticket.id,
|
||||
seat_id=ticket.seat_id,
|
||||
user_id=ticket.user_id or 0,
|
||||
)
|
||||
|
||||
object_name = f"tickets/ticket_{ticket_id}.pdf"
|
||||
pdf_url = await upload_pdf(object_name, pdf_bytes)
|
||||
log.info("PDF uploaded: %s", pdf_url)
|
||||
|
||||
ticket.pdf_url = pdf_url
|
||||
await db_session.commit()
|
||||
log.info("Ticket %s updated with pdf_url", ticket_id)
|
||||
|
||||
|
||||
async def _process_message(
|
||||
message: aio_pika.abc.AbstractIncomingMessage,
|
||||
session_factory: async_sessionmaker[AsyncSession],
|
||||
) -> None:
|
||||
async with message.process(requeue=True):
|
||||
try:
|
||||
payload: dict[str, Any] = json.loads(message.body)
|
||||
action: str = payload.get("action", "")
|
||||
|
||||
if action == "ticket_paid":
|
||||
async with session_factory() as session:
|
||||
await _handle_ticket_paid(payload, session)
|
||||
else:
|
||||
log.debug("Unknown action '%s', skipping", action)
|
||||
|
||||
except json.JSONDecodeError:
|
||||
log.exception("Failed to decode message body: %r", message.body)
|
||||
# Некорректный JSON — не возвращаем в очередь (requeue=False через reject)
|
||||
await message.reject(requeue=False)
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
engine = create_async_engine(DATABASE_URL, echo=False)
|
||||
session_factory = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
|
||||
|
||||
await ensure_bucket_exists()
|
||||
log.info("MinIO bucket ready")
|
||||
|
||||
connection = await aio_pika.connect_robust(RABBITMQ_URL)
|
||||
async with connection:
|
||||
channel = await connection.channel()
|
||||
await channel.set_qos(prefetch_count=10)
|
||||
|
||||
queue = await channel.declare_queue(QUEUE_NAME, durable=True)
|
||||
log.info("Worker started. Listening on queue '%s' …", QUEUE_NAME)
|
||||
|
||||
async with queue.iterator() as queue_iter:
|
||||
async for message in queue_iter:
|
||||
await _process_message(message, session_factory)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user