Files
svg-backend/backend/app/services/publish_preview_cache.py

129 lines
3.4 KiB
Python

from __future__ import annotations
import json
from pathlib import Path
from uuid import uuid4
from app.core.config import settings
from app.repositories.scheme_artifacts import (
create_scheme_artifact,
delete_scheme_artifacts_by_artifact_ids,
list_scheme_artifacts,
)
def _preview_storage_dir() -> Path:
    """Return the preview storage root, creating it on first use."""
    root = Path(settings.storage_preview_dir)
    root.mkdir(parents=True, exist_ok=True)
    return root
def _cleanup_preview_file(storage_path: str) -> None:
    """Best-effort deletion of a stored preview file and its now-empty parent dir.

    Never raises for the expected races (file or directory removed concurrently,
    directory not empty): cleanup here must not fail the surrounding operation.
    """
    path = Path(storage_path)
    try:
        # is_file() is False for a missing path, so no separate exists() check
        # is needed; FileNotFoundError covers the check-then-unlink race.
        if path.is_file():
            path.unlink()
    except FileNotFoundError:
        pass
    parent = path.parent
    preview_root = Path(settings.storage_preview_dir)
    try:
        # Remove the per-artifact subdirectory, but never the preview root
        # itself; rmdir() raises OSError if the directory is not empty.
        if parent != preview_root and parent.exists():
            parent.rmdir()
    except OSError:
        pass
async def cleanup_publish_preview_artifacts(
    *,
    scheme_version_id: str,
    baseline_scheme_version_id: str | None,
) -> dict:
    """Trim stored publish-preview artifacts for one variant to the retention limit.

    Keeps the newest N artifacts (N = configured retention, minimum 1) for the
    (scheme_version, variant) pair and deletes the rest — files first, then the
    database rows. Returns a summary dict with the retention used, the number
    of rows deleted, and the deleted artifact ids.
    """
    keep = max(1, settings.publish_preview_retention_per_variant)
    variant_key = baseline_scheme_version_id or "default"
    artifacts = await list_scheme_artifacts(
        scheme_version_id=scheme_version_id,
        artifact_type="publish_preview",
        artifact_variant=variant_key,
    )
    if len(artifacts) <= keep:
        return {"retention": keep, "deleted_count": 0, "deleted_artifact_ids": []}
    # Newest first by (created_at, id); everything past the retention cut is stale.
    ordered = sorted(artifacts, key=lambda a: (a.created_at, a.id), reverse=True)
    stale = ordered[keep:]
    stale_ids = [a.artifact_id for a in stale]
    for artifact in stale:
        _cleanup_preview_file(artifact.storage_path)
    removed = await delete_scheme_artifacts_by_artifact_ids(stale_ids)
    return {
        "retention": keep,
        "deleted_count": removed,
        "deleted_artifact_ids": stale_ids,
    }
async def save_publish_preview_artifact(
    *,
    scheme_id: str,
    scheme_version_id: str,
    payload: dict,
    baseline_scheme_version_id: str | None,
) -> dict:
    """Persist a publish-preview payload to disk and register it as an artifact.

    Writes the payload as pretty-printed JSON into a fresh uuid-named
    subdirectory of the preview storage root, records a ``publish_preview``
    artifact row for it, then runs retention cleanup for the same variant.
    Returns ``{"artifact": ..., "cleanup": ...}``.
    """
    target_dir = _preview_storage_dir() / uuid4().hex
    target_dir.mkdir(parents=True, exist_ok=True)
    target_file = target_dir / "publish-preview.json"
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    target_file.write_text(serialized, encoding="utf-8")
    variant_key = baseline_scheme_version_id or "default"
    artifact = await create_scheme_artifact(
        scheme_id=scheme_id,
        scheme_version_id=scheme_version_id,
        artifact_type="publish_preview",
        artifact_variant=variant_key,
        storage_path=str(target_file),
        meta_json={
            "baseline_scheme_version_id": baseline_scheme_version_id,
            "summary": payload.get("summary"),
        },
    )
    cleanup = await cleanup_publish_preview_artifacts(
        scheme_version_id=scheme_version_id,
        baseline_scheme_version_id=baseline_scheme_version_id,
    )
    return {"artifact": artifact, "cleanup": cleanup}
async def get_latest_publish_preview_artifact(
    *,
    scheme_version_id: str,
    baseline_scheme_version_id: str | None,
):
    """Return the newest publish-preview artifact row for the variant, or None."""
    rows = await list_scheme_artifacts(
        scheme_version_id=scheme_version_id,
        artifact_type="publish_preview",
        artifact_variant=baseline_scheme_version_id or "default",
    )
    if not rows:
        return None
    # Newest = greatest (created_at, id); max avoids sorting the whole list.
    return max(rows, key=lambda row: (row.created_at, row.id))