Files
svg-backend/backend/app/services/remap_service.py
greebo af175d88dd refactor(api): unify typed error contract across draft pricing and publish flows
standardize typed error responses across draft, pricing and publish endpoints

reduce contract drift between related flows
keep client-side handling more predictable and consistent
2026-03-19 19:54:42 +03:00

102 lines
2.9 KiB
Python

from __future__ import annotations
from app.repositories.scheme_seats import (
bulk_remap_scheme_version_seats,
list_scheme_version_seats,
)
from app.services.api_errors import raise_unprocessable
from app.services.editor_validation import validate_remap_target_references
def _match_seat(
    seat,
    *,
    seat_record_ids: set[str] | None,
    from_sector_id: str | None,
    from_group_id: str | None,
) -> bool:
    """Tell whether *seat* passes every remap filter that is active.

    A filter whose value is ``None`` is inactive and never rejects a seat.
    Active filters are combined with AND semantics:

    * ``seat_record_ids`` -- ``seat.seat_record_id`` must be a member.
    * ``from_sector_id`` -- ``seat.sector_id`` must equal it.
    * ``from_group_id`` -- ``seat.group_id`` must equal it.

    Returns ``True`` when no active filter rejects the seat.
    """
    # Checks run in the order record -> sector -> group and stop at the
    # first rejection, so later seat attributes are not read once a seat
    # has already been ruled out.
    rejected = (
        (seat_record_ids is not None and seat.seat_record_id not in seat_record_ids)
        or (from_sector_id is not None and seat.sector_id != from_sector_id)
        or (from_group_id is not None and seat.group_id != from_group_id)
    )
    return not rejected
async def preview_remap(
    *,
    scheme_version_id: str,
    seat_record_ids: list[str] | None,
    from_sector_id: str | None,
    to_sector_id: str | None,
    from_group_id: str | None,
    to_group_id: str | None,
) -> list[dict]:
    """Build the before/after remap plan for the seats selected by the filters.

    Nothing is written here; the function only reads the seats of the scheme
    version and describes what each matching seat would look like afterwards.

    Args:
        scheme_version_id: Scheme version whose seats are listed.
        seat_record_ids: Optional selection by seat record id. ``None`` and
            an empty list both mean "no record-id filter".
        from_sector_id: Optional filter on the seat's current sector.
        to_sector_id: Sector to assign; ``None`` keeps each seat's sector.
        from_group_id: Optional filter on the seat's current group.
        to_group_id: Group to assign; ``None`` keeps each seat's group.

    Returns:
        One dict per matching seat with the keys ``seat_record_id``,
        ``seat_id``, ``before_sector_id``, ``after_sector_id``,
        ``before_group_id`` and ``after_group_id``.

    Raises:
        Reports ``remap_filter_required`` through ``raise_unprocessable``
        when all three filters are falsy. The targets are handed to
        ``validate_remap_target_references``, whose checks (and any error it
        raises) are defined outside this module.
    """
    # Truthiness, not ``is None``: an empty id list or empty string does not
    # count as a supplied filter for this guard.
    has_filter = bool(seat_record_ids or from_sector_id or from_group_id)
    if not has_filter:
        raise_unprocessable(
            code="remap_filter_required",
            message="At least one remap filter must be provided",
        )

    await validate_remap_target_references(
        scheme_version_id=scheme_version_id,
        to_sector_id=to_sector_id,
        to_group_id=to_group_id,
    )

    all_seats = await list_scheme_version_seats(scheme_version_id)

    # Build the membership set once so each seat lookup is O(1).
    wanted_record_ids = None
    if seat_record_ids:
        wanted_record_ids = set(seat_record_ids)

    return [
        {
            "seat_record_id": seat.seat_record_id,
            "seat_id": seat.seat_id,
            "before_sector_id": seat.sector_id,
            "after_sector_id": seat.sector_id if to_sector_id is None else to_sector_id,
            "before_group_id": seat.group_id,
            "after_group_id": seat.group_id if to_group_id is None else to_group_id,
        }
        for seat in all_seats
        if _match_seat(
            seat,
            seat_record_ids=wanted_record_ids,
            from_sector_id=from_sector_id,
            from_group_id=from_group_id,
        )
    ]
async def apply_remap(
    *,
    scheme_version_id: str,
    seat_record_ids: list[str] | None,
    from_sector_id: str | None,
    to_sector_id: str | None,
    from_group_id: str | None,
    to_group_id: str | None,
) -> list[dict]:
    """Compute the remap plan and hand it to the bulk seat update.

    The plan comes from ``preview_remap`` called with the same arguments, so
    the same filter and target checks apply. When the plan is empty the
    repository is not called at all.

    Returns:
        The plan items that were passed to
        ``bulk_remap_scheme_version_seats``, or an empty list when no seat
        matched the filters.
    """
    planned_items = await preview_remap(
        scheme_version_id=scheme_version_id,
        seat_record_ids=seat_record_ids,
        from_sector_id=from_sector_id,
        to_sector_id=to_sector_id,
        from_group_id=from_group_id,
        to_group_id=to_group_id,
    )

    if planned_items:
        await bulk_remap_scheme_version_seats(
            scheme_version_id=scheme_version_id,
            items=planned_items,
        )
        return planned_items

    # Nothing matched: skip the bulk call entirely.
    return []