Files
svg-backend/backend/app/api/routes.py
2026-03-19 13:39:32 +03:00

1082 lines
37 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import logging
from pathlib import Path
from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile, status
from fastapi.responses import FileResponse
from app.core.config import settings
from app.db.session import db_ping
from app.repositories.audit import create_audit_event, list_audit_events
from app.repositories.pricing import (
create_price_rule,
create_pricing_category,
delete_price_rule,
delete_pricing_category,
find_effective_price_rule,
list_price_rules,
list_pricing_categories,
update_price_rule,
update_pricing_category,
)
from app.repositories.scheme_groups import (
clone_scheme_version_groups,
list_scheme_version_groups,
replace_scheme_version_groups,
)
from app.repositories.scheme_seats import (
clone_scheme_version_seats,
get_scheme_version_seat_by_seat_id,
list_scheme_version_seats,
replace_scheme_version_seats,
)
from app.repositories.scheme_sectors import (
clone_scheme_version_sectors,
list_scheme_version_sectors,
replace_scheme_version_sectors,
)
from app.repositories.scheme_versions import (
count_scheme_versions,
create_initial_scheme_version,
create_next_scheme_version_from_current,
get_current_scheme_version,
list_scheme_versions,
)
from app.repositories.schemes import (
count_scheme_records,
create_scheme_from_upload,
get_scheme_record_by_scheme_id,
list_scheme_records,
publish_scheme,
rollback_scheme_to_version,
unpublish_scheme,
)
from app.repositories.uploads import (
count_upload_records,
create_upload_record,
get_upload_record_by_upload_id,
list_upload_records,
)
from app.schemas.audit import AuditEventItem, SchemeAuditResponse
from app.schemas.manifest import ServiceManifestResponse
from app.schemas.pricing import (
DeleteResponse,
EffectiveSeatPriceResponse,
PriceRuleCreateRequest,
PriceRuleCreateResponse,
PriceRuleItem,
PriceRuleUpdateRequest,
PriceRuleUpdateResponse,
PricingCategoryCreateRequest,
PricingCategoryCreateResponse,
PricingCategoryItem,
PricingCategoryUpdateRequest,
PricingCategoryUpdateResponse,
SchemePricingResponse,
)
from app.schemas.scheme_groups import SchemeGroupItem, SchemeGroupListResponse
from app.schemas.scheme_registry import (
SchemeCurrentResponse,
SchemeDetailResponse,
SchemeListItem,
SchemeListResponse,
SchemePublishResponse,
SchemeRollbackRequest,
SchemeRollbackResponse,
)
from app.schemas.scheme_seats import SchemeSeatItem, SchemeSeatListResponse
from app.schemas.scheme_sectors import SchemeSectorItem, SchemeSectorListResponse
from app.schemas.scheme_versions import (
SchemeVersionCreateResponse,
SchemeVersionListItem,
SchemeVersionListResponse,
)
from app.schemas.test_mode import TestSeatPreviewResponse
from app.schemas.upload import UploadResponse
from app.schemas.upload_registry import UploadDetailResponse, UploadListItem, UploadListResponse
from app.security.auth import require_api_key
from app.services.normalized_reader import read_normalized_payload_from_path
from app.services.storage import (
load_normalized_json,
save_normalized_json,
save_original_svg,
save_sanitized_svg,
)
from app.services.svg_inspector import inspect_svg_bytes
from app.services.svg_normalizer import normalize_svg_bytes_to_json
from app.services.svg_sanitizer import sanitize_svg_bytes
# Router that the endpoint decorators in this module register on.
router = APIRouter()
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
@router.get("/healthz")
async def healthz():
    """Liveness probe: return a fixed ``ok`` status without any dependency checks."""
    health_payload = {"status": "ok"}
    return health_payload
@router.get(f"{settings.api_v1_prefix}/db/ping")
async def ping_db(role: str = Depends(require_api_key)):
    """Ping the database and report the configured connection target.

    ``role`` is resolved by the ``require_api_key`` dependency and is not used
    in the body.
    """
    ping_result = await db_ping()
    report = {"database": "ok", "result": ping_result}
    # Echo the connection settings so the caller can see what was pinged.
    report["host"] = settings.postgres_host
    report["port"] = settings.postgres_port
    report["db"] = settings.postgres_db
    return report
@router.get(f"{settings.api_v1_prefix}/ping")
async def ping(role: str = Depends(require_api_key)):
    """Authenticated ping: answer ``pong`` together with the API prefix and caller role."""
    return dict(
        message="pong",
        prefix=settings.api_v1_prefix,
        role=role,
    )
@router.get(f"{settings.api_v1_prefix}/auth/me")
async def auth_me(role: str = Depends(require_api_key)):
    """Return the role resolved by ``require_api_key`` and the configured auth header name."""
    identity = {"role": role}
    identity["auth_header"] = settings.auth_header_name
    return identity
@router.get(f"{settings.api_v1_prefix}/manifest", response_model=ServiceManifestResponse)
async def get_manifest(role: str = Depends(require_api_key)):
    """Describe the service: SVG limits, sanitization flags and the seat extraction contract.

    Limits and sanitization flags are read from ``settings``; the allowed
    ``data-*`` attributes, seat fields and priority list are fixed literals.
    """
    limits = {
        "max_file_size_bytes": settings.svg_max_file_size_bytes,
        "max_elements": settings.svg_max_elements,
    }

    permitted_data_attributes = [
        "data-seat-id",
        "data-sector-id",
        "data-group-id",
        "data-row",
        "data-seat-number",
    ]
    sanitization_rules = {
        "allow_internal_use_references_only": settings.svg_allow_internal_use_references_only,
        "forbid_foreign_object_v1": settings.svg_forbid_foreign_object_v1,
        "forbid_style_v1": settings.svg_forbid_style_v1,
        "forbid_image_v1": settings.svg_forbid_image_v1,
        "allowed_data_attributes": permitted_data_attributes,
    }

    contract = {
        "seat_fields": ["seat_id", "sector_id", "group_id", "row", "seat_number"],
        "priority": [
            "data-* attributes",
            "inherited parent sector/group",
            "fallback to element id",
        ],
    }

    return ServiceManifestResponse(
        service=settings.app_name,
        api_prefix=settings.api_v1_prefix,
        auth_header_name=settings.auth_header_name,
        svg_limits=limits,
        sanitization=sanitization_rules,
        extraction_contract=contract,
    )
@router.get(f"{settings.api_v1_prefix}/uploads", response_model=UploadListResponse)
async def get_uploads(
    limit: int = Query(default=50, ge=1, le=200),
    offset: int = Query(default=0, ge=0),
    role: str = Depends(require_api_key),
):
    """Return one page of upload records plus the total number of uploads.

    ``limit`` (1..200, default 50) and ``offset`` (>= 0) are forwarded to the
    repository; ``total`` comes from a separate count query.
    """
    records = await list_upload_records(limit=limit, offset=offset)
    total_uploads = await count_upload_records()

    page = []
    for record in records:
        page.append(
            UploadListItem(
                upload_id=record.upload_id,
                original_filename=record.original_filename,
                content_type=record.content_type,
                size_bytes=record.size_bytes,
                element_count=record.element_count,
                removed_elements_count=record.removed_elements_count,
                removed_attributes_count=record.removed_attributes_count,
                normalized_elements_count=record.normalized_elements_count,
                normalized_seats_count=record.normalized_seats_count,
                normalized_groups_count=record.normalized_groups_count,
                normalized_sectors_count=record.normalized_sectors_count,
                original_storage_path=record.original_storage_path,
                sanitized_storage_path=record.sanitized_storage_path,
                normalized_storage_path=record.normalized_storage_path,
                processing_status=record.processing_status,
                # Timestamps are serialized as ISO-8601 strings.
                created_at=record.created_at.isoformat(),
            )
        )
    return UploadListResponse(items=page, total=total_uploads)
@router.get(f"{settings.api_v1_prefix}/uploads/{{upload_id}}", response_model=UploadDetailResponse)
async def get_upload(upload_id: str, role: str = Depends(require_api_key)):
    """Return the stored record of a single upload.

    NOTE(review): the record is used without a ``None`` check, so the
    repository lookup presumably raises for an unknown ``upload_id`` — confirm.
    """
    record = await get_upload_record_by_upload_id(upload_id)
    created_at_iso = record.created_at.isoformat()
    return UploadDetailResponse(
        upload_id=record.upload_id,
        original_filename=record.original_filename,
        content_type=record.content_type,
        size_bytes=record.size_bytes,
        element_count=record.element_count,
        removed_elements_count=record.removed_elements_count,
        removed_attributes_count=record.removed_attributes_count,
        normalized_elements_count=record.normalized_elements_count,
        normalized_seats_count=record.normalized_seats_count,
        normalized_groups_count=record.normalized_groups_count,
        normalized_sectors_count=record.normalized_sectors_count,
        original_storage_path=record.original_storage_path,
        sanitized_storage_path=record.sanitized_storage_path,
        normalized_storage_path=record.normalized_storage_path,
        processing_status=record.processing_status,
        created_at=created_at_iso,
    )
@router.get(f"{settings.api_v1_prefix}/uploads/{{upload_id}}/normalized")
async def get_normalized(upload_id: str, role: str = Depends(require_api_key)):
    """Load the stored normalized JSON document for ``upload_id`` and return it decoded."""
    raw_document = load_normalized_json(upload_id)
    decoded_document = json.loads(raw_document)
    return decoded_document
@router.get(f"{settings.api_v1_prefix}/schemes", response_model=SchemeListResponse)
async def get_schemes(
    limit: int = Query(default=50, ge=1, le=200),
    offset: int = Query(default=0, ge=0),
    role: str = Depends(require_api_key),
):
    """Return one page of scheme records plus the total number of schemes.

    ``limit`` (1..200, default 50) and ``offset`` (>= 0) are forwarded to the
    repository; ``total`` comes from a separate count query.
    """
    records = await list_scheme_records(limit=limit, offset=offset)
    total_schemes = await count_scheme_records()

    page = []
    for record in records:
        # ``published_at`` stays ``None`` when the record has no publish timestamp.
        if record.published_at:
            published_at_iso = record.published_at.isoformat()
        else:
            published_at_iso = None
        page.append(
            SchemeListItem(
                scheme_id=record.scheme_id,
                source_upload_id=record.source_upload_id,
                name=record.name,
                status=record.status,
                current_version_number=record.current_version_number,
                published_at=published_at_iso,
                normalized_elements_count=record.normalized_elements_count,
                normalized_seats_count=record.normalized_seats_count,
                normalized_groups_count=record.normalized_groups_count,
                normalized_sectors_count=record.normalized_sectors_count,
                created_at=record.created_at.isoformat(),
            )
        )
    return SchemeListResponse(items=page, total=total_schemes)
@router.get(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}", response_model=SchemeDetailResponse)
async def get_scheme(scheme_id: str, role: str = Depends(require_api_key)):
    """Return the stored record of a single scheme.

    NOTE(review): the record is used without a ``None`` check, so the
    repository lookup presumably raises for an unknown ``scheme_id`` — confirm.
    """
    record = await get_scheme_record_by_scheme_id(scheme_id)

    # ``published_at`` stays ``None`` when the record has no publish timestamp.
    published_at_iso = None
    if record.published_at:
        published_at_iso = record.published_at.isoformat()

    return SchemeDetailResponse(
        scheme_id=record.scheme_id,
        source_upload_id=record.source_upload_id,
        name=record.name,
        status=record.status,
        current_version_number=record.current_version_number,
        published_at=published_at_iso,
        normalized_elements_count=record.normalized_elements_count,
        normalized_seats_count=record.normalized_seats_count,
        normalized_groups_count=record.normalized_groups_count,
        normalized_sectors_count=record.normalized_sectors_count,
        created_at=record.created_at.isoformat(),
    )
@router.get(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/current/svg")
async def get_scheme_current_svg(scheme_id: str, role: str = Depends(require_api_key)):
    """Serve the sanitized SVG of the upload the scheme was created from.

    Raises:
        HTTPException: 404 when the stored sanitized path does not point to an
            existing regular file.
    """
    scheme = await get_scheme_record_by_scheme_id(scheme_id)
    source_upload = await get_upload_record_by_upload_id(scheme.source_upload_id)

    sanitized_file = Path(source_upload.sanitized_storage_path)
    if not (sanitized_file.exists() and sanitized_file.is_file()):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Current sanitized SVG not found",
        )

    # Download name falls back to the scheme id when the scheme has no name.
    download_name = f"{scheme.name or scheme.scheme_id}.svg"
    return FileResponse(
        path=sanitized_file,
        media_type="image/svg+xml",
        filename=download_name,
    )
@router.get(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/audit", response_model=SchemeAuditResponse)
async def get_scheme_audit(scheme_id: str, role: str = Depends(require_api_key)):
    """Return every audit event recorded for the scheme.

    The scheme lookup result is discarded; it is called only for its effect
    (presumably it raises for an unknown ``scheme_id`` — confirm).
    """
    await get_scheme_record_by_scheme_id(scheme_id)
    events = await list_audit_events(scheme_id)

    audit_items = []
    for event in events:
        audit_items.append(
            AuditEventItem(
                audit_event_id=event.audit_event_id,
                scheme_id=event.scheme_id,
                event_type=event.event_type,
                object_type=event.object_type,
                object_ref=event.object_ref,
                details_json=event.details_json,
                created_at=event.created_at.isoformat(),
            )
        )
    return SchemeAuditResponse(items=audit_items, total=len(events))
@router.post(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/publish", response_model=SchemePublishResponse)
async def publish_scheme_endpoint(scheme_id: str, role: str = Depends(require_api_key)):
    """Publish the scheme, record a ``scheme.published`` audit event and return the new state."""
    published = await publish_scheme(scheme_id)

    audit_details = {
        "current_version_number": published.current_version_number,
        "status": published.status,
    }
    await create_audit_event(
        scheme_id=published.scheme_id,
        event_type="scheme.published",
        object_type="scheme",
        object_ref=published.scheme_id,
        details=audit_details,
    )

    # ``published_at`` stays ``None`` when the record has no publish timestamp.
    published_at_iso = None
    if published.published_at:
        published_at_iso = published.published_at.isoformat()

    return SchemePublishResponse(
        scheme_id=published.scheme_id,
        status=published.status,
        current_version_number=published.current_version_number,
        published_at=published_at_iso,
    )
@router.post(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/unpublish", response_model=SchemePublishResponse)
async def unpublish_scheme_endpoint(scheme_id: str, role: str = Depends(require_api_key)):
    """Unpublish the scheme, record a ``scheme.unpublished`` audit event and return the new state."""
    unpublished = await unpublish_scheme(scheme_id)

    audit_details = {
        "current_version_number": unpublished.current_version_number,
        "status": unpublished.status,
    }
    await create_audit_event(
        scheme_id=unpublished.scheme_id,
        event_type="scheme.unpublished",
        object_type="scheme",
        object_ref=unpublished.scheme_id,
        details=audit_details,
    )

    # ``published_at`` stays ``None`` when the record has no publish timestamp.
    published_at_iso = None
    if unpublished.published_at:
        published_at_iso = unpublished.published_at.isoformat()

    return SchemePublishResponse(
        scheme_id=unpublished.scheme_id,
        status=unpublished.status,
        current_version_number=unpublished.current_version_number,
        published_at=published_at_iso,
    )
@router.post(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/rollback", response_model=SchemeRollbackResponse)
async def rollback_scheme_endpoint(
    scheme_id: str,
    payload: SchemeRollbackRequest,
    role: str = Depends(require_api_key),
):
    """Roll the scheme back to ``payload.target_version_number`` and audit the change.

    The audit event's ``object_ref`` is the requested target version number
    rendered as a string.
    """
    target_version = payload.target_version_number
    rolled_back = await rollback_scheme_to_version(
        scheme_id=scheme_id,
        target_version_number=target_version,
    )

    audit_details = {
        "current_version_number": rolled_back.current_version_number,
        "status": rolled_back.status,
    }
    await create_audit_event(
        scheme_id=rolled_back.scheme_id,
        event_type="scheme.rolled_back",
        object_type="scheme_version",
        object_ref=str(target_version),
        details=audit_details,
    )

    # ``published_at`` stays ``None`` when the record has no publish timestamp.
    published_at_iso = None
    if rolled_back.published_at:
        published_at_iso = rolled_back.published_at.isoformat()

    return SchemeRollbackResponse(
        scheme_id=rolled_back.scheme_id,
        status=rolled_back.status,
        current_version_number=rolled_back.current_version_number,
        published_at=published_at_iso,
    )
@router.post(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/versions", response_model=SchemeVersionCreateResponse)
async def create_next_scheme_version_endpoint(
    scheme_id: str,
    role: str = Depends(require_api_key),
):
    """Create the next scheme version and copy the current version's children into it.

    Steps, in order: resolve the current version, create the next version,
    clone sectors, groups and seats from the current version into the new one,
    then record a ``scheme.version.created`` audit event.
    """
    scheme = await get_scheme_record_by_scheme_id(scheme_id)
    source_version = await get_current_scheme_version(
        scheme_id=scheme.scheme_id,
        current_version_number=scheme.current_version_number,
    )
    created_version = await create_next_scheme_version_from_current(scheme_id)

    source_id = source_version.scheme_version_id
    target_id = created_version.scheme_version_id

    # Clone order is kept as sectors -> groups -> seats.
    clone_steps = (
        clone_scheme_version_sectors,
        clone_scheme_version_groups,
        clone_scheme_version_seats,
    )
    for clone_step in clone_steps:
        await clone_step(
            source_scheme_version_id=source_id,
            target_scheme_version_id=target_id,
        )

    await create_audit_event(
        scheme_id=scheme_id,
        event_type="scheme.version.created",
        object_type="scheme_version",
        object_ref=target_id,
        details={
            "source_scheme_version_id": source_id,
            "version_number": created_version.version_number,
            "normalized_storage_path": created_version.normalized_storage_path,
        },
    )

    return SchemeVersionCreateResponse(
        scheme_id=created_version.scheme_id,
        scheme_version_id=target_id,
        version_number=created_version.version_number,
        status=created_version.status,
        normalized_storage_path=created_version.normalized_storage_path,
    )
@router.get(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/current", response_model=SchemeCurrentResponse)
async def get_scheme_current(scheme_id: str, role: str = Depends(require_api_key)):
    """Return the version record the scheme's ``current_version_number`` points at."""
    scheme = await get_scheme_record_by_scheme_id(scheme_id)
    current = await get_current_scheme_version(
        scheme_id=scheme.scheme_id,
        current_version_number=scheme.current_version_number,
    )
    created_at_iso = current.created_at.isoformat()
    return SchemeCurrentResponse(
        scheme_id=current.scheme_id,
        scheme_version_id=current.scheme_version_id,
        version_number=current.version_number,
        status=current.status,
        normalized_storage_path=current.normalized_storage_path,
        normalized_elements_count=current.normalized_elements_count,
        normalized_seats_count=current.normalized_seats_count,
        normalized_groups_count=current.normalized_groups_count,
        normalized_sectors_count=current.normalized_sectors_count,
        created_at=created_at_iso,
    )
@router.get(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/versions", response_model=SchemeVersionListResponse)
async def get_scheme_versions(
    scheme_id: str,
    limit: int = Query(default=100, ge=1, le=200),
    offset: int = Query(default=0, ge=0),
    role: str = Depends(require_api_key),
):
    """Return one page of the scheme's versions plus the total version count.

    ``limit`` (1..200, default 100) and ``offset`` (>= 0) are forwarded to the
    repository. Unlike sibling endpoints, the scheme record itself is not
    looked up first.
    """
    versions = await list_scheme_versions(scheme_id=scheme_id, limit=limit, offset=offset)
    total_versions = await count_scheme_versions(scheme_id=scheme_id)

    page = []
    for version in versions:
        page.append(
            SchemeVersionListItem(
                scheme_version_id=version.scheme_version_id,
                scheme_id=version.scheme_id,
                version_number=version.version_number,
                status=version.status,
                normalized_storage_path=version.normalized_storage_path,
                normalized_elements_count=version.normalized_elements_count,
                normalized_seats_count=version.normalized_seats_count,
                normalized_groups_count=version.normalized_groups_count,
                normalized_sectors_count=version.normalized_sectors_count,
                created_at=version.created_at.isoformat(),
            )
        )
    return SchemeVersionListResponse(items=page, total=total_versions)
@router.get(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/current/sectors", response_model=SchemeSectorListResponse)
async def get_scheme_current_sectors(scheme_id: str, role: str = Depends(require_api_key)):
    """List all sectors stored for the scheme's current version (no pagination)."""
    scheme = await get_scheme_record_by_scheme_id(scheme_id)
    current = await get_current_scheme_version(
        scheme_id=scheme.scheme_id,
        current_version_number=scheme.current_version_number,
    )
    sector_rows = await list_scheme_version_sectors(current.scheme_version_id)

    sectors = []
    for sector in sector_rows:
        sectors.append(
            SchemeSectorItem(
                sector_record_id=sector.sector_record_id,
                scheme_id=sector.scheme_id,
                scheme_version_id=sector.scheme_version_id,
                element_id=sector.element_id,
                sector_id=sector.sector_id,
                name=sector.name,
                classes_raw=sector.classes_raw,
                created_at=sector.created_at.isoformat(),
            )
        )
    return SchemeSectorListResponse(items=sectors, total=len(sectors))
@router.get(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/current/groups", response_model=SchemeGroupListResponse)
async def get_scheme_current_groups(scheme_id: str, role: str = Depends(require_api_key)):
    """List all groups stored for the scheme's current version (no pagination)."""
    scheme = await get_scheme_record_by_scheme_id(scheme_id)
    current = await get_current_scheme_version(
        scheme_id=scheme.scheme_id,
        current_version_number=scheme.current_version_number,
    )
    group_rows = await list_scheme_version_groups(current.scheme_version_id)

    groups = []
    for group in group_rows:
        groups.append(
            SchemeGroupItem(
                group_record_id=group.group_record_id,
                scheme_id=group.scheme_id,
                scheme_version_id=group.scheme_version_id,
                element_id=group.element_id,
                group_id=group.group_id,
                name=group.name,
                classes_raw=group.classes_raw,
                created_at=group.created_at.isoformat(),
            )
        )
    return SchemeGroupListResponse(items=groups, total=len(groups))
@router.get(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/current/seats", response_model=SchemeSeatListResponse)
async def get_scheme_current_seats(scheme_id: str, role: str = Depends(require_api_key)):
    """List all seats stored for the scheme's current version (no pagination)."""
    scheme = await get_scheme_record_by_scheme_id(scheme_id)
    current = await get_current_scheme_version(
        scheme_id=scheme.scheme_id,
        current_version_number=scheme.current_version_number,
    )
    seat_rows = await list_scheme_version_seats(current.scheme_version_id)

    seats = []
    for seat in seat_rows:
        seats.append(
            SchemeSeatItem(
                seat_record_id=seat.seat_record_id,
                scheme_id=seat.scheme_id,
                scheme_version_id=seat.scheme_version_id,
                element_id=seat.element_id,
                seat_id=seat.seat_id,
                sector_id=seat.sector_id,
                group_id=seat.group_id,
                row_label=seat.row_label,
                seat_number=seat.seat_number,
                tag=seat.tag,
                classes_raw=seat.classes_raw,
                # Geometry fields are passed through as stored.
                x=seat.x,
                y=seat.y,
                cx=seat.cx,
                cy=seat.cy,
                width=seat.width,
                height=seat.height,
                created_at=seat.created_at.isoformat(),
            )
        )
    return SchemeSeatListResponse(items=seats, total=len(seats))
@router.get(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/current/seats/{{seat_id}}/price", response_model=EffectiveSeatPriceResponse)
async def get_effective_seat_price(
    scheme_id: str,
    seat_id: str,
    role: str = Depends(require_api_key),
):
    """Resolve the effective price rule for a seat of the scheme's current version.

    The rule lookup receives the seat's own id plus its group and sector ids
    and returns the matched level together with the rule mapping.

    Raises:
        HTTPException: 422 when the stored seat has an empty ``seat_id``.
    """
    scheme = await get_scheme_record_by_scheme_id(scheme_id)
    current = await get_current_scheme_version(
        scheme_id=scheme.scheme_id,
        current_version_number=scheme.current_version_number,
    )
    seat = await get_scheme_version_seat_by_seat_id(
        scheme_version_id=current.scheme_version_id,
        seat_id=seat_id,
    )

    # A price cannot be resolved for a seat record without its own seat id.
    if not seat.seat_id:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Невозможно рассчитать цену: у места отсутствует seat_id",
        )

    rule_level, price_rule = await find_effective_price_rule(
        scheme_id=scheme.scheme_id,
        seat_id=seat.seat_id,
        group_id=seat.group_id,
        sector_id=seat.sector_id,
    )

    return EffectiveSeatPriceResponse(
        scheme_id=scheme.scheme_id,
        scheme_version_id=current.scheme_version_id,
        seat_id=seat.seat_id,
        sector_id=seat.sector_id,
        group_id=seat.group_id,
        matched_rule_level=rule_level,
        matched_target_ref=price_rule["target_ref"],
        pricing_category_id=price_rule["pricing_category_id"],
        amount=price_rule["amount"],
        currency=price_rule["currency"],
    )
@router.get(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/test/seats/{{seat_id}}", response_model=TestSeatPreviewResponse)
async def preview_test_seat(
    scheme_id: str,
    seat_id: str,
    role: str = Depends(require_api_key),
):
    """Preview a seat of the scheme's current version together with its price, if any.

    Unlike the plain price endpoint, a missing price rule is not an error here:
    the seat is returned with ``has_price``/``selectable`` set to ``False`` and
    all pricing fields left as ``None``.

    Raises:
        HTTPException: 422 when the stored seat has an empty ``seat_id``;
            422 when the price lookup fails with a non-HTTP exception;
            any non-404 ``HTTPException`` from the price lookup is re-raised
            unchanged.
    """
    scheme = await get_scheme_record_by_scheme_id(scheme_id)
    version = await get_current_scheme_version(
        scheme_id=scheme.scheme_id,
        current_version_number=scheme.current_version_number,
    )
    seat = await get_scheme_version_seat_by_seat_id(
        scheme_version_id=version.scheme_version_id,
        seat_id=seat_id,
    )
    if not seat.seat_id:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Невозможно построить preview: у места отсутствует seat_id",
        )

    # Defaults describe a seat without a price; overwritten only on a successful lookup.
    matched_rule_level = None
    matched_target_ref = None
    pricing_category_id = None
    amount = None
    currency = None
    has_price = False

    try:
        matched_rule_level, rule = await find_effective_price_rule(
            scheme_id=scheme.scheme_id,
            seat_id=seat.seat_id,
            group_id=seat.group_id,
            sector_id=seat.sector_id,
        )
        matched_target_ref = rule["target_ref"]
        pricing_category_id = rule["pricing_category_id"]
        amount = rule["amount"]
        currency = rule["currency"]
        has_price = True
    except HTTPException as exc:
        # A 404 from the lookup is swallowed so the preview is returned
        # without a price; every other HTTP error propagates.
        if exc.status_code != status.HTTP_404_NOT_FOUND:
            raise
    except Exception as exc:
        logger.exception(
            "preview_test_seat failed for scheme_id=%s seat_id=%s",
            scheme_id,
            seat_id,
        )
        # Chain explicitly so the original failure stays attached as __cause__.
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Не удалось построить preview: {exc.__class__.__name__}: {exc}",
        ) from exc

    return TestSeatPreviewResponse(
        scheme_id=scheme.scheme_id,
        scheme_version_id=version.scheme_version_id,
        seat_id=seat.seat_id,
        element_id=seat.element_id,
        sector_id=seat.sector_id,
        group_id=seat.group_id,
        row_label=seat.row_label,
        seat_number=seat.seat_number,
        # In test mode a seat is selectable exactly when it has a price.
        selectable=has_price,
        has_price=has_price,
        matched_rule_level=matched_rule_level,
        matched_target_ref=matched_target_ref,
        pricing_category_id=pricing_category_id,
        amount=amount,
        currency=currency,
    )
@router.post(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/pricing/categories", response_model=PricingCategoryCreateResponse)
async def create_pricing_category_endpoint(
    scheme_id: str,
    payload: PricingCategoryCreateRequest,
    role: str = Depends(require_api_key),
):
    """Create a pricing category for the scheme and record a ``pricing.category.created`` audit event.

    The scheme lookup result is discarded; it is called only for its effect
    (presumably it raises for an unknown ``scheme_id`` — confirm).
    """
    await get_scheme_record_by_scheme_id(scheme_id)

    category_name = payload.name
    category_code = payload.code
    new_category_id = await create_pricing_category(
        scheme_id=scheme_id,
        name=category_name,
        code=category_code,
    )

    await create_audit_event(
        scheme_id=scheme_id,
        event_type="pricing.category.created",
        object_type="pricing_category",
        object_ref=new_category_id,
        details={"name": category_name, "code": category_code},
    )

    # The response echoes the request values together with the new id.
    return PricingCategoryCreateResponse(
        pricing_category_id=new_category_id,
        scheme_id=scheme_id,
        name=category_name,
        code=category_code,
    )
@router.put(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/pricing/categories/{{pricing_category_id}}", response_model=PricingCategoryUpdateResponse)
async def update_pricing_category_endpoint(
    scheme_id: str,
    pricing_category_id: str,
    payload: PricingCategoryUpdateRequest,
    role: str = Depends(require_api_key),
):
    """Update a pricing category and record a ``pricing.category.updated`` audit event.

    The audit details carry the requested values; the response is built from
    the row returned by the repository.
    """
    updated = await update_pricing_category(
        scheme_id=scheme_id,
        pricing_category_id=pricing_category_id,
        name=payload.name,
        code=payload.code,
    )

    requested_values = {"name": payload.name, "code": payload.code}
    await create_audit_event(
        scheme_id=scheme_id,
        event_type="pricing.category.updated",
        object_type="pricing_category",
        object_ref=pricing_category_id,
        details=requested_values,
    )

    return PricingCategoryUpdateResponse(
        pricing_category_id=updated.pricing_category_id,
        scheme_id=updated.scheme_id,
        name=updated.name,
        code=updated.code,
    )
@router.delete(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/pricing/categories/{{pricing_category_id}}", response_model=DeleteResponse)
async def delete_pricing_category_endpoint(
    scheme_id: str,
    pricing_category_id: str,
    role: str = Depends(require_api_key),
):
    """Delete a pricing category and record a ``pricing.category.deleted`` audit event."""
    await delete_pricing_category(
        scheme_id=scheme_id,
        pricing_category_id=pricing_category_id,
    )

    # The deletion event carries no details payload.
    audit_fields = {
        "scheme_id": scheme_id,
        "event_type": "pricing.category.deleted",
        "object_type": "pricing_category",
        "object_ref": pricing_category_id,
        "details": None,
    }
    await create_audit_event(**audit_fields)

    return DeleteResponse(status="deleted")
@router.post(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/pricing/rules", response_model=PriceRuleCreateResponse)
async def create_price_rule_endpoint(
    scheme_id: str,
    payload: PriceRuleCreateRequest,
    role: str = Depends(require_api_key),
):
    """Create a price rule for the scheme and record a ``pricing.rule.created`` audit event.

    The scheme lookup result is discarded; it is called only for its effect
    (presumably it raises for an unknown ``scheme_id`` — confirm).
    """
    await get_scheme_record_by_scheme_id(scheme_id)

    # Rule attributes shared by the repository call and the response.
    rule_fields = {
        "pricing_category_id": payload.pricing_category_id,
        "target_type": payload.target_type,
        "target_ref": payload.target_ref,
        "amount": payload.amount,
        "currency": payload.currency,
    }
    new_rule_id = await create_price_rule(scheme_id=scheme_id, **rule_fields)

    # The audit payload stores the amount as a string.
    audit_details = {
        "pricing_category_id": payload.pricing_category_id,
        "target_type": payload.target_type,
        "target_ref": payload.target_ref,
        "amount": str(payload.amount),
        "currency": payload.currency,
    }
    await create_audit_event(
        scheme_id=scheme_id,
        event_type="pricing.rule.created",
        object_type="price_rule",
        object_ref=new_rule_id,
        details=audit_details,
    )

    return PriceRuleCreateResponse(
        price_rule_id=new_rule_id,
        scheme_id=scheme_id,
        **rule_fields,
    )
@router.put(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/pricing/rules/{{price_rule_id}}", response_model=PriceRuleUpdateResponse)
async def update_price_rule_endpoint(
    scheme_id: str,
    price_rule_id: str,
    payload: PriceRuleUpdateRequest,
    role: str = Depends(require_api_key),
):
    """Update a price rule and record a ``pricing.rule.updated`` audit event.

    The audit details carry the requested values (amount as a string); the
    response is built from the row returned by the repository.
    """
    updated = await update_price_rule(
        scheme_id=scheme_id,
        price_rule_id=price_rule_id,
        pricing_category_id=payload.pricing_category_id,
        target_type=payload.target_type,
        target_ref=payload.target_ref,
        amount=payload.amount,
        currency=payload.currency,
    )

    requested_values = {
        "pricing_category_id": payload.pricing_category_id,
        "target_type": payload.target_type,
        "target_ref": payload.target_ref,
        "amount": str(payload.amount),
        "currency": payload.currency,
    }
    await create_audit_event(
        scheme_id=scheme_id,
        event_type="pricing.rule.updated",
        object_type="price_rule",
        object_ref=price_rule_id,
        details=requested_values,
    )

    return PriceRuleUpdateResponse(
        price_rule_id=updated.price_rule_id,
        scheme_id=updated.scheme_id,
        pricing_category_id=updated.pricing_category_id,
        target_type=updated.target_type,
        target_ref=updated.target_ref,
        amount=updated.amount,
        currency=updated.currency,
    )
@router.delete(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/pricing/rules/{{price_rule_id}}", response_model=DeleteResponse)
async def delete_price_rule_endpoint(
    scheme_id: str,
    price_rule_id: str,
    role: str = Depends(require_api_key),
):
    """Delete a price rule and record a ``pricing.rule.deleted`` audit event."""
    await delete_price_rule(
        scheme_id=scheme_id,
        price_rule_id=price_rule_id,
    )

    # The deletion event carries no details payload.
    audit_fields = {
        "scheme_id": scheme_id,
        "event_type": "pricing.rule.deleted",
        "object_type": "price_rule",
        "object_ref": price_rule_id,
        "details": None,
    }
    await create_audit_event(**audit_fields)

    return DeleteResponse(status="deleted")
@router.get(f"{settings.api_v1_prefix}/schemes/{{scheme_id}}/pricing", response_model=SchemePricingResponse)
async def get_scheme_pricing(
    scheme_id: str,
    role: str = Depends(require_api_key),
):
    """Return every pricing category and price rule stored for the scheme.

    The scheme lookup result is discarded; it is called only for its effect
    (presumably it raises for an unknown ``scheme_id`` — confirm).
    """
    await get_scheme_record_by_scheme_id(scheme_id)
    category_rows = await list_pricing_categories(scheme_id)
    rule_rows = await list_price_rules(scheme_id)

    category_items = []
    for category in category_rows:
        category_items.append(
            PricingCategoryItem(
                pricing_category_id=category.pricing_category_id,
                scheme_id=category.scheme_id,
                name=category.name,
                code=category.code,
                created_at=category.created_at.isoformat(),
            )
        )

    rule_items = []
    for rule in rule_rows:
        rule_items.append(
            PriceRuleItem(
                price_rule_id=rule.price_rule_id,
                scheme_id=rule.scheme_id,
                pricing_category_id=rule.pricing_category_id,
                target_type=rule.target_type,
                target_ref=rule.target_ref,
                amount=rule.amount,
                currency=rule.currency,
                created_at=rule.created_at.isoformat(),
            )
        )

    return SchemePricingResponse(categories=category_items, rules=rule_items)
@router.post(f"{settings.api_v1_prefix}/schemes/upload", response_model=UploadResponse)
async def upload_scheme_svg(
    file: UploadFile = File(...),
    role: str = Depends(require_api_key),
):
    """Accept an SVG upload and build a scheme with its initial version from it.

    Pipeline, in order: validate type and size, inspect, sanitize and
    normalize the SVG, persist the original/sanitized/normalized artifacts,
    create the upload record, the scheme and its initial version, store the
    version's sectors, groups and seats, then record a
    ``scheme.created_from_upload`` audit event.

    Raises:
        HTTPException: 400 when neither the file extension nor the content
            type identifies an SVG; 400 when the file is empty; 413 when the
            file is larger than ``settings.svg_max_file_size_bytes``.
    """
    filename = file.filename or ""
    suffix = Path(filename).suffix.lower()
    content_type = (file.content_type or "").lower()
    # Either signal is sufficient: reject only when neither the extension
    # nor the declared content type says SVG.
    if suffix != ".svg" and content_type != "image/svg+xml":
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Only SVG files are allowed",
        )

    # Read at most one byte past the limit: enough to detect an oversized
    # upload without loading an arbitrarily large body into memory.
    # ``max(..., 0)`` keeps the read size positive, because a negative size
    # would mean "read everything".
    max_size_bytes = settings.svg_max_file_size_bytes
    content = await file.read(max(max_size_bytes, 0) + 1)
    size_bytes = len(content)
    if size_bytes == 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Uploaded file is empty",
        )
    if size_bytes > max_size_bytes:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail="SVG file exceeds configured size limit",
        )

    element_count = inspect_svg_bytes(content)
    sanitized_content, removed_elements_count, removed_attributes_count = sanitize_svg_bytes(content)
    normalized_json, normalized_payload = normalize_svg_bytes_to_json(sanitized_content)

    upload_id, original_storage_path = save_original_svg(filename=filename, content=content)
    sanitized_storage_path = save_sanitized_svg(
        upload_id=upload_id,
        filename=filename,
        content=sanitized_content,
    )
    normalized_storage_path = save_normalized_json(
        upload_id=upload_id,
        filename=filename,
        content=normalized_json,
    )

    # The same four counters feed the upload record, the scheme, the initial
    # version, the audit event and the response.
    summary = normalized_payload["summary"]
    normalized_counts = {
        "normalized_elements_count": summary["elements_count"],
        "normalized_seats_count": summary["seats_count"],
        "normalized_groups_count": summary["groups_count"],
        "normalized_sectors_count": summary["sectors_count"],
    }

    await create_upload_record(
        upload_id=upload_id,
        original_filename=filename,
        content_type=content_type,
        size_bytes=size_bytes,
        element_count=element_count,
        removed_elements_count=removed_elements_count,
        removed_attributes_count=removed_attributes_count,
        original_storage_path=original_storage_path,
        sanitized_storage_path=sanitized_storage_path,
        normalized_storage_path=normalized_storage_path,
        processing_status="completed",
        **normalized_counts,
    )
    scheme_id = await create_scheme_from_upload(
        source_upload_id=upload_id,
        # Scheme name is the file stem, or the raw filename when the stem is empty.
        name=Path(filename).stem or filename,
        **normalized_counts,
    )
    scheme_version_id = await create_initial_scheme_version(
        scheme_id=scheme_id,
        normalized_storage_path=normalized_storage_path,
        **normalized_counts,
    )

    # Entities are loaded back from the stored normalized file rather than
    # taken from the in-memory payload.
    normalized_payload_from_file = read_normalized_payload_from_path(normalized_storage_path)
    await replace_scheme_version_sectors(
        scheme_id=scheme_id,
        scheme_version_id=scheme_version_id,
        sectors=normalized_payload_from_file.get("sectors", []),
    )
    await replace_scheme_version_groups(
        scheme_id=scheme_id,
        scheme_version_id=scheme_version_id,
        groups=normalized_payload_from_file.get("groups", []),
    )
    await replace_scheme_version_seats(
        scheme_id=scheme_id,
        scheme_version_id=scheme_version_id,
        seats=normalized_payload_from_file.get("seats", []),
    )

    await create_audit_event(
        scheme_id=scheme_id,
        event_type="scheme.created_from_upload",
        object_type="upload",
        object_ref=upload_id,
        details={
            "scheme_version_id": scheme_version_id,
            "filename": filename,
            **normalized_counts,
        },
    )

    return UploadResponse(
        upload_id=upload_id,
        filename=filename,
        content_type=content_type,
        size_bytes=size_bytes,
        element_count=element_count,
        removed_elements_count=removed_elements_count,
        removed_attributes_count=removed_attributes_count,
        svg_max_file_size_bytes=settings.svg_max_file_size_bytes,
        svg_max_elements=settings.svg_max_elements,
        original_storage_path=original_storage_path,
        sanitized_storage_path=sanitized_storage_path,
        normalized_storage_path=normalized_storage_path,
        accepted=True,
        **normalized_counts,
    )