Files
svg-backend/backend/app/services/remap_service.py
greebo 62550d5cb5 feat(backend): add stale draft guards and reference validation for draft mutations
add stale draft protection for mutation flows

validate referenced entities before applying draft changes
reduce invalid draft writes caused by stale state and broken references

keep mutation behavior explicit and version-aware
2026-03-19 19:25:44 +03:00

103 lines
2.9 KiB
Python

from __future__ import annotations
from fastapi import HTTPException, status
from app.repositories.scheme_seats import (
bulk_remap_scheme_version_seats,
list_scheme_version_seats,
)
from app.services.editor_validation import validate_remap_target_references
def _match_seat(
    seat,
    *,
    seat_record_ids: set[str] | None,
    from_sector_id: str | None,
    from_group_id: str | None,
) -> bool:
    """Tell whether ``seat`` passes every filter that is actually set.

    A filter given as ``None`` is skipped. The remaining filters are
    combined with logical AND and evaluated in order (record id, sector,
    group), stopping at the first one that fails.

    Args:
        seat: Object exposing ``seat_record_id``, ``sector_id`` and
            ``group_id`` attributes.
        seat_record_ids: Allowed seat record ids, or ``None`` for no
            restriction. An empty set matches no seat.
        from_sector_id: Sector id the seat must currently have, or ``None``.
        from_group_id: Group id the seat must currently have, or ``None``.

    Returns:
        ``True`` if the seat satisfies all non-``None`` filters, else ``False``.
    """
    return (
        (seat_record_ids is None or seat.seat_record_id in seat_record_ids)
        and (from_sector_id is None or seat.sector_id == from_sector_id)
        and (from_group_id is None or seat.group_id == from_group_id)
    )
async def preview_remap(
    *,
    scheme_version_id: str,
    seat_record_ids: list[str] | None,
    from_sector_id: str | None,
    to_sector_id: str | None,
    from_group_id: str | None,
    to_group_id: str | None,
) -> list[dict]:
    """Compute, without persisting, the before/after state of a seat remap.

    Seats of the scheme version are selected by the optional filters
    (``seat_record_ids``, ``from_sector_id``, ``from_group_id``); each
    selected seat is reported with its current sector/group and the values
    it would have after the remap. A ``None`` target leaves that field
    unchanged in the "after" value.

    Args:
        scheme_version_id: Scheme version whose seats are inspected.
        seat_record_ids: Seat record ids to restrict to. ``None`` or an
            empty list applies no id restriction.
        from_sector_id: Only seats currently in this sector, if given.
        to_sector_id: Sector to assign, or ``None`` to keep the current one.
        from_group_id: Only seats currently in this group, if given.
        to_group_id: Group to assign, or ``None`` to keep the current one.

    Returns:
        One dict per matched seat with keys ``seat_record_id``, ``seat_id``,
        ``before_sector_id``, ``after_sector_id``, ``before_group_id`` and
        ``after_group_id``.

    Raises:
        HTTPException: 422 when none of the three filters is truthy.
            Anything raised by ``validate_remap_target_references``
            propagates unchanged.
    """
    # The guard is truthiness-based: an empty list or empty string does not
    # count as a provided filter.
    if not (seat_record_ids or from_sector_id or from_group_id):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="At least one remap filter must be provided",
        )

    # Check the target references before reading any seats.
    await validate_remap_target_references(
        scheme_version_id=scheme_version_id,
        to_sector_id=to_sector_id,
        to_group_id=to_group_id,
    )

    all_seats = await list_scheme_version_seats(scheme_version_id)

    # Build the id lookup once; None disables id filtering in _match_seat.
    allowed_ids = set(seat_record_ids) if seat_record_ids else None

    return [
        {
            "seat_record_id": seat.seat_record_id,
            "seat_id": seat.seat_id,
            "before_sector_id": seat.sector_id,
            "after_sector_id": seat.sector_id if to_sector_id is None else to_sector_id,
            "before_group_id": seat.group_id,
            "after_group_id": seat.group_id if to_group_id is None else to_group_id,
        }
        for seat in all_seats
        if _match_seat(
            seat,
            seat_record_ids=allowed_ids,
            from_sector_id=from_sector_id,
            from_group_id=from_group_id,
        )
    ]
async def apply_remap(
    *,
    scheme_version_id: str,
    seat_record_ids: list[str] | None,
    from_sector_id: str | None,
    to_sector_id: str | None,
    from_group_id: str | None,
    to_group_id: str | None,
) -> list[dict]:
    """Compute a seat remap via ``preview_remap`` and persist it.

    The preview is built with exactly the arguments received here, so any
    error raised by ``preview_remap`` propagates from this function as well.
    When the preview is empty nothing is written.

    Args:
        scheme_version_id: Scheme version whose seats are remapped.
        seat_record_ids: Optional seat record ids to restrict to.
        from_sector_id: Optional current-sector filter.
        to_sector_id: Sector to assign, or ``None`` to keep the current one.
        from_group_id: Optional current-group filter.
        to_group_id: Group to assign, or ``None`` to keep the current one.

    Returns:
        The preview items that were handed to
        ``bulk_remap_scheme_version_seats``, or an empty list when no seat
        matched.
    """
    planned_changes = await preview_remap(
        scheme_version_id=scheme_version_id,
        seat_record_ids=seat_record_ids,
        from_sector_id=from_sector_id,
        to_sector_id=to_sector_id,
        from_group_id=from_group_id,
        to_group_id=to_group_id,
    )

    if planned_changes:
        # Only touch storage when at least one seat is affected.
        await bulk_remap_scheme_version_seats(
            scheme_version_id=scheme_version_id,
            items=planned_changes,
        )
        return planned_changes

    return []