from __future__ import annotations import json from pathlib import Path from uuid import uuid4 from app.core.config import settings from app.repositories.scheme_artifacts import ( create_scheme_artifact, delete_scheme_artifacts_by_artifact_ids, list_scheme_artifacts, ) def _preview_storage_dir() -> Path: path = Path(settings.storage_preview_dir) path.mkdir(parents=True, exist_ok=True) return path def _cleanup_preview_file(storage_path: str) -> None: path = Path(storage_path) try: if path.exists() and path.is_file(): path.unlink() except FileNotFoundError: pass parent = path.parent preview_root = Path(settings.storage_preview_dir) try: if parent != preview_root and parent.exists(): parent.rmdir() except OSError: pass async def cleanup_publish_preview_artifacts( *, scheme_version_id: str, baseline_scheme_version_id: str | None, ) -> dict: retention = max(1, settings.publish_preview_retention_per_variant) variant = baseline_scheme_version_id or "default" rows = await list_scheme_artifacts( scheme_version_id=scheme_version_id, artifact_type="publish_preview", artifact_variant=variant, ) if len(rows) <= retention: return { "retention": retention, "deleted_count": 0, "deleted_artifact_ids": [], } rows_sorted = sorted( rows, key=lambda row: (row.created_at, row.id), reverse=True, ) to_delete = rows_sorted[retention:] deleted_artifact_ids = [row.artifact_id for row in to_delete] for row in to_delete: _cleanup_preview_file(row.storage_path) deleted_count = await delete_scheme_artifacts_by_artifact_ids(deleted_artifact_ids) return { "retention": retention, "deleted_count": deleted_count, "deleted_artifact_ids": deleted_artifact_ids, } async def save_publish_preview_artifact( *, scheme_id: str, scheme_version_id: str, payload: dict, baseline_scheme_version_id: str | None, ) -> dict: artifact_dir = _preview_storage_dir() / uuid4().hex artifact_dir.mkdir(parents=True, exist_ok=True) storage_path = artifact_dir / "publish-preview.json" storage_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") artifact = await create_scheme_artifact( scheme_id=scheme_id, scheme_version_id=scheme_version_id, artifact_type="publish_preview", artifact_variant=baseline_scheme_version_id or "default", storage_path=str(storage_path), meta_json={ "baseline_scheme_version_id": baseline_scheme_version_id, "summary": payload.get("summary"), }, ) cleanup = await cleanup_publish_preview_artifacts( scheme_version_id=scheme_version_id, baseline_scheme_version_id=baseline_scheme_version_id, ) return { "artifact": artifact, "cleanup": cleanup, } async def get_latest_publish_preview_artifact( *, scheme_version_id: str, baseline_scheme_version_id: str | None, ): rows = await list_scheme_artifacts( scheme_version_id=scheme_version_id, artifact_type="publish_preview", artifact_variant=baseline_scheme_version_id or "default", ) if not rows: return None rows.sort(key=lambda row: (row.created_at, row.id), reverse=True) return rows[0]