Files
svg-backend/backend/app/repositories/scheme_artifacts.py

137 lines
4.1 KiB
Python

from uuid import uuid4
from sqlalchemy import asc, delete, desc, select
from app.db.session import AsyncSessionLocal
from app.models.scheme_artifact import SchemeArtifactRecord
async def create_scheme_artifact(
    *,
    scheme_id: str,
    scheme_version_id: str,
    artifact_type: str,
    artifact_variant: str,
    storage_path: str,
    status: str = "ready",
    meta_json: dict | None = None,
) -> SchemeArtifactRecord:
    """Insert a new scheme artifact record and return it.

    A fresh ``artifact_id`` is generated for every call (the hex form of a
    random UUID4). All other arguments are stored unchanged on the record
    field of the same name.

    Args:
        scheme_id: Value for the record's ``scheme_id`` field.
        scheme_version_id: Value for the record's ``scheme_version_id`` field.
        artifact_type: Value for the record's ``artifact_type`` field.
        artifact_variant: Value for the record's ``artifact_variant`` field.
        storage_path: Value for the record's ``storage_path`` field.
        status: Value for the record's ``status`` field; defaults to
            ``"ready"``.
        meta_json: Optional value for the record's ``meta_json`` field.

    Returns:
        The committed record, refreshed from the database before the
        session is closed.
    """
    async with AsyncSessionLocal() as db:
        artifact = SchemeArtifactRecord(
            artifact_id=uuid4().hex,
            scheme_id=scheme_id,
            scheme_version_id=scheme_version_id,
            artifact_type=artifact_type,
            artifact_variant=artifact_variant,
            storage_path=storage_path,
            status=status,
            meta_json=meta_json,
        )
        db.add(artifact)
        await db.commit()
        # Re-read the row after the commit so the returned object carries
        # its persisted state while the session is still open.
        await db.refresh(artifact)
    return artifact
async def list_scheme_artifacts(
    *,
    scheme_version_id: str,
    artifact_type: str | None = None,
    artifact_variant: str | None = None,
) -> list[SchemeArtifactRecord]:
    """Return the artifacts of one scheme version, oldest first.

    Args:
        scheme_version_id: Only records with this ``scheme_version_id`` are
            returned.
        artifact_type: When not ``None``, additionally require this
            ``artifact_type``.
        artifact_variant: When not ``None``, additionally require this
            ``artifact_variant``.

    Returns:
        Matching records ordered by ``created_at`` ascending, then ``id``
        ascending. The list is empty when nothing matches.
    """
    # The version filter is mandatory; the other two are opt-in.
    filters = [SchemeArtifactRecord.scheme_version_id == scheme_version_id]
    if artifact_type is not None:
        filters.append(SchemeArtifactRecord.artifact_type == artifact_type)
    if artifact_variant is not None:
        filters.append(SchemeArtifactRecord.artifact_variant == artifact_variant)

    query = (
        select(SchemeArtifactRecord)
        .where(*filters)
        .order_by(
            asc(SchemeArtifactRecord.created_at),
            asc(SchemeArtifactRecord.id),
        )
    )

    async with AsyncSessionLocal() as db:
        found = await db.execute(query)
        return list(found.scalars().all())
async def list_artifacts_by_type(
    *,
    artifact_type: str,
    artifact_variant: str | None = None,
) -> list[SchemeArtifactRecord]:
    """Return all artifacts of a given type, oldest first.

    Unlike a per-version listing, this query is not restricted to a single
    ``scheme_version_id``.

    Args:
        artifact_type: Only records with this ``artifact_type`` are returned.
        artifact_variant: When not ``None``, additionally require this
            ``artifact_variant``.

    Returns:
        Matching records ordered by ``created_at`` ascending, then ``id``
        ascending. The list is empty when nothing matches.
    """
    filters = [SchemeArtifactRecord.artifact_type == artifact_type]
    if artifact_variant is not None:
        filters.append(SchemeArtifactRecord.artifact_variant == artifact_variant)

    query = (
        select(SchemeArtifactRecord)
        .where(*filters)
        .order_by(
            asc(SchemeArtifactRecord.created_at),
            asc(SchemeArtifactRecord.id),
        )
    )

    async with AsyncSessionLocal() as db:
        found = await db.execute(query)
        return list(found.scalars().all())
async def artifact_exists(
    *,
    scheme_version_id: str,
    artifact_type: str,
    artifact_variant: str,
) -> bool:
    """Report whether at least one matching artifact record exists.

    Args:
        scheme_version_id: Required ``scheme_version_id`` of the record.
        artifact_type: Required ``artifact_type`` of the record.
        artifact_variant: Required ``artifact_variant`` of the record.

    Returns:
        ``True`` if a record matching all three values is found, otherwise
        ``False``.
    """
    # Only the id column is selected and the query is capped at one row:
    # the caller needs a yes/no answer, not the record itself.
    query = (
        select(SchemeArtifactRecord.id)
        .where(
            SchemeArtifactRecord.scheme_version_id == scheme_version_id,
            SchemeArtifactRecord.artifact_type == artifact_type,
            SchemeArtifactRecord.artifact_variant == artifact_variant,
        )
        .limit(1)
    )

    async with AsyncSessionLocal() as db:
        found = await db.execute(query)
        matched_id = found.scalar_one_or_none()
    return matched_id is not None
async def get_latest_scheme_artifact(
    *,
    scheme_version_id: str,
    artifact_type: str,
    artifact_variant: str | None = None,
) -> SchemeArtifactRecord | None:
    """Return the most recently created artifact matching the filters.

    Args:
        scheme_version_id: Required ``scheme_version_id`` of the record.
        artifact_type: Required ``artifact_type`` of the record.
        artifact_variant: When not ``None``, additionally require this
            ``artifact_variant``.

    Returns:
        The first record when ordering by ``created_at`` descending and then
        ``id`` descending, or ``None`` when nothing matches.
    """
    filters = [
        SchemeArtifactRecord.scheme_version_id == scheme_version_id,
        SchemeArtifactRecord.artifact_type == artifact_type,
    ]
    if artifact_variant is not None:
        filters.append(SchemeArtifactRecord.artifact_variant == artifact_variant)

    # Newest first, with id as the tie-breaker for equal timestamps; only
    # the top row is needed.
    query = (
        select(SchemeArtifactRecord)
        .where(*filters)
        .order_by(
            desc(SchemeArtifactRecord.created_at),
            desc(SchemeArtifactRecord.id),
        )
        .limit(1)
    )

    async with AsyncSessionLocal() as db:
        found = await db.execute(query)
        return found.scalar_one_or_none()
async def delete_scheme_artifacts_by_artifact_ids(artifact_ids: list[str]) -> int:
    """Delete every artifact record whose ``artifact_id`` is in *artifact_ids*.

    Args:
        artifact_ids: The ``artifact_id`` values to delete. Any iterable of
            strings is accepted; duplicates are ignored. A falsy value
            (empty list, ``None``) or an iterable that yields nothing is a
            no-op.

    Returns:
        The number of rows the database reports as deleted, never negative.
        ``0`` when there was nothing to delete.
    """
    if not artifact_ids:
        return 0

    # Materialise the input exactly once: iterators/generators are always
    # truthy, so the guard above cannot detect an empty one. De-duplicating
    # (order preserved) also keeps the IN (...) list as short as possible.
    unique_ids = list(dict.fromkeys(artifact_ids))
    if not unique_ids:
        return 0

    async with AsyncSessionLocal() as session:
        stmt = delete(SchemeArtifactRecord).where(
            SchemeArtifactRecord.artifact_id.in_(unique_ids)
        )
        result = await session.execute(stmt)
        await session.commit()
        reported = int(result.rowcount or 0)

    # Some DBAPI drivers report -1 when the affected-row count is unknown;
    # never surface that as a negative "deleted" count.
    return max(reported, 0)