103 lines
3.1 KiB
Python
103 lines
3.1 KiB
Python
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
from app.core.config import settings
|
|
from app.repositories.scheme_artifacts import (
|
|
delete_scheme_artifacts_by_artifact_ids,
|
|
list_artifacts_by_type,
|
|
)
|
|
|
|
PUBLISH_PREVIEW_ARTIFACT_TYPE = "publish_preview"
|
|
|
|
|
|
def _preview_root() -> Path:
    """Return the directory that holds publish-preview files.

    The directory (and any missing parents) is created on demand, so
    callers may rely on the returned path existing.
    """
    root = Path(settings.storage_preview_dir)
    root.mkdir(parents=True, exist_ok=True)
    return root
|
|
|
|
|
|
def _cleanup_file_and_empty_parent(path: Path) -> bool:
    """Best-effort removal of a preview file and its now-empty parent dir.

    Deletes ``path`` when it is a regular file, then attempts to remove
    its parent directory — but never the preview root itself.

    Args:
        path: Absolute or relative path of the preview file to remove.

    Returns:
        True when the file itself was removed, False otherwise (the
        parent-directory cleanup does not affect the return value).
    """
    removed = False

    try:
        # is_file() is already False for missing paths, so a separate
        # exists() pre-check is redundant; the FileNotFoundError handler
        # still guards against a concurrent delete between check and unlink.
        if path.is_file():
            path.unlink()
            removed = True
    except FileNotFoundError:
        pass

    try:
        parent = path.parent
        root = _preview_root()
        # rmdir() succeeds only on empty directories; a non-empty parent
        # raises OSError, which we deliberately swallow (best-effort).
        if parent != root and parent.exists():
            parent.rmdir()
    except OSError:
        pass

    return removed
|
|
|
|
|
|
async def inspect_publish_preview_storage() -> dict:
    """Audit publish-preview storage: reconcile DB rows against disk files.

    Compares artifact rows of type ``publish_preview`` with the JSON files
    found one directory level below the preview root (``*/*.json``).

    Returns:
        A report dict with counts plus the two discrepancy lists:
        ``orphan_files`` (files on disk with no DB row) and
        ``missing_files_for_db_rows`` (DB rows whose file is gone).
    """
    rows = await list_artifacts_by_type(artifact_type=PUBLISH_PREVIEW_ARTIFACT_TYPE)
    root = _preview_root()

    # Only membership tests are performed, so a set of storage paths is the
    # right structure (the previous dict kept row objects as values that
    # were never read).
    db_paths = {row.storage_path for row in rows}
    disk_files = sorted(str(path) for path in root.glob("*/*.json"))

    orphan_files = [path for path in disk_files if path not in db_paths]
    missing_files_for_db_rows = [
        {
            "artifact_id": row.artifact_id,
            "scheme_id": row.scheme_id,
            "scheme_version_id": row.scheme_version_id,
            "artifact_variant": row.artifact_variant,
            "storage_path": row.storage_path,
        }
        for row in rows
        if not Path(row.storage_path).exists()
    ]

    return {
        "artifact_type": PUBLISH_PREVIEW_ARTIFACT_TYPE,
        "db_rows_count": len(rows),
        "disk_files_count": len(disk_files),
        "orphan_files_count": len(orphan_files),
        "missing_files_for_db_rows_count": len(missing_files_for_db_rows),
        "orphan_files": orphan_files,
        "missing_files_for_db_rows": missing_files_for_db_rows,
    }
|
|
|
|
|
|
async def cleanup_publish_preview_storage(*, dry_run: bool = True) -> dict:
    """Remove orphaned preview files and stale DB rows found by the audit.

    Runs :func:`inspect_publish_preview_storage` and, unless ``dry_run`` is
    True, deletes orphan files from disk and deletes the DB rows whose
    backing file is missing.

    Args:
        dry_run: When True (the default), only report what would be
            deleted; perform no filesystem or DB mutations.

    Returns:
        A report dict combining the audit findings with the lists and
        counts of what was actually deleted.
    """
    audit = await inspect_publish_preview_storage()

    orphan_files = [Path(path) for path in audit["orphan_files"]]
    missing_rows = audit["missing_files_for_db_rows"]

    deleted_files: list[str] = []
    deleted_db_artifact_ids: list[str] = []

    if not dry_run:
        for path in orphan_files:
            if _cleanup_file_and_empty_parent(path):
                deleted_files.append(str(path))

        deleted_db_artifact_ids = [item["artifact_id"] for item in missing_rows]
        # Skip the repository round-trip entirely when there is nothing to
        # delete (previously an empty list was always passed through).
        if deleted_db_artifact_ids:
            await delete_scheme_artifacts_by_artifact_ids(deleted_db_artifact_ids)

    return {
        "artifact_type": PUBLISH_PREVIEW_ARTIFACT_TYPE,
        "dry_run": dry_run,
        "orphan_files_count": len(orphan_files),
        "missing_files_for_db_rows_count": len(missing_rows),
        "orphan_files": [str(path) for path in orphan_files],
        "missing_files_for_db_rows": missing_rows,
        "deleted_files_count": len(deleted_files),
        "deleted_files": deleted_files,
        "deleted_db_rows_count": len(deleted_db_artifact_ids),
        "deleted_db_artifact_ids": deleted_db_artifact_ids,
    }
|