Add Bambuddy InvenTree sync service

This commit is contained in:
2026-04-15 13:45:55 +03:00
commit 9f3d825120
16 changed files with 1044 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
__all__ = ["__version__"]
__version__ = "0.1.0"

View File

@@ -0,0 +1,61 @@
from typing import Any
import httpx
from .config import Settings
from .http_errors import ExternalApiError
from .models import Archive
class BambuddyClient:
def __init__(self, settings: Settings) -> None:
base_url = settings.bambuddy_base_url.rstrip("/")
if not base_url.endswith("/api/v1"):
base_url = f"{base_url}/api/v1"
self.base_url = base_url
self.client = httpx.AsyncClient(
base_url=self.base_url,
headers={"X-API-Key": settings.bambuddy_api_key, "Accept": "application/json"},
timeout=settings.http_timeout_seconds,
trust_env=False,
)
async def close(self) -> None:
await self.client.aclose()
async def get_archive(self, archive_id: int) -> Archive:
data = await self._request("GET", f"/archives/{archive_id}")
return Archive.model_validate(data)
async def get_system_info(self) -> dict[str, Any]:
return await self._request("GET", "/system/info")
async def list_archives(
self,
*,
status: str | None,
limit: int,
offset: int,
) -> tuple[int | None, list[Archive]]:
params: dict[str, Any] = {"limit": limit, "offset": offset}
if status:
params["status"] = status
data = await self._request("GET", "/archives/", params=params)
if isinstance(data, list):
return None, [Archive.model_validate(item) for item in data]
archives = data.get("archives") or data.get("results") or []
total = data.get("total") or data.get("count")
return total, [Archive.model_validate(item) for item in archives]
async def _request(self, method: str, path: str, **kwargs: Any) -> Any:
response = await self.client.request(method, path, **kwargs)
if response.status_code >= 400:
body = response.text[:1000]
raise ExternalApiError(f"Bambuddy {method} {path} failed: HTTP {response.status_code}: {body}")
if response.content:
return response.json()
return None

View File

@@ -0,0 +1,57 @@
from functools import lru_cache
from pathlib import Path
from typing import Annotated
from pydantic import Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
bambuddy_base_url: str
bambuddy_api_key: str
inventree_base_url: str
inventree_token: str
inventree_part_category_id: int
inventree_stock_location_id: int
service_api_token: str | None = None
webhook_shared_secret: str | None = None
sync_success_only: bool = True
default_stock_quantity: Annotated[float, Field(gt=0)] = 1
inventree_stock_status: int = 10
part_ipn_prefix: str = "BMB"
part_key_fields: str = "filename,name"
backfill_page_size: Annotated[int, Field(ge=1, le=250)] = 50
poll_interval_seconds: Annotated[int, Field(ge=0)] = 0
sync_on_startup: bool = False
http_timeout_seconds: Annotated[int, Field(ge=1)] = 30
data_dir: Path = Path("/data")
@field_validator("bambuddy_base_url", "inventree_base_url")
@classmethod
def strip_url(cls, value: str) -> str:
return value.rstrip("/")
@field_validator("part_ipn_prefix")
@classmethod
def clean_prefix(cls, value: str) -> str:
cleaned = "".join(char for char in value.upper() if char.isalnum() or char in ("-", "_"))
return cleaned or "BMB"
@property
def database_path(self) -> Path:
return self.data_dir / "sync.sqlite3"
@property
def part_key_field_names(self) -> list[str]:
fields = [field.strip() for field in self.part_key_fields.split(",") if field.strip()]
return fields or ["filename", "name"]
@lru_cache
def get_settings() -> Settings:
return Settings()

View File

@@ -0,0 +1,131 @@
import json
import sqlite3
from pathlib import Path
from typing import Any
class Database:
def __init__(self, path: Path) -> None:
self.path = path
def init(self) -> None:
self.path.parent.mkdir(parents=True, exist_ok=True)
with self._connect() as conn:
conn.executescript(
"""
PRAGMA journal_mode=WAL;
CREATE TABLE IF NOT EXISTS sync_records (
archive_id INTEGER PRIMARY KEY,
part_key TEXT,
part_id INTEGER,
stock_item_id INTEGER,
archive_status TEXT,
sync_status TEXT NOT NULL,
error TEXT,
raw_archive TEXT,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS part_map (
part_key TEXT PRIMARY KEY,
inventree_part_id INTEGER NOT NULL,
display_name TEXT,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
"""
)
def get_record(self, archive_id: int) -> dict[str, Any] | None:
with self._connect() as conn:
row = conn.execute(
"SELECT * FROM sync_records WHERE archive_id = ?",
(archive_id,),
).fetchone()
return dict(row) if row else None
def upsert_record(
self,
*,
archive_id: int,
sync_status: str,
part_key: str | None = None,
part_id: int | None = None,
stock_item_id: int | None = None,
archive_status: str | None = None,
error: str | None = None,
raw_archive: dict[str, Any] | None = None,
) -> None:
raw_archive_json = json.dumps(raw_archive, ensure_ascii=False) if raw_archive is not None else None
with self._connect() as conn:
conn.execute(
"""
INSERT INTO sync_records (
archive_id, part_key, part_id, stock_item_id, archive_status,
sync_status, error, raw_archive
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(archive_id) DO UPDATE SET
part_key = excluded.part_key,
part_id = excluded.part_id,
stock_item_id = excluded.stock_item_id,
archive_status = excluded.archive_status,
sync_status = excluded.sync_status,
error = excluded.error,
raw_archive = COALESCE(excluded.raw_archive, sync_records.raw_archive),
updated_at = CURRENT_TIMESTAMP
""",
(
archive_id,
part_key,
part_id,
stock_item_id,
archive_status,
sync_status,
error,
raw_archive_json,
),
)
def get_part_id(self, part_key: str) -> int | None:
with self._connect() as conn:
row = conn.execute(
"SELECT inventree_part_id FROM part_map WHERE part_key = ?",
(part_key,),
).fetchone()
return int(row["inventree_part_id"]) if row else None
def upsert_part(self, *, part_key: str, inventree_part_id: int, display_name: str) -> None:
with self._connect() as conn:
conn.execute(
"""
INSERT INTO part_map (part_key, inventree_part_id, display_name)
VALUES (?, ?, ?)
ON CONFLICT(part_key) DO UPDATE SET
inventree_part_id = excluded.inventree_part_id,
display_name = excluded.display_name,
updated_at = CURRENT_TIMESTAMP
""",
(part_key, inventree_part_id, display_name),
)
def counts(self) -> dict[str, int]:
with self._connect() as conn:
rows = conn.execute(
"""
SELECT sync_status, COUNT(*) AS count
FROM sync_records
GROUP BY sync_status
"""
).fetchall()
parts = conn.execute("SELECT COUNT(*) AS count FROM part_map").fetchone()
result = {row["sync_status"]: int(row["count"]) for row in rows}
result["known_parts"] = int(parts["count"]) if parts else 0
return result
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.path)
conn.row_factory = sqlite3.Row
return conn

View File

@@ -0,0 +1,2 @@
class ExternalApiError(RuntimeError):
pass

View File

@@ -0,0 +1,103 @@
from typing import Any
import httpx
from .config import Settings
from .http_errors import ExternalApiError
from .models import Archive
class InvenTreeClient:
def __init__(self, settings: Settings) -> None:
base_url = settings.inventree_base_url.rstrip("/")
if not base_url.endswith("/api"):
base_url = f"{base_url}/api"
self.settings = settings
self.base_url = base_url
self.client = httpx.AsyncClient(
base_url=self.base_url,
headers={
"Authorization": f"Token {settings.inventree_token}",
"Accept": "application/json",
"Content-Type": "application/json",
},
timeout=settings.http_timeout_seconds,
trust_env=False,
)
async def close(self) -> None:
await self.client.aclose()
async def get_part_category(self) -> dict[str, Any]:
return await self._request("GET", f"/part/category/{self.settings.inventree_part_category_id}/")
async def get_stock_location(self) -> dict[str, Any]:
return await self._request("GET", f"/stock/location/{self.settings.inventree_stock_location_id}/")
async def find_part_by_ipn(self, ipn: str) -> dict[str, Any] | None:
for params in ({"IPN": ipn, "limit": 100}, {"search": ipn, "limit": 100}):
data = await self._request("GET", "/part/", params=params)
for item in self._items(data):
if item.get("IPN") == ipn:
return item
return None
async def create_part(self, *, name: str, description: str, ipn: str) -> dict[str, Any]:
payload = {
"name": name[:100],
"description": description,
"category": self.settings.inventree_part_category_id,
"IPN": ipn[:100],
"active": True,
"component": True,
"purchaseable": False,
"salable": False,
"assembly": False,
"trackable": False,
"virtual": False,
}
return await self._request("POST", "/part/", json=payload)
async def find_stock_by_batch(self, *, part_id: int, batch: str) -> dict[str, Any] | None:
data = await self._request("GET", "/stock/", params={"part": part_id, "batch": batch, "limit": 100})
for item in self._items(data):
if item.get("part") == part_id and item.get("batch") == batch:
return item
return None
async def create_stock_item(self, *, part_id: int, archive: Archive, notes: str) -> dict[str, Any]:
quantity = archive.quantity or self.settings.default_stock_quantity
payload: dict[str, Any] = {
"part": part_id,
"location": self.settings.inventree_stock_location_id,
"quantity": quantity,
"status": self.settings.inventree_stock_status,
"batch": self.batch_for_archive(archive.id),
"notes": notes,
}
return await self._request("POST", "/stock/", json=payload)
@staticmethod
def batch_for_archive(archive_id: int) -> str:
return f"bambuddy-{archive_id}"
async def _request(self, method: str, path: str, **kwargs: Any) -> Any:
response = await self.client.request(method, path, **kwargs)
if response.status_code >= 400:
body = response.text[:1000]
raise ExternalApiError(f"InvenTree {method} {path} failed: HTTP {response.status_code}: {body}")
if response.content:
return response.json()
return None
@staticmethod
def _items(data: Any) -> list[dict[str, Any]]:
if isinstance(data, list):
return data
if isinstance(data, dict):
results = data.get("results")
if isinstance(results, list):
return results
return []

View File

@@ -0,0 +1,162 @@
import asyncio
import logging
from contextlib import asynccontextmanager
from typing import Any
from fastapi import BackgroundTasks, Depends, FastAPI, Header, HTTPException, Query, Request
from .bambuddy import BambuddyClient
from .config import Settings, get_settings
from .database import Database
from .http_errors import ExternalApiError
from .inventree import InvenTreeClient
from .models import BambuddyWebhook, SyncResult
from .sync import ArchiveSyncService
logger = logging.getLogger(__name__)
async def require_service_token(
request: Request,
x_service_token: str | None = Header(default=None),
) -> None:
settings: Settings = request.app.state.settings
if settings.service_api_token and x_service_token != settings.service_api_token:
raise HTTPException(status_code=401, detail="Missing or invalid X-Service-Token")
async def require_webhook_secret(
request: Request,
x_sync_secret: str | None = Header(default=None),
) -> None:
settings: Settings = request.app.state.settings
if settings.webhook_shared_secret and x_sync_secret != settings.webhook_shared_secret:
raise HTTPException(status_code=401, detail="Missing or invalid X-Sync-Secret")
@asynccontextmanager
async def lifespan(app: FastAPI):
settings = get_settings()
database = Database(settings.database_path)
database.init()
bambuddy = BambuddyClient(settings)
inventree = InvenTreeClient(settings)
sync_service = ArchiveSyncService(
settings=settings,
database=database,
bambuddy=bambuddy,
inventree=inventree,
)
app.state.settings = settings
app.state.database = database
app.state.bambuddy = bambuddy
app.state.inventree = inventree
app.state.sync_service = sync_service
stop_event = asyncio.Event()
poll_task: asyncio.Task[Any] | None = None
async def poll_loop() -> None:
while not stop_event.is_set():
try:
await sync_service.backfill()
except Exception:
logger.exception("Scheduled backfill failed")
try:
await asyncio.wait_for(stop_event.wait(), timeout=settings.poll_interval_seconds)
except TimeoutError:
continue
if settings.sync_on_startup:
asyncio.create_task(sync_service.backfill())
if settings.poll_interval_seconds > 0:
poll_task = asyncio.create_task(poll_loop())
try:
yield
finally:
stop_event.set()
if poll_task:
poll_task.cancel()
await bambuddy.close()
await inventree.close()
app = FastAPI(
title="Bambuddy InvenTree Sync",
version="0.1.0",
lifespan=lifespan,
)
@app.get("/health")
async def health(request: Request) -> dict[str, Any]:
database: Database = request.app.state.database
return {"status": "ok", "counts": database.counts()}
@app.get("/validate", dependencies=[Depends(require_service_token)])
async def validate(request: Request) -> dict[str, Any]:
sync_service: ArchiveSyncService = request.app.state.sync_service
try:
return await sync_service.validate_targets()
except ExternalApiError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
@app.post("/webhooks/bambuddy", response_model=SyncResult, dependencies=[Depends(require_webhook_secret)])
async def bambuddy_webhook(request: Request, payload: BambuddyWebhook) -> SyncResult:
if payload.event not in {"print_complete", "print_failed"}:
return SyncResult(archive_id=0, status="skipped", message=f"Ignored event {payload.event}")
archive_id = payload.data.get("archive_id") or payload.data.get("id")
if archive_id is None:
raise HTTPException(status_code=400, detail="Webhook payload does not contain data.archive_id")
sync_service: ArchiveSyncService = request.app.state.sync_service
try:
return await sync_service.sync_archive_id(int(archive_id), webhook_data=payload.data)
except ExternalApiError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
@app.post("/sync/archive/{archive_id}", response_model=SyncResult, dependencies=[Depends(require_service_token)])
async def sync_archive(
request: Request,
archive_id: int,
force: bool = Query(default=False),
) -> SyncResult:
sync_service: ArchiveSyncService = request.app.state.sync_service
try:
return await sync_service.sync_archive_id(archive_id, force=force)
except ExternalApiError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
@app.post("/sync/backfill", dependencies=[Depends(require_service_token)])
async def sync_backfill(
request: Request,
background_tasks: BackgroundTasks,
background: bool = Query(default=False),
status: str | None = Query(default=None),
max_archives: int | None = Query(default=None, ge=1),
) -> dict[str, Any]:
sync_service: ArchiveSyncService = request.app.state.sync_service
if background:
background_tasks.add_task(sync_service.backfill, status=status, max_archives=max_archives)
return {"status": "accepted"}
try:
return await sync_service.backfill(status=status, max_archives=max_archives)
except ExternalApiError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
@app.get("/sync/status", dependencies=[Depends(require_service_token)])
async def sync_status(request: Request) -> dict[str, int]:
database: Database = request.app.state.database
return database.counts()

View File

@@ -0,0 +1,48 @@
from typing import Any
from pydantic import BaseModel, ConfigDict, Field
class Archive(BaseModel):
model_config = ConfigDict(extra="allow")
id: int
name: str | None = None
filename: str | None = None
print_name: str | None = None
content_hash: str | None = None
printer_id: int | None = None
printer_name: str | None = None
created_at: str | None = None
started_at: str | None = None
completed_at: str | None = None
duration: int | float | None = None
print_time_seconds: int | float | None = None
actual_time_seconds: int | float | None = None
status: str | None = None
filament_used: float | None = None
filament_used_grams: float | None = None
filament_type: str | None = None
filament_color: str | None = None
quantity: float | None = None
object_count: int | None = None
cost: float | None = None
notes: str | None = None
tags: list[str] | None = None
class BambuddyWebhook(BaseModel):
model_config = ConfigDict(extra="allow")
event: str
timestamp: str | None = None
data: dict[str, Any] = Field(default_factory=dict)
class SyncResult(BaseModel):
archive_id: int
status: str
message: str
part_id: int | None = None
stock_item_id: int | None = None
part_key: str | None = None

View File

@@ -0,0 +1,276 @@
import asyncio
import hashlib
import logging
from pathlib import PurePosixPath, PureWindowsPath
from typing import Any
from .bambuddy import BambuddyClient
from .config import Settings
from .database import Database
from .http_errors import ExternalApiError
from .inventree import InvenTreeClient
from .models import Archive, SyncResult
logger = logging.getLogger(__name__)
class ArchiveSyncService:
def __init__(
self,
*,
settings: Settings,
database: Database,
bambuddy: BambuddyClient,
inventree: InvenTreeClient,
) -> None:
self.settings = settings
self.database = database
self.bambuddy = bambuddy
self.inventree = inventree
self._lock = asyncio.Lock()
async def validate_targets(self) -> dict[str, Any]:
category, location, bambuddy_info = await asyncio.gather(
self.inventree.get_part_category(),
self.inventree.get_stock_location(),
self.bambuddy.get_system_info(),
)
return {"part_category": category, "stock_location": location, "bambuddy": bambuddy_info}
async def sync_archive_id(
self,
archive_id: int,
*,
force: bool = False,
webhook_data: dict[str, Any] | None = None,
) -> SyncResult:
async with self._lock:
existing = self.database.get_record(archive_id)
if existing and existing["sync_status"] == "synced" and not force:
return SyncResult(
archive_id=archive_id,
status="already_synced",
message="Archive was already synced",
part_id=existing["part_id"],
stock_item_id=existing["stock_item_id"],
part_key=existing["part_key"],
)
try:
archive = await self._load_archive(archive_id, webhook_data)
return await self._sync_archive(archive)
except Exception as exc:
self.database.upsert_record(
archive_id=archive_id,
sync_status="failed",
error=str(exc),
raw_archive=webhook_data,
)
raise
async def backfill(self, *, status: str | None = None, max_archives: int | None = None) -> dict[str, int]:
status_filter = status
if status_filter is None and self.settings.sync_success_only:
status_filter = "success"
counts = {"seen": 0, "synced": 0, "already_synced": 0, "skipped": 0, "failed": 0}
offset = 0
limit = self.settings.backfill_page_size
while True:
total, archives = await self.bambuddy.list_archives(status=status_filter, limit=limit, offset=offset)
if not archives:
break
for archive in archives:
if max_archives is not None and counts["seen"] >= max_archives:
return counts
counts["seen"] += 1
try:
result = await self.sync_archive_id(archive.id, webhook_data=archive.model_dump())
counts[result.status] = counts.get(result.status, 0) + 1
except Exception:
logger.exception("Failed to sync Bambuddy archive %s during backfill", archive.id)
counts["failed"] += 1
offset += len(archives)
if total is not None and offset >= total:
break
return counts
async def _load_archive(self, archive_id: int, webhook_data: dict[str, Any] | None) -> Archive:
try:
return await self.bambuddy.get_archive(archive_id)
except ExternalApiError:
if webhook_data:
data = dict(webhook_data)
data.setdefault("id", archive_id)
return Archive.model_validate(data)
raise
async def _sync_archive(self, archive: Archive) -> SyncResult:
if self.settings.sync_success_only and not self.is_successful_archive(archive):
self.database.upsert_record(
archive_id=archive.id,
sync_status="skipped",
archive_status=archive.status,
raw_archive=archive.model_dump(),
)
return SyncResult(archive_id=archive.id, status="skipped", message=f"Archive status is {archive.status}")
part_key = self.part_key_for_archive(archive)
ipn = self.ipn_for_part_key(part_key)
name = self.name_for_archive(archive)
description = self.description_for_archive(archive)
part_id = await self._get_or_create_part(part_key=part_key, ipn=ipn, name=name, description=description)
batch = self.inventree.batch_for_archive(archive.id)
existing_stock = await self.inventree.find_stock_by_batch(part_id=part_id, batch=batch)
if existing_stock:
stock_item_id = int(existing_stock.get("pk") or existing_stock.get("id"))
self.database.upsert_record(
archive_id=archive.id,
sync_status="synced",
part_key=part_key,
part_id=part_id,
stock_item_id=stock_item_id,
archive_status=archive.status,
raw_archive=archive.model_dump(),
)
return SyncResult(
archive_id=archive.id,
status="already_synced",
message="Stock item already exists in InvenTree",
part_id=part_id,
stock_item_id=stock_item_id,
part_key=part_key,
)
stock = await self.inventree.create_stock_item(
part_id=part_id,
archive=archive,
notes=self.stock_notes_for_archive(archive),
)
stock_item_id = int(stock.get("pk") or stock.get("id"))
self.database.upsert_record(
archive_id=archive.id,
sync_status="synced",
part_key=part_key,
part_id=part_id,
stock_item_id=stock_item_id,
archive_status=archive.status,
raw_archive=archive.model_dump(),
)
logger.info(
"Synced Bambuddy archive %s to InvenTree part %s stock item %s",
archive.id,
part_id,
stock_item_id,
)
return SyncResult(
archive_id=archive.id,
status="synced",
message="Created stock item in InvenTree",
part_id=part_id,
stock_item_id=stock_item_id,
part_key=part_key,
)
async def _get_or_create_part(self, *, part_key: str, ipn: str, name: str, description: str) -> int:
cached_part_id = self.database.get_part_id(part_key)
if cached_part_id:
return cached_part_id
existing = await self.inventree.find_part_by_ipn(ipn)
if existing:
part_id = int(existing.get("pk") or existing.get("id"))
self.database.upsert_part(part_key=part_key, inventree_part_id=part_id, display_name=name)
return part_id
created = await self.inventree.create_part(name=name, description=description, ipn=ipn)
part_id = int(created.get("pk") or created.get("id"))
self.database.upsert_part(part_key=part_key, inventree_part_id=part_id, display_name=name)
return part_id
def part_key_for_archive(self, archive: Archive) -> str:
parts: list[str] = []
raw = archive.model_dump()
for field in self.settings.part_key_field_names:
value = raw.get(field)
if value is not None and str(value).strip():
parts.append(str(value).strip().lower())
if not parts:
parts.append(str(archive.id))
source = "|".join(parts)
return hashlib.sha1(source.encode("utf-8")).hexdigest()
def ipn_for_part_key(self, part_key: str) -> str:
return f"{self.settings.part_ipn_prefix}-{part_key[:12]}"
@staticmethod
def name_for_archive(archive: Archive) -> str:
if archive.name and archive.name.strip():
return archive.name.strip()
if archive.print_name and archive.print_name.strip():
return archive.print_name.strip()
if archive.filename and archive.filename.strip():
filename = archive.filename.strip()
if "\\" in filename:
return PureWindowsPath(filename).stem or filename
return PurePosixPath(filename).stem or filename
return f"Bambuddy archive {archive.id}"
def description_for_archive(self, archive: Archive) -> str:
return "\n".join(
line
for line in [
"Auto-created from Bambuddy archive.",
f"Archive ID: {archive.id}",
f"Print name: {archive.print_name}" if archive.print_name else "",
f"Filename: {archive.filename}" if archive.filename else "",
f"Content hash: {archive.content_hash}" if archive.content_hash else "",
f"Printer: {archive.printer_name}" if archive.printer_name else "",
f"Created at: {archive.created_at}" if archive.created_at else "",
f"Completed at: {archive.completed_at}" if archive.completed_at else "",
]
if line
)
def stock_notes_for_archive(self, archive: Archive) -> str:
duration = archive.actual_time_seconds or archive.print_time_seconds or archive.duration
filament_used = archive.filament_used_grams or archive.filament_used
rows = [
"Imported from Bambuddy.",
f"Archive ID: {archive.id}",
f"Print name: {archive.print_name}" if archive.print_name else "",
f"Filename: {archive.filename}" if archive.filename else "",
f"Printer: {archive.printer_name}" if archive.printer_name else "",
f"Started at: {archive.started_at}" if archive.started_at else "",
f"Completed at: {archive.completed_at}" if archive.completed_at else "",
f"Created at: {archive.created_at}" if archive.created_at else "",
f"Duration: {duration} seconds" if duration is not None else "",
f"Quantity: {archive.quantity}" if archive.quantity is not None else "",
f"Object count: {archive.object_count}" if archive.object_count is not None else "",
f"Filament used: {filament_used} g" if filament_used is not None else "",
f"Filament type: {archive.filament_type}" if archive.filament_type else "",
f"Filament color: {archive.filament_color}" if archive.filament_color else "",
f"Cost: {archive.cost}" if archive.cost is not None else "",
f"Bambuddy status: {archive.status}" if archive.status else "",
]
return "\n".join(row for row in rows if row)
@staticmethod
def is_successful_archive(archive: Archive) -> bool:
status = (archive.status or "").lower()
if status in {"success", "completed", "complete", "done"}:
return True
return bool(archive.completed_at and status not in {"failed", "stopped", "printing", "running"})