Files
svg-backend/backend/app/services/editor_validation.py
greebo 62550d5cb5 feat(backend): add stale draft guards and reference validation for draft mutations
add stale draft protection for mutation flows

validate referenced entities before applying draft changes
reduce invalid draft writes caused by stale state and broken references

keep mutation behavior explicit and version-aware
2026-03-19 19:25:44 +03:00

275 lines
8.7 KiB
Python

from __future__ import annotations
from fastapi import HTTPException, status
from app.repositories.scheme_groups import list_scheme_version_groups
from app.repositories.scheme_seats import list_scheme_version_seats
from app.repositories.scheme_sectors import list_scheme_version_sectors
def _raise_uniqueness_error(message: str, detail: dict | None = None) -> None:
    """Abort with an HTTP 422 that reports a uniqueness conflict.

    Args:
        message: Human-readable text; only used when ``detail`` is falsy.
        detail: Optional structured error body. When truthy it is sent as-is;
            otherwise a generic ``editor_uniqueness_error`` body is built
            from ``message``.

    Raises:
        HTTPException: Always, with status 422.
    """
    if detail:
        payload = detail
    else:
        payload = {"code": "editor_uniqueness_error", "message": message}
    raise HTTPException(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        detail=payload,
    )
def _raise_reference_error(message: str, detail: dict | None = None) -> None:
    """Abort with an HTTP 422 that reports a broken reference.

    Args:
        message: Human-readable text; only used when ``detail`` is falsy.
        detail: Optional structured error body. When truthy it is sent as-is;
            otherwise a generic ``editor_reference_error`` body is built
            from ``message``.

    Raises:
        HTTPException: Always, with status 422.
    """
    if detail:
        payload = detail
    else:
        payload = {"code": "editor_reference_error", "message": message}
    raise HTTPException(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        detail=payload,
    )
async def validate_single_seat_patch_uniqueness(
    *,
    scheme_version_id: str,
    seat_record_id: str,
    new_seat_id: str | None,
) -> None:
    """Reject a seat patch whose new ``seat_id`` is held by another record.

    Args:
        scheme_version_id: Version whose seats are listed for the check.
        seat_record_id: Record being patched; it is excluded from the scan.
        new_seat_id: Requested seat id. A falsy value (``None`` or empty)
            skips validation entirely.

    Raises:
        HTTPException: 422 with code ``duplicate_seat_id`` for the first
            other record that already uses ``new_seat_id``.
    """
    if not new_seat_id:
        return

    existing_seats = await list_scheme_version_seats(scheme_version_id)

    # First row that is a different record yet carries the requested id.
    conflict = next(
        (
            seat
            for seat in existing_seats
            if seat.seat_record_id != seat_record_id
            and seat.seat_id == new_seat_id
        ),
        None,
    )
    if conflict is None:
        return

    _raise_uniqueness_error(
        f"Seat id already exists in current draft version: {new_seat_id}",
        {
            "code": "duplicate_seat_id",
            "message": "Seat id already exists in current draft version",
            "seat_id": new_seat_id,
            "conflict_seat_record_id": conflict.seat_record_id,
        },
    )
async def validate_bulk_seat_patch_uniqueness(
    *,
    scheme_version_id: str,
    items: list[dict],
) -> None:
    """Reject a bulk seat patch that would introduce duplicate seat ids.

    Each item must carry ``seat_record_id``; items without a truthy
    ``seat_id`` are skipped. Two conflicts are detected, in this order per
    item: a clash with a seat id already held by a different stored record,
    then a clash with an earlier item of the same payload.

    Args:
        scheme_version_id: Version whose seats are listed for the check.
        items: Patch entries (dicts) as received in the bulk payload.

    Raises:
        HTTPException: 422 with code ``duplicate_seat_id`` or
            ``duplicate_seat_id_in_payload`` on the first conflict found.
        KeyError: If an item has no ``seat_record_id`` key.
    """
    current_seats = await list_scheme_version_seats(scheme_version_id)

    # seat_id -> owning seat_record_id; rows with a falsy seat_id are ignored.
    owner_by_seat_id: dict[str, str] = {}
    for seat in current_seats:
        if seat.seat_id:
            owner_by_seat_id[seat.seat_id] = seat.seat_record_id

    # NOTE(review): the stored-id check below looks only at persisted seat
    # ids, so a payload that also renames the conflicting record (e.g. an id
    # swap) is still rejected -- confirm this is intended.
    claimed_in_payload: dict[str, str] = {}
    for patch in items:
        record_id = patch["seat_record_id"]
        requested_id = patch.get("seat_id")
        if not requested_id:
            continue

        stored_owner = owner_by_seat_id.get(requested_id)
        if stored_owner and stored_owner != record_id:
            _raise_uniqueness_error(
                f"Seat id already exists in current draft version: {requested_id}",
                {
                    "code": "duplicate_seat_id",
                    "message": "Seat id already exists in current draft version",
                    "seat_id": requested_id,
                    "conflict_seat_record_id": stored_owner,
                    "input_seat_record_id": record_id,
                },
            )

        earlier_claimant = claimed_in_payload.get(requested_id)
        if earlier_claimant and earlier_claimant != record_id:
            _raise_uniqueness_error(
                f"Seat id is duplicated inside bulk payload: {requested_id}",
                {
                    "code": "duplicate_seat_id_in_payload",
                    "message": "Seat id is duplicated inside bulk payload",
                    "seat_id": requested_id,
                    "first_seat_record_id": earlier_claimant,
                    "second_seat_record_id": record_id,
                },
            )

        claimed_in_payload[requested_id] = record_id
async def validate_sector_patch_uniqueness(
    *,
    scheme_version_id: str,
    sector_record_id: str,
    new_sector_id: str | None,
) -> None:
    """Reject a sector patch whose new ``sector_id`` is held by another record.

    Args:
        scheme_version_id: Version whose sectors are listed for the check.
        sector_record_id: Record being patched; it is excluded from the scan.
        new_sector_id: Requested sector id. A falsy value skips validation.

    Raises:
        HTTPException: 422 with code ``duplicate_sector_id`` for the first
            other record that already uses ``new_sector_id``.
    """
    if not new_sector_id:
        return

    existing_sectors = await list_scheme_version_sectors(scheme_version_id)

    # First row that is a different record yet carries the requested id.
    conflict = next(
        (
            sector
            for sector in existing_sectors
            if sector.sector_record_id != sector_record_id
            and sector.sector_id == new_sector_id
        ),
        None,
    )
    if conflict is None:
        return

    _raise_uniqueness_error(
        f"Sector id already exists in current draft version: {new_sector_id}",
        {
            "code": "duplicate_sector_id",
            "message": "Sector id already exists in current draft version",
            "sector_id": new_sector_id,
            "conflict_sector_record_id": conflict.sector_record_id,
        },
    )
async def validate_group_patch_uniqueness(
    *,
    scheme_version_id: str,
    group_record_id: str,
    new_group_id: str | None,
) -> None:
    """Reject a group patch whose new ``group_id`` is held by another record.

    Args:
        scheme_version_id: Version whose groups are listed for the check.
        group_record_id: Record being patched; it is excluded from the scan.
        new_group_id: Requested group id. A falsy value skips validation.

    Raises:
        HTTPException: 422 with code ``duplicate_group_id`` for the first
            other record that already uses ``new_group_id``.
    """
    if not new_group_id:
        return

    existing_groups = await list_scheme_version_groups(scheme_version_id)

    # First row that is a different record yet carries the requested id.
    conflict = next(
        (
            group
            for group in existing_groups
            if group.group_record_id != group_record_id
            and group.group_id == new_group_id
        ),
        None,
    )
    if conflict is None:
        return

    _raise_uniqueness_error(
        f"Group id already exists in current draft version: {new_group_id}",
        {
            "code": "duplicate_group_id",
            "message": "Group id already exists in current draft version",
            "group_id": new_group_id,
            "conflict_group_record_id": conflict.group_record_id,
        },
    )
async def validate_single_seat_patch_references(
    *,
    scheme_version_id: str,
    sector_id: str | None,
    group_id: str | None,
) -> None:
    """Ensure the sector/group a seat patch points at exist in the version.

    Each reference is checked only when it is supplied (not ``None``), and
    the corresponding list is loaded only in that case, so a patch that
    touches neither field triggers no repository calls. The sector is
    checked before the group.

    Args:
        scheme_version_id: Version whose sectors/groups are consulted.
        sector_id: Sector id the seat should reference, or ``None`` to skip.
        group_id: Group id the seat should reference, or ``None`` to skip.

    Raises:
        HTTPException: 422 with code ``unknown_sector_id`` or
            ``unknown_group_id`` when the referenced id is not present.
    """
    if sector_id is not None:
        sector_rows = await list_scheme_version_sectors(scheme_version_id)
        # Rows with a falsy sector_id never count as valid targets.
        known_sector_ids = {row.sector_id for row in sector_rows if row.sector_id}
        if sector_id not in known_sector_ids:
            _raise_reference_error(
                f"Sector id does not exist in current draft version: {sector_id}",
                {
                    "code": "unknown_sector_id",
                    "message": "Sector id does not exist in current draft version",
                    "sector_id": sector_id,
                },
            )

    if group_id is not None:
        group_rows = await list_scheme_version_groups(scheme_version_id)
        # Rows with a falsy group_id never count as valid targets.
        known_group_ids = {row.group_id for row in group_rows if row.group_id}
        if group_id not in known_group_ids:
            _raise_reference_error(
                f"Group id does not exist in current draft version: {group_id}",
                {
                    "code": "unknown_group_id",
                    "message": "Group id does not exist in current draft version",
                    "group_id": group_id,
                },
            )
async def validate_bulk_seat_patch_references(
    *,
    scheme_version_id: str,
    items: list[dict],
) -> None:
    """Ensure every sector/group referenced by a bulk seat patch exists.

    Referenced ids are gathered from the payload first; the sector (resp.
    group) list is loaded only when at least one item supplies a non-``None``
    ``sector_id`` (resp. ``group_id``). Sectors are validated before groups,
    and all unknown ids of the failing kind are reported together, sorted.

    Args:
        scheme_version_id: Version whose sectors/groups are consulted.
        items: Patch entries (dicts); ``sector_id`` / ``group_id`` keys are
            optional and ignored when missing or ``None``.

    Raises:
        HTTPException: 422 with code ``unknown_sector_ids`` or
            ``unknown_group_ids`` listing the unresolved ids.
    """
    requested_sector_ids = {
        item["sector_id"] for item in items if item.get("sector_id") is not None
    }
    if requested_sector_ids:
        sector_rows = await list_scheme_version_sectors(scheme_version_id)
        # Rows with a falsy sector_id never count as valid targets.
        known_sector_ids = {row.sector_id for row in sector_rows if row.sector_id}
        unknown_sector_refs = sorted(requested_sector_ids - known_sector_ids)
        if unknown_sector_refs:
            _raise_reference_error(
                "Bulk payload contains unknown sector_id values",
                {
                    "code": "unknown_sector_ids",
                    "message": "Bulk payload contains unknown sector_id values",
                    "sector_ids": unknown_sector_refs,
                },
            )

    requested_group_ids = {
        item["group_id"] for item in items if item.get("group_id") is not None
    }
    if requested_group_ids:
        group_rows = await list_scheme_version_groups(scheme_version_id)
        # Rows with a falsy group_id never count as valid targets.
        known_group_ids = {row.group_id for row in group_rows if row.group_id}
        unknown_group_refs = sorted(requested_group_ids - known_group_ids)
        if unknown_group_refs:
            _raise_reference_error(
                "Bulk payload contains unknown group_id values",
                {
                    "code": "unknown_group_ids",
                    "message": "Bulk payload contains unknown group_id values",
                    "group_ids": unknown_group_refs,
                },
            )
async def validate_remap_target_references(
    *,
    scheme_version_id: str,
    to_sector_id: str | None,
    to_group_id: str | None,
) -> None:
    """Ensure the remap target sector/group exist in the version.

    Each target is checked only when it is supplied (not ``None``), and the
    corresponding list is loaded only in that case. The sector target is
    checked before the group target.

    Args:
        scheme_version_id: Version whose sectors/groups are consulted.
        to_sector_id: Target sector id, or ``None`` to skip the check.
        to_group_id: Target group id, or ``None`` to skip the check.

    Raises:
        HTTPException: 422 with code ``unknown_target_sector_id`` or
            ``unknown_target_group_id`` when the target id is not present.
    """
    if to_sector_id is not None:
        sector_rows = await list_scheme_version_sectors(scheme_version_id)
        # Rows with a falsy sector_id never count as valid targets.
        known_sector_ids = {row.sector_id for row in sector_rows if row.sector_id}
        if to_sector_id not in known_sector_ids:
            _raise_reference_error(
                f"Target sector_id does not exist in current draft version: {to_sector_id}",
                {
                    "code": "unknown_target_sector_id",
                    "message": "Target sector_id does not exist in current draft version",
                    "sector_id": to_sector_id,
                },
            )

    if to_group_id is not None:
        group_rows = await list_scheme_version_groups(scheme_version_id)
        # Rows with a falsy group_id never count as valid targets.
        known_group_ids = {row.group_id for row in group_rows if row.group_id}
        if to_group_id not in known_group_ids:
            _raise_reference_error(
                f"Target group_id does not exist in current draft version: {to_group_id}",
                {
                    "code": "unknown_target_group_id",
                    "message": "Target group_id does not exist in current draft version",
                    "group_id": to_group_id,
                },
            )