Initial commit: svg backend

This commit is contained in:
adminko
2026-03-19 13:39:32 +03:00
commit 85fb2f4bb9
78 changed files with 6161 additions and 0 deletions

View File

@@ -0,0 +1,16 @@
import json
from pathlib import Path
from fastapi import HTTPException, status
def read_normalized_payload_from_path(normalized_storage_path: str) -> dict:
    """Load the normalized snapshot stored at *normalized_storage_path*.

    Returns the decoded JSON document. Raises ``HTTPException`` with status
    404 when the path does not exist or is not a regular file.
    """
    snapshot_file = Path(normalized_storage_path)
    if snapshot_file.exists() and snapshot_file.is_file():
        raw_text = snapshot_file.read_text(encoding="utf-8")
        return json.loads(raw_text)
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Normalized snapshot file not found",
    )

View File

@@ -0,0 +1,49 @@
from __future__ import annotations
from pathlib import Path
from uuid import uuid4
from app.core.config import settings
def _ensure_dir(path: str) -> Path:
dir_path = Path(path)
dir_path.mkdir(parents=True, exist_ok=True)
return dir_path
def save_original_svg(*, filename: str, content: bytes) -> tuple[str, str]:
    """Write *content* into a new per-upload directory of the original storage.

    A fresh hex upload id is generated and used as the directory name under
    ``settings.storage_original_dir``.

    Args:
        filename: Name to store the file under. Only its final path component
            is used; see the note below.
        content: Raw bytes to write.

    Returns:
        ``(upload_id, stored_path)`` where ``stored_path`` is the string form
        of the written file's path.
    """
    # The name is reduced to its last component so values such as
    # "../../x.svg" or "/etc/x" cannot escape the upload directory
    # (joining a Path with an absolute path would replace the directory).
    # Backslashes are normalized first so Windows-style names are covered too.
    # NOTE(review): presumably filename comes from the upload request - the
    # caller is not visible here.
    safe_name = Path(filename.replace("\\", "/")).name
    if safe_name in {"", ".", ".."}:
        # Nothing usable is left (e.g. "", "/", ".."): fall back to a fixed name.
        safe_name = "upload.svg"

    upload_id = uuid4().hex
    target_dir = _ensure_dir(f"{settings.storage_original_dir}/{upload_id}")
    target_path = target_dir / safe_name
    target_path.write_bytes(content)
    return upload_id, str(target_path)
def save_sanitized_svg(*, upload_id: str, filename: str, content: bytes) -> str:
    """Write *content* into the sanitized storage directory of *upload_id*.

    Args:
        upload_id: Directory name under ``settings.storage_sanitized_dir``.
        filename: Name to store the file under. Only its final path component
            is used; see the note below.
        content: Raw bytes to write.

    Returns:
        The string form of the written file's path.
    """
    # The name is reduced to its last component so values such as
    # "../../x.svg" or "/etc/x" cannot escape the upload directory
    # (joining a Path with an absolute path would replace the directory).
    # Backslashes are normalized first so Windows-style names are covered too.
    safe_name = Path(filename.replace("\\", "/")).name
    if safe_name in {"", ".", ".."}:
        # Nothing usable is left (e.g. "", "/", ".."): fall back to a fixed name.
        safe_name = "upload.svg"

    target_dir = _ensure_dir(f"{settings.storage_sanitized_dir}/{upload_id}")
    target_path = target_dir / safe_name
    target_path.write_bytes(content)
    return str(target_path)
def save_normalized_json(*, upload_id: str, filename: str, content: str) -> str:
    """Store *content* as ``<stem>.normalized.json`` for *upload_id*.

    The file is written as UTF-8 under ``settings.storage_normalized_dir``;
    ``<stem>`` is the stem of *filename*. Returns the written path as a string.
    """
    directory = _ensure_dir(f"{settings.storage_normalized_dir}/{upload_id}")
    stem = Path(filename).stem
    destination = directory.joinpath(f"{stem}.normalized.json")
    destination.write_text(content, encoding="utf-8")
    return str(destination)
def save_display_svg(*, upload_id: str, filename: str, content: bytes) -> str:
    """Store *content* as ``<stem>.display.svg`` for *upload_id*.

    The file is written under ``settings.storage_display_dir``; ``<stem>`` is
    the stem of *filename*. Returns the written path as a string.
    """
    directory = _ensure_dir(f"{settings.storage_display_dir}/{upload_id}")
    stem = Path(filename).stem
    destination = directory.joinpath(f"{stem}.display.svg")
    destination.write_bytes(content)
    return str(destination)
def load_normalized_json(upload_id: str) -> str:
    """Return the text of the normalized JSON stored for *upload_id*.

    Looks for ``*.normalized.json`` files in the upload's directory under
    ``settings.storage_normalized_dir``; when several exist, the one whose
    path sorts last is read. Raises ``FileNotFoundError`` if none is found.
    """
    directory = Path(f"{settings.storage_normalized_dir}/{upload_id}")
    candidates = list(directory.glob("*.normalized.json"))
    if not candidates:
        raise FileNotFoundError(f"Normalized payload not found for upload_id={upload_id}")
    # max() picks the same file as taking the last entry of a sorted list.
    newest_by_name = max(candidates)
    return newest_by_name.read_text(encoding="utf-8")

View File

@@ -0,0 +1,187 @@
from __future__ import annotations
import logging
import re
from typing import Any
from lxml import etree
from app.core.config import settings
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Display modes accepted by generate_display_svg; any other value raises ValueError there.
ALLOWED_MODES = {"passthrough", "optimized"}
def _parse_length(value: str | None) -> float | None:
if not value:
return None
cleaned = value.strip().replace("px", "")
try:
return float(cleaned)
except ValueError:
return None
def _local_name(tag: str) -> str:
if "}" in tag:
return tag.split("}", 1)[1]
return tag
def _is_hidden(node: etree._Element) -> bool:
display = (node.attrib.get("display") or "").strip().lower()
visibility = (node.attrib.get("visibility") or "").strip().lower()
style = (node.attrib.get("style") or "").replace(" ", "").lower()
return (
display == "none"
or visibility == "hidden"
or "display:none" in style
or "visibility:hidden" in style
)
def _is_seat_related(node: etree._Element) -> bool:
probe = " ".join(
[
node.attrib.get("id", ""),
node.attrib.get("class", ""),
node.attrib.get("data-seat-id", ""),
node.attrib.get("data-sector-id", ""),
node.attrib.get("data-group-id", ""),
]
).lower()
return any(token in probe for token in ["seat", "sector", "group", "place"])
def _font_size(node: etree._Element) -> float | None:
    """Return *node*'s own font size, or ``None`` when it declares none.

    The ``font-size`` attribute wins; otherwise the leading number of a
    ``font-size`` declaration in the inline ``style`` attribute is used.
    """
    from_attribute = _parse_length(node.attrib.get("font-size"))
    if from_attribute is not None:
        return from_attribute
    inline_style = node.attrib.get("style") or ""
    found = re.search(r"font-size\s*:\s*([0-9.]+)", inline_style, flags=re.IGNORECASE)
    return _parse_length(found.group(1)) if found else None
def _is_technical_text(node: etree._Element) -> bool:
    """Tell whether *node* matches one of the configured technical patterns.

    ``settings.svg_display_technical_text_patterns`` is a comma-separated
    list; each non-blank entry is matched as a case-insensitive substring
    against the node's ``id``, ``class`` and full text content.
    """
    raw_patterns = settings.svg_display_technical_text_patterns.split(",")
    needles = [entry.strip().lower() for entry in raw_patterns if entry.strip()]
    searchable = " ".join(
        (
            node.attrib.get("id", ""),
            node.attrib.get("class", ""),
            "".join(node.itertext()),
        )
    ).lower()
    for needle in needles:
        if needle in searchable:
            return True
    return False
def _force_viewbox(root: etree._Element) -> None:
    """Add a ``viewBox`` derived from ``width``/``height`` when it is missing.

    Does nothing when ``settings.svg_display_force_viewbox`` is off, when a
    non-empty ``viewBox`` already exists, or when either dimension cannot be
    parsed (or parses to zero).
    """
    if not settings.svg_display_force_viewbox or root.attrib.get("viewBox"):
        return
    parsed_width = _parse_length(root.attrib.get("width"))
    parsed_height = _parse_length(root.attrib.get("height"))
    if not (parsed_width and parsed_height):
        return
    # Whole numbers are written without a trailing ".0".
    box_width = int(parsed_width) if parsed_width.is_integer() else parsed_width
    box_height = int(parsed_height) if parsed_height.is_integer() else parsed_height
    root.attrib["viewBox"] = f"0 0 {box_width} {box_height}"
def _extract_meta(root: etree._Element) -> dict[str, Any]:
return {
"view_box": root.attrib.get("viewBox"),
"width": root.attrib.get("width"),
"height": root.attrib.get("height"),
}
def generate_display_svg(content: bytes, mode: str) -> tuple[bytes, dict[str, Any]]:
    """Build the display variant of an SVG document.

    In ``"optimized"`` mode, elements that are not seat related are removed
    when they are hidden, are small text, or are technical text - each rule
    gated by its ``settings.svg_display_*`` flag. In ``"passthrough"`` mode no
    elements are removed. In both modes a missing ``viewBox`` may be added
    (see ``_force_viewbox``) and the document is re-serialized as UTF-8 with
    an XML declaration.

    Args:
        content: SVG document bytes.
        mode: One of ``ALLOWED_MODES``.

    Returns:
        ``(svg_bytes, meta)`` where ``meta`` holds the root's ``view_box``,
        ``width`` and ``height`` plus ``mode`` and the three removal counters.

    Raises:
        ValueError: If *mode* is not in ``ALLOWED_MODES``.
        lxml.etree.XMLSyntaxError: If *content* is not well-formed XML
            (the parser runs with ``recover=False``).
    """
    if mode not in ALLOWED_MODES:
        raise ValueError(f"Unsupported display mode: {mode}")

    # NOTE(review): huge_tree=True lifts libxml2's built-in size/depth limits;
    # confirm that upstream checks bound the size of untrusted uploads.
    parser = etree.XMLParser(
        resolve_entities=False,
        remove_blank_text=False,
        remove_comments=False,
        no_network=True,
        recover=False,
        huge_tree=True,
    )
    root = etree.fromstring(content, parser=parser)

    defs_count = len(root.xpath("//*[local-name()='defs']"))
    use_count = len(root.xpath("//*[local-name()='use']"))
    style_count = len(root.xpath("//*[local-name()='style']"))
    clip_count = len(root.xpath("//*[local-name()='clipPath']"))
    logger.info(
        "display_svg.generate mode=%s size_bytes=%s has_style=%s defs=%s use=%s clipPath=%s",
        mode,
        len(content),
        bool(style_count),
        defs_count,
        use_count,
        clip_count,
    )

    removed_hidden_count = 0
    removed_small_text_count = 0
    removed_technical_text_count = 0

    if mode == "optimized":
        # Iterate over a snapshot because nodes are removed while walking.
        for node in list(root.iter()):
            # Comments, processing instructions and entities are kept by the
            # parser and expose a callable ``tag``; they are never candidates
            # for removal and would break the string handling below.
            if not isinstance(node.tag, str):
                continue
            # Every removal rule exempts seat-related nodes.
            if _is_seat_related(node):
                continue

            if (
                settings.svg_display_remove_hidden_elements
                and _is_hidden(node)
                and _remove_keeping_tail(node)
            ):
                removed_hidden_count += 1
                continue

            if _local_name(node.tag) not in {"text", "tspan"}:
                continue

            if settings.svg_display_hide_small_text:
                size = _font_size(node)
                if (
                    size is not None
                    and size < settings.svg_display_min_text_font_size
                    and _remove_keeping_tail(node)
                ):
                    removed_small_text_count += 1
                    continue

            if (
                settings.svg_display_hide_technical_text
                and _is_technical_text(node)
                and _remove_keeping_tail(node)
            ):
                removed_technical_text_count += 1

    _force_viewbox(root)

    output = etree.tostring(
        root,
        encoding="utf-8",
        xml_declaration=True,
        pretty_print=False,
    )
    meta = _extract_meta(root)
    meta.update(
        {
            "mode": mode,
            "removed_hidden_count": removed_hidden_count,
            "removed_small_text_count": removed_small_text_count,
            "removed_technical_text_count": removed_technical_text_count,
        }
    )
    return output, meta


def _remove_keeping_tail(node: etree._Element) -> bool:
    """Detach *node* from its parent but keep the text that follows it.

    lxml stores the text after an element as that element's ``tail``, so a
    plain ``parent.remove(node)`` would also delete it (for example the words
    after a removed ``<tspan>``). The tail is moved to the previous sibling's
    tail, or to the parent's text when *node* is the first child.

    Returns ``False`` without changes when *node* has no parent.
    """
    parent = node.getparent()
    if parent is None:
        return False
    tail = node.tail
    if tail:
        previous = node.getprevious()
        if previous is not None:
            previous.tail = (previous.tail or "") + tail
        else:
            parent.text = (parent.text or "") + tail
    parent.remove(node)
    return True

View File

@@ -0,0 +1,31 @@
from defusedxml import ElementTree as DefusedET
from fastapi import HTTPException, status
from app.core.config import settings
def inspect_svg_bytes(content: bytes) -> int:
    """Validate that *content* is an SVG document of acceptable size.

    Args:
        content: Raw document bytes, parsed with defusedxml.

    Returns:
        The number of elements in the document, including the root.

    Raises:
        HTTPException: Status 400 when the bytes cannot be parsed, when the
            root element is not ``svg``, or when the element count exceeds
            ``settings.svg_max_elements``.
    """
    try:
        root = DefusedET.fromstring(content)
    except Exception as exc:
        # Any parser failure is reported as a client error. Only the
        # exception class name is exposed, not its message.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid SVG XML: {exc.__class__.__name__}",
        ) from exc

    # Compare the namespace-stripped name exactly; a suffix test would also
    # accept roots such as <notsvg>.
    tag = root.tag or ""
    if tag.rsplit("}", 1)[-1] != "svg":
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Root element is not <svg>",
        )

    element_count = sum(1 for _ in root.iter())
    if element_count > settings.svg_max_elements:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="SVG element count exceeds configured limit",
        )
    return element_count

View File

@@ -0,0 +1,200 @@
import json
import re
from typing import Any
from xml.etree import ElementTree as StdET
# Tag-name sets used by _infer_kind as a fallback when neither the id nor the
# classes identify an element as seat, sector or group.
# Local tag names classified as kind "shape".
SHAPE_TAGS = {"rect", "circle", "ellipse", "path", "polygon", "polyline", "line"}
# Local tag names classified as kind "container".
CONTAINER_TAGS = {"g"}
# Local tag names classified as kind "text".
TEXT_TAGS = {"text", "tspan"}
def _local_name(tag: str) -> str:
if "}" in tag:
return tag.split("}", 1)[1]
return tag
def _parse_classes(value: str | None) -> list[str]:
if not value:
return []
return [item for item in value.strip().split() if item]
def _to_float(value: str | None) -> float | None:
if value is None or value == "":
return None
try:
return float(value)
except ValueError:
return None
def _infer_kind(element_id: str | None, classes: list[str], tag: str) -> str:
haystack = " ".join([element_id or ""] + classes).lower()
if "seat" in haystack or "place" in haystack:
return "seat"
if "sector" in haystack or "zone" in haystack:
return "sector"
if "group" in haystack:
return "group"
if tag in SHAPE_TAGS:
return "shape"
if tag in CONTAINER_TAGS:
return "container"
if tag in TEXT_TAGS:
return "text"
return "other"
def _extract_prefixed_id(value: str | None, prefix: str) -> str | None:
if not value:
return None
low = value.lower()
pref = f"{prefix}-"
if low.startswith(pref):
return value[len(pref):]
return None
def _extract_seat_parts_from_id(value: str | None) -> tuple[str | None, str | None]:
if not value:
return None, None
patterns = [
r"^seat[-_]?([a-zA-Z]+)[-_]?(\d+)$",
r"^place[-_]?([a-zA-Z]+)[-_]?(\d+)$",
r"^([a-zA-Z]+)[-_]?(\d+)$",
]
for pattern in patterns:
match = re.match(pattern, value)
if match:
return match.group(1), match.group(2)
return None, None
def _build_parent_map(root) -> dict[int, dict[str, str | None]]:
    """Map every element to the sector and group it belongs to.

    The tree is walked top-down. An element classified as "sector" or "group"
    becomes the current sector/group for itself and its descendants, using
    its ``data-*-id`` attribute, then the suffix of a ``sector-``/``group-``
    id, then its plain id (keeping the inherited value if all are missing).

    Args:
        root: Root element of the parsed document.

    Returns:
        A dict keyed by ``id(element)`` with ``"sector_id"`` and ``"group_id"``
        entries. Keys are object identities, so the mapping is only
        meaningful while the tree rooted at *root* is kept alive.
    """
    parent_map: dict[int, dict[str, str | None]] = {}

    # An explicit stack replaces recursion so that very deeply nested
    # documents cannot exhaust the interpreter's recursion limit.
    pending = [(root, None, None)]
    while pending:
        node, sector_id, group_id = pending.pop()

        node_id = node.attrib.get("id")
        classes = _parse_classes(node.attrib.get("class"))
        kind = _infer_kind(node_id, classes, _local_name(node.tag))

        explicit_sector = node.attrib.get("data-sector-id") or _extract_prefixed_id(node_id, "sector")
        explicit_group = node.attrib.get("data-group-id") or _extract_prefixed_id(node_id, "group")

        if kind == "sector":
            sector_id = explicit_sector or node_id or sector_id
        if kind == "group":
            group_id = explicit_group or node_id or group_id

        parent_map[id(node)] = {
            "sector_id": sector_id,
            "group_id": group_id,
        }

        # Children are pushed in reverse so they are popped in document order.
        pending.extend((child, sector_id, group_id) for child in reversed(list(node)))

    return parent_map
def normalize_svg_bytes(content: bytes) -> dict[str, Any]:
    """Flatten an SVG document into a dictionary of described elements.

    Every element except those named ``svg`` is reported with its id, tag,
    inferred kind, classes, basic geometry attributes, link target and the
    seat fields (``seat_id``, ``sector_id``, ``group_id``, ``row``,
    ``seat_number``). Explicit ``data-*`` attributes win over values
    inherited from ancestors or derived from the element id.

    Returns:
        A dict with ``summary`` counts, the ``contract`` description, the
        full ``elements`` list and the ``seats`` / ``groups`` / ``sectors``
        subsets (the same item objects as in ``elements``).
    """
    # NOTE(review): this parses with the standard-library ElementTree; confirm
    # callers only pass content that was already validated/sanitized.
    root = StdET.fromstring(content)
    parent_map = _build_parent_map(root)

    elements: list[dict[str, Any]] = []
    by_kind: dict[str, list[dict[str, Any]]] = {"seat": [], "group": [], "sector": []}

    for node in root.iter():
        tag = _local_name(node.tag)
        if tag == "svg":
            continue

        attrs = node.attrib
        element_id = attrs.get("id")
        classes = _parse_classes(attrs.get("class"))
        kind = _infer_kind(element_id=element_id, classes=classes, tag=tag)

        inherited = parent_map.get(id(node), {})
        row_from_id, seat_number_from_id = _extract_seat_parts_from_id(element_id)
        fallback_seat_id = element_id if kind == "seat" else None

        item = {
            "id": element_id,
            "tag": tag,
            "kind": kind,
            "classes": classes,
            "x": _to_float(attrs.get("x")),
            "y": _to_float(attrs.get("y")),
            "cx": _to_float(attrs.get("cx")),
            "cy": _to_float(attrs.get("cy")),
            "width": _to_float(attrs.get("width")),
            "height": _to_float(attrs.get("height")),
            "href": attrs.get("href") or attrs.get("{http://www.w3.org/1999/xlink}href"),
            "seat_id": attrs.get("data-seat-id") or fallback_seat_id,
            "sector_id": attrs.get("data-sector-id") or inherited.get("sector_id"),
            "group_id": attrs.get("data-group-id") or inherited.get("group_id"),
            "row": attrs.get("data-row") or row_from_id,
            "seat_number": attrs.get("data-seat-number") or seat_number_from_id,
        }

        elements.append(item)
        bucket = by_kind.get(kind)
        if bucket is not None:
            bucket.append(item)

    seats = by_kind["seat"]
    groups = by_kind["group"]
    sectors = by_kind["sector"]
    summary = {
        "elements_count": len(elements),
        "seats_count": len(seats),
        "groups_count": len(groups),
        "sectors_count": len(sectors),
    }
    contract = {
        "seat_fields": ["seat_id", "sector_id", "group_id", "row", "seat_number"],
        "priority": [
            "data-* attributes",
            "inherited parent sector/group",
            "fallback to element id",
        ],
    }
    return {
        "summary": summary,
        "contract": contract,
        "elements": elements,
        "seats": seats,
        "groups": groups,
        "sectors": sectors,
    }
def normalize_svg_bytes_to_json(content: bytes) -> tuple[str, dict[str, Any]]:
    """Normalize *content* and return it both as JSON text and as a dict.

    The JSON is indented by two spaces and keeps non-ASCII characters as-is.
    """
    normalized = normalize_svg_bytes(content)
    serialized = json.dumps(normalized, indent=2, ensure_ascii=False)
    return serialized, normalized

View File

@@ -0,0 +1,99 @@
from __future__ import annotations
from lxml import etree
from app.core.config import settings
# Local tag names that sanitize_svg_bytes always removes.
DANGEROUS_TAGS = {"script"}
# XML namespace URIs for SVG and XLink.
SVG_NS = "http://www.w3.org/2000/svg"
XLINK_NS = "http://www.w3.org/1999/xlink"
# Namespace-qualified ("{uri}href") attribute name under which xlink:href is looked up.
XLINK_HREF = f"{{{XLINK_NS}}}href"
def _local_name(tag: str) -> str:
if "}" in tag:
return tag.split("}", 1)[1]
return tag
def _is_external_ref(value: str) -> bool:
low = value.strip().lower()
return (
low.startswith("http://")
or low.startswith("https://")
or low.startswith("file:")
or low.startswith("javascript:")
or low.startswith("data:")
or low.startswith("//")
)
def sanitize_svg_bytes(content: bytes) -> tuple[bytes, int, int]:
    """Strip active and externally linking content from an SVG document.

    Removed elements: everything in ``DANGEROUS_TAGS``; ``foreignObject`` when
    ``settings.svg_forbid_foreign_object_v1`` is set; ``image`` elements with
    an external reference when ``settings.svg_forbid_image_v1`` is set.
    Removed attributes: any whose local name starts with ``on`` and any
    ``href`` (plain or namespaced, e.g. ``xlink:href``) with an external
    reference.

    Args:
        content: SVG document bytes.

    Returns:
        ``(sanitized_bytes, removed_elements_count, removed_attributes_count)``;
        the document is serialized as UTF-8 with an XML declaration.

    Raises:
        lxml.etree.XMLSyntaxError: If *content* is not well-formed XML
            (the parser runs with ``recover=False``).
    """
    # NOTE(review): huge_tree=True lifts libxml2's built-in size/depth limits;
    # confirm that upstream checks bound the size of untrusted uploads.
    parser = etree.XMLParser(
        resolve_entities=False,
        remove_blank_text=False,
        remove_comments=False,
        no_network=True,
        recover=False,
        huge_tree=True,
    )
    root = etree.fromstring(content, parser=parser)

    removed_elements_count = 0
    removed_attributes_count = 0

    # Iterate over a snapshot because nodes are removed while walking.
    for node in list(root.iter()):
        # Comments, processing instructions and entities are kept by the
        # parser and expose a callable ``tag``; they carry no attributes and
        # would break the string handling below.
        if not isinstance(node.tag, str):
            continue
        tag_name = _local_name(node.tag)

        forbidden = tag_name in DANGEROUS_TAGS or (
            settings.svg_forbid_foreign_object_v1 and tag_name == "foreignObject"
        )
        if not forbidden and settings.svg_forbid_image_v1 and tag_name == "image":
            href = node.attrib.get("href") or node.attrib.get(XLINK_HREF)
            forbidden = bool(href and _is_external_ref(href))

        if forbidden and _remove_keeping_tail(node):
            removed_elements_count += 1
            continue

        for attr_name in list(node.attrib.keys()):
            local_attr = _local_name(attr_name).lower()
            value = node.attrib.get(attr_name) or ""

            # Event handler attributes (onclick, onload, ...).
            if local_attr.startswith("on"):
                del node.attrib[attr_name]
                removed_attributes_count += 1
                continue

            # The local name "href" covers both the plain and the namespaced
            # (xlink:href) attribute, so one check handles both spellings.
            if (
                local_attr == "href"
                and value
                and not value.startswith("#")
                and _is_external_ref(value)
            ):
                del node.attrib[attr_name]
                removed_attributes_count += 1

    sanitized = etree.tostring(
        root,
        encoding="utf-8",
        xml_declaration=True,
        pretty_print=False,
    )
    return sanitized, removed_elements_count, removed_attributes_count


def _remove_keeping_tail(node: etree._Element) -> bool:
    """Detach *node* from its parent but keep the text that follows it.

    lxml stores the text after an element as that element's ``tail``, so a
    plain ``parent.remove(node)`` would also delete it. The tail is moved to
    the previous sibling's tail, or to the parent's text when *node* is the
    first child.

    Returns ``False`` without changes when *node* has no parent.
    """
    parent = node.getparent()
    if parent is None:
        return False
    tail = node.tail
    if tail:
        previous = node.getprevious()
        if previous is not None:
            previous.tail = (previous.tail or "") + tail
        else:
            parent.text = (parent.text or "") + tail
    parent.remove(node)
    return True