Files
ticket-system/backend/core/redis.py
2026-03-05 14:27:30 +00:00

33 lines
1.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
from redis.asyncio import Redis, from_url
# Берем URL из окружения или ставим дефолт для нашей docker-сети
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")
# Глобальный пул соединений
redis_client: Redis = from_url(REDIS_URL, decode_responses=True)
async def get_redis() -> Redis:
return redis_client
async def acquire_seat_lock(seat_id: int, user_id: int, ttl_seconds: int = 900) -> bool:
"""
Пытается захватить блокировку на место.
ttl_seconds = 900 (15 минут на оплату по ТЗ).
Возвращает True, если блокировка получена, иначе False.
"""
lock_key = f"lock:seat:{seat_id}"
# SETNX: Set if Not eXists. Если ключ есть, вернет None/False
# ex: устанавливает время жизни ключа (TTL)
is_locked = await redis_client.set(lock_key, str(user_id), nx=True, ex=ttl_seconds)
return bool(is_locked)
async def release_seat_lock(seat_id: int) -> None:
"""
Принудительно снимает блокировку (например, при отмене или ошибке БД).
"""
lock_key = f"lock:seat:{seat_id}"
await redis_client.delete(lock_key)