From 9f3d825120d688c61a41666a288db56491467e98 Mon Sep 17 00:00:00 2001 From: tcomlab Date: Wed, 15 Apr 2026 13:45:55 +0300 Subject: [PATCH] Add Bambuddy InvenTree sync service --- .dockerignore | 10 + .env.example | 33 +++ .gitignore | 17 ++ Dockerfile | 16 ++ README.md | 110 ++++++++ docker-compose.yml | 11 + requirements.txt | 4 + src/bambuddy_inventree_sync/__init__.py | 3 + src/bambuddy_inventree_sync/bambuddy.py | 61 +++++ src/bambuddy_inventree_sync/config.py | 57 +++++ src/bambuddy_inventree_sync/database.py | 131 ++++++++++ src/bambuddy_inventree_sync/http_errors.py | 2 + src/bambuddy_inventree_sync/inventree.py | 103 ++++++++ src/bambuddy_inventree_sync/main.py | 162 ++++++++++++ src/bambuddy_inventree_sync/models.py | 48 ++++ src/bambuddy_inventree_sync/sync.py | 276 +++++++++++++++++++++ 16 files changed, 1044 insertions(+) create mode 100644 .dockerignore create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 README.md create mode 100644 docker-compose.yml create mode 100644 requirements.txt create mode 100644 src/bambuddy_inventree_sync/__init__.py create mode 100644 src/bambuddy_inventree_sync/bambuddy.py create mode 100644 src/bambuddy_inventree_sync/config.py create mode 100644 src/bambuddy_inventree_sync/database.py create mode 100644 src/bambuddy_inventree_sync/http_errors.py create mode 100644 src/bambuddy_inventree_sync/inventree.py create mode 100644 src/bambuddy_inventree_sync/main.py create mode 100644 src/bambuddy_inventree_sync/models.py create mode 100644 src/bambuddy_inventree_sync/sync.py diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..bca2483 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,10 @@ +.env +.git +.pytest_cache +.pycache_check +__pycache__ +*.pyc +data +dist +build +*.egg-info diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..a26d6a6 --- /dev/null +++ b/.env.example @@ -0,0 +1,33 @@ +# Copy this file to .env and adjust values for your Windows Server. + +# Bambuddy API base URL. You can use either root URL or /api/v1 URL. +BAMBUDDY_BASE_URL=http://host.docker.internal:8000/api/v1 +BAMBUDDY_API_KEY=replace-with-bambuddy-api-key + +# InvenTree URL. Use root URL or /api URL; the service normalizes it. +INVENTREE_BASE_URL=http://host.docker.internal:1337 +INVENTREE_TOKEN=replace-with-inventree-token + +# Existing InvenTree IDs where printed parts should be created/stored. +INVENTREE_PART_CATEGORY_ID=1 +INVENTREE_STOCK_LOCATION_ID=1 + +# Optional tokens for this sync service. +# If SERVICE_API_TOKEN is set, manual sync endpoints require X-Service-Token. +SERVICE_API_TOKEN=change-me +# If WEBHOOK_SHARED_SECRET is set, Bambuddy webhook requests must send X-Sync-Secret. +WEBHOOK_SHARED_SECRET= + +# Sync behavior. +SYNC_SUCCESS_ONLY=true +DEFAULT_STOCK_QUANTITY=1 +INVENTREE_STOCK_STATUS=10 +PART_IPN_PREFIX=BMB +PART_KEY_FIELDS=filename,name +BACKFILL_PAGE_SIZE=50 +POLL_INTERVAL_SECONDS=0 +SYNC_ON_STARTUP=false +HTTP_TIMEOUT_SECONDS=30 + +# Persistent SQLite database inside the container. +DATA_DIR=/data diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ae7eaee --- /dev/null +++ b/.gitignore @@ -0,0 +1,17 @@ +.env +.env.* +!.env.example + +data/ +*.sqlite +*.sqlite3 +*.db + +__pycache__/ +*.py[cod] +.pycache_check/ +.pytest_cache/ + +dist/ +build/ +*.egg-info/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..efaf6e0 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.12-slim + +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + PIP_NO_CACHE_DIR=1 + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY src ./src + +EXPOSE 8080 + +CMD ["uvicorn", "bambuddy_inventree_sync.main:app", "--host", "0.0.0.0", "--port", "8080", "--app-dir", "src"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..6387185 --- /dev/null +++ b/README.md @@ -0,0 +1,110 @@ +# Bambuddy InvenTree Sync + +Small sidecar service for syncing Bambuddy `Archives` into InvenTree. + +It currently: + +- creates an InvenTree `Part` automatically when a printed model is first seen; +- creates one InvenTree `StockItem` for each synced Bambuddy archive; +- stores local sync state in SQLite to avoid duplicate stock items; +- accepts Bambuddy webhooks and can also backfill existing archives. + +The first version intentionally treats one Bambuddy archive as one printed stock item. Later we can add plate parsing, multi-object quantity detection, filament costing, thumbnails, 3MF attachments, and mapping rules. + +## Setup + +1. Copy `.env.example` to `.env`. +2. Fill in: + - `BAMBUDDY_BASE_URL` + - `BAMBUDDY_API_KEY` + - `INVENTREE_BASE_URL` + - `INVENTREE_TOKEN` + - `INVENTREE_PART_CATEGORY_ID` + - `INVENTREE_STOCK_LOCATION_ID` +3. Start the service: + +```powershell +docker compose up -d --build +``` + +The service listens on `http://localhost:8088`. + +## InvenTree IDs + +For the first version, use numeric IDs for the target InvenTree part category and stock location. Open the desired category/location in InvenTree and copy the ID from the URL or API response. + +Example: + +```env +INVENTREE_PART_CATEGORY_ID=12 +INVENTREE_STOCK_LOCATION_ID=7 +``` + +## Validate Connectivity + +If `SERVICE_API_TOKEN` is set in `.env`, pass it as `X-Service-Token`: + +```powershell +curl.exe -H "X-Service-Token: change-me" http://localhost:8088/validate +``` + +## Backfill Existing Archives + +Run a test with one archive first: + +```powershell +curl.exe -X POST -H "X-Service-Token: change-me" "http://localhost:8088/sync/backfill?max_archives=1" +``` + +Run full backfill for successful Bambuddy archives: + +```powershell +curl.exe -X POST -H "X-Service-Token: change-me" http://localhost:8088/sync/backfill +``` + +The default behavior is `SYNC_SUCCESS_ONLY=true`, so failed or stopped prints are not imported. + +## Bambuddy Webhook + +In Bambuddy, configure a webhook to: + +```text +http://WINDOWS-SERVER-IP:8088/webhooks/bambuddy +``` + +If `WEBHOOK_SHARED_SECRET` is configured, Bambuddy must send this header: + +```text +X-Sync-Secret: your-secret +``` + +The service expects Bambuddy payloads with `event=print_complete` and `data.archive_id`. + +## Part Matching + +The service builds a stable key from: + +```env +PART_KEY_FIELDS=filename,name +``` + +That key becomes an InvenTree IPN: + +```text +BMB- +``` + +This means repeat prints of the same file/name reuse the same `Part` and create new `StockItem` rows. To change matching behavior later, edit `PART_KEY_FIELDS`. + +## Useful Endpoints + +```text +GET /health +GET /sync/status +GET /validate +POST /sync/archive/{archive_id} +POST /sync/backfill +POST /webhooks/bambuddy +``` + +Manual sync endpoints require `X-Service-Token` when `SERVICE_API_TOKEN` is set. diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..8aabec0 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,11 @@ +services: + bambuddy-inventree-sync: + build: . + container_name: bambuddy-inventree-sync + restart: unless-stopped + env_file: + - .env + ports: + - "8088:8080" + volumes: + - ./data:/data diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..a4cec2b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +fastapi>=0.115,<1 +httpx>=0.28,<1 +pydantic-settings>=2.8,<3 +uvicorn[standard]>=0.34,<1 diff --git a/src/bambuddy_inventree_sync/__init__.py b/src/bambuddy_inventree_sync/__init__.py new file mode 100644 index 0000000..a05eb9a --- /dev/null +++ b/src/bambuddy_inventree_sync/__init__.py @@ -0,0 +1,3 @@ +__all__ = ["__version__"] + +__version__ = "0.1.0" diff --git a/src/bambuddy_inventree_sync/bambuddy.py b/src/bambuddy_inventree_sync/bambuddy.py new file mode 100644 index 0000000..b8ed2dc --- /dev/null +++ b/src/bambuddy_inventree_sync/bambuddy.py @@ -0,0 +1,61 @@ +from typing import Any + +import httpx + +from .config import Settings +from .http_errors import ExternalApiError +from .models import Archive + + +class BambuddyClient: + def __init__(self, settings: Settings) -> None: + base_url = settings.bambuddy_base_url.rstrip("/") + if not base_url.endswith("/api/v1"): + base_url = f"{base_url}/api/v1" + + self.base_url = base_url + self.client = httpx.AsyncClient( + base_url=self.base_url, + headers={"X-API-Key": settings.bambuddy_api_key, "Accept": "application/json"}, + timeout=settings.http_timeout_seconds, + trust_env=False, + ) + + async def close(self) -> None: + await self.client.aclose() + + async def get_archive(self, archive_id: int) -> Archive: + data = await self._request("GET", f"/archives/{archive_id}") + return Archive.model_validate(data) + + async def get_system_info(self) -> dict[str, Any]: + return await self._request("GET", "/system/info") + + async def list_archives( + self, + *, + status: str | None, + limit: int, + offset: int, + ) -> tuple[int | None, list[Archive]]: + params: dict[str, Any] = {"limit": limit, "offset": offset} + if status: + params["status"] = status + + data = await self._request("GET", "/archives/", params=params) + if isinstance(data, list): + return None, [Archive.model_validate(item) for item in data] + + archives = data.get("archives") or data.get("results") or [] + total = data.get("total") or data.get("count") + return total, [Archive.model_validate(item) for item in archives] + + async def _request(self, method: str, path: str, **kwargs: Any) -> Any: + response = await self.client.request(method, path, **kwargs) + if response.status_code >= 400: + body = response.text[:1000] + raise ExternalApiError(f"Bambuddy {method} {path} failed: HTTP {response.status_code}: {body}") + + if response.content: + return response.json() + return None diff --git a/src/bambuddy_inventree_sync/config.py b/src/bambuddy_inventree_sync/config.py new file mode 100644 index 0000000..91912d8 --- /dev/null +++ b/src/bambuddy_inventree_sync/config.py @@ -0,0 +1,57 @@ +from functools import lru_cache +from pathlib import Path +from typing import Annotated + +from pydantic import Field, field_validator +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8") + + bambuddy_base_url: str + bambuddy_api_key: str + + inventree_base_url: str + inventree_token: str + inventree_part_category_id: int + inventree_stock_location_id: int + + service_api_token: str | None = None + webhook_shared_secret: str | None = None + + sync_success_only: bool = True + default_stock_quantity: Annotated[float, Field(gt=0)] = 1 + inventree_stock_status: int = 10 + part_ipn_prefix: str = "BMB" + part_key_fields: str = "filename,name" + backfill_page_size: Annotated[int, Field(ge=1, le=250)] = 50 + poll_interval_seconds: Annotated[int, Field(ge=0)] = 0 + sync_on_startup: bool = False + http_timeout_seconds: Annotated[int, Field(ge=1)] = 30 + data_dir: Path = Path("/data") + + @field_validator("bambuddy_base_url", "inventree_base_url") + @classmethod + def strip_url(cls, value: str) -> str: + return value.rstrip("/") + + @field_validator("part_ipn_prefix") + @classmethod + def clean_prefix(cls, value: str) -> str: + cleaned = "".join(char for char in value.upper() if char.isalnum() or char in ("-", "_")) + return cleaned or "BMB" + + @property + def database_path(self) -> Path: + return self.data_dir / "sync.sqlite3" + + @property + def part_key_field_names(self) -> list[str]: + fields = [field.strip() for field in self.part_key_fields.split(",") if field.strip()] + return fields or ["filename", "name"] + + +@lru_cache +def get_settings() -> Settings: + return Settings() diff --git a/src/bambuddy_inventree_sync/database.py b/src/bambuddy_inventree_sync/database.py new file mode 100644 index 0000000..6248a0d --- /dev/null +++ b/src/bambuddy_inventree_sync/database.py @@ -0,0 +1,131 @@ +import json +import sqlite3 +from pathlib import Path +from typing import Any + + +class Database: + def __init__(self, path: Path) -> None: + self.path = path + + def init(self) -> None: + self.path.parent.mkdir(parents=True, exist_ok=True) + with self._connect() as conn: + conn.executescript( + """ + PRAGMA journal_mode=WAL; + + CREATE TABLE IF NOT EXISTS sync_records ( + archive_id INTEGER PRIMARY KEY, + part_key TEXT, + part_id INTEGER, + stock_item_id INTEGER, + archive_status TEXT, + sync_status TEXT NOT NULL, + error TEXT, + raw_archive TEXT, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + + CREATE TABLE IF NOT EXISTS part_map ( + part_key TEXT PRIMARY KEY, + inventree_part_id INTEGER NOT NULL, + display_name TEXT, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + """ + ) + + def get_record(self, archive_id: int) -> dict[str, Any] | None: + with self._connect() as conn: + row = conn.execute( + "SELECT * FROM sync_records WHERE archive_id = ?", + (archive_id,), + ).fetchone() + return dict(row) if row else None + + def upsert_record( + self, + *, + archive_id: int, + sync_status: str, + part_key: str | None = None, + part_id: int | None = None, + stock_item_id: int | None = None, + archive_status: str | None = None, + error: str | None = None, + raw_archive: dict[str, Any] | None = None, + ) -> None: + raw_archive_json = json.dumps(raw_archive, ensure_ascii=False) if raw_archive is not None else None + with self._connect() as conn: + conn.execute( + """ + INSERT INTO sync_records ( + archive_id, part_key, part_id, stock_item_id, archive_status, + sync_status, error, raw_archive + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(archive_id) DO UPDATE SET + part_key = excluded.part_key, + part_id = excluded.part_id, + stock_item_id = excluded.stock_item_id, + archive_status = excluded.archive_status, + sync_status = excluded.sync_status, + error = excluded.error, + raw_archive = COALESCE(excluded.raw_archive, sync_records.raw_archive), + updated_at = CURRENT_TIMESTAMP + """, + ( + archive_id, + part_key, + part_id, + stock_item_id, + archive_status, + sync_status, + error, + raw_archive_json, + ), + ) + + def get_part_id(self, part_key: str) -> int | None: + with self._connect() as conn: + row = conn.execute( + "SELECT inventree_part_id FROM part_map WHERE part_key = ?", + (part_key,), + ).fetchone() + return int(row["inventree_part_id"]) if row else None + + def upsert_part(self, *, part_key: str, inventree_part_id: int, display_name: str) -> None: + with self._connect() as conn: + conn.execute( + """ + INSERT INTO part_map (part_key, inventree_part_id, display_name) + VALUES (?, ?, ?) + ON CONFLICT(part_key) DO UPDATE SET + inventree_part_id = excluded.inventree_part_id, + display_name = excluded.display_name, + updated_at = CURRENT_TIMESTAMP + """, + (part_key, inventree_part_id, display_name), + ) + + def counts(self) -> dict[str, int]: + with self._connect() as conn: + rows = conn.execute( + """ + SELECT sync_status, COUNT(*) AS count + FROM sync_records + GROUP BY sync_status + """ + ).fetchall() + parts = conn.execute("SELECT COUNT(*) AS count FROM part_map").fetchone() + result = {row["sync_status"]: int(row["count"]) for row in rows} + result["known_parts"] = int(parts["count"]) if parts else 0 + return result + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(self.path) + conn.row_factory = sqlite3.Row + return conn diff --git a/src/bambuddy_inventree_sync/http_errors.py b/src/bambuddy_inventree_sync/http_errors.py new file mode 100644 index 0000000..87ad926 --- /dev/null +++ b/src/bambuddy_inventree_sync/http_errors.py @@ -0,0 +1,2 @@ +class ExternalApiError(RuntimeError): + pass diff --git a/src/bambuddy_inventree_sync/inventree.py b/src/bambuddy_inventree_sync/inventree.py new file mode 100644 index 0000000..8a8f814 --- /dev/null +++ b/src/bambuddy_inventree_sync/inventree.py @@ -0,0 +1,103 @@ +from typing import Any + +import httpx + +from .config import Settings +from .http_errors import ExternalApiError +from .models import Archive + + +class InvenTreeClient: + def __init__(self, settings: Settings) -> None: + base_url = settings.inventree_base_url.rstrip("/") + if not base_url.endswith("/api"): + base_url = f"{base_url}/api" + + self.settings = settings + self.base_url = base_url + self.client = httpx.AsyncClient( + base_url=self.base_url, + headers={ + "Authorization": f"Token {settings.inventree_token}", + "Accept": "application/json", + "Content-Type": "application/json", + }, + timeout=settings.http_timeout_seconds, + trust_env=False, + ) + + async def close(self) -> None: + await self.client.aclose() + + async def get_part_category(self) -> dict[str, Any]: + return await self._request("GET", f"/part/category/{self.settings.inventree_part_category_id}/") + + async def get_stock_location(self) -> dict[str, Any]: + return await self._request("GET", f"/stock/location/{self.settings.inventree_stock_location_id}/") + + async def find_part_by_ipn(self, ipn: str) -> dict[str, Any] | None: + for params in ({"IPN": ipn, "limit": 100}, {"search": ipn, "limit": 100}): + data = await self._request("GET", "/part/", params=params) + for item in self._items(data): + if item.get("IPN") == ipn: + return item + return None + + async def create_part(self, *, name: str, description: str, ipn: str) -> dict[str, Any]: + payload = { + "name": name[:100], + "description": description, + "category": self.settings.inventree_part_category_id, + "IPN": ipn[:100], + "active": True, + "component": True, + "purchaseable": False, + "salable": False, + "assembly": False, + "trackable": False, + "virtual": False, + } + return await self._request("POST", "/part/", json=payload) + + async def find_stock_by_batch(self, *, part_id: int, batch: str) -> dict[str, Any] | None: + data = await self._request("GET", "/stock/", params={"part": part_id, "batch": batch, "limit": 100}) + for item in self._items(data): + if item.get("part") == part_id and item.get("batch") == batch: + return item + return None + + async def create_stock_item(self, *, part_id: int, archive: Archive, notes: str) -> dict[str, Any]: + quantity = archive.quantity or self.settings.default_stock_quantity + payload: dict[str, Any] = { + "part": part_id, + "location": self.settings.inventree_stock_location_id, + "quantity": quantity, + "status": self.settings.inventree_stock_status, + "batch": self.batch_for_archive(archive.id), + "notes": notes, + } + return await self._request("POST", "/stock/", json=payload) + + @staticmethod + def batch_for_archive(archive_id: int) -> str: + return f"bambuddy-{archive_id}" + + async def _request(self, method: str, path: str, **kwargs: Any) -> Any: + response = await self.client.request(method, path, **kwargs) + if response.status_code >= 400: + body = response.text[:1000] + raise ExternalApiError(f"InvenTree {method} {path} failed: HTTP {response.status_code}: {body}") + + if response.content: + return response.json() + return None + + @staticmethod + def _items(data: Any) -> list[dict[str, Any]]: + if isinstance(data, list): + return data + if isinstance(data, dict): + results = data.get("results") + if isinstance(results, list): + return results + return [] diff --git a/src/bambuddy_inventree_sync/main.py b/src/bambuddy_inventree_sync/main.py new file mode 100644 index 0000000..f785618 --- /dev/null +++ b/src/bambuddy_inventree_sync/main.py @@ -0,0 +1,162 @@ +import asyncio +import logging +from contextlib import asynccontextmanager +from typing import Any + +from fastapi import BackgroundTasks, Depends, FastAPI, Header, HTTPException, Query, Request + +from .bambuddy import BambuddyClient +from .config import Settings, get_settings +from .database import Database +from .http_errors import ExternalApiError +from .inventree import InvenTreeClient +from .models import BambuddyWebhook, SyncResult +from .sync import ArchiveSyncService + +logger = logging.getLogger(__name__) + + +async def require_service_token( + request: Request, + x_service_token: str | None = Header(default=None), +) -> None: + settings: Settings = request.app.state.settings + if settings.service_api_token and x_service_token != settings.service_api_token: + raise HTTPException(status_code=401, detail="Missing or invalid X-Service-Token") + + +async def require_webhook_secret( + request: Request, + x_sync_secret: str | None = Header(default=None), +) -> None: + settings: Settings = request.app.state.settings + if settings.webhook_shared_secret and x_sync_secret != settings.webhook_shared_secret: + raise HTTPException(status_code=401, detail="Missing or invalid X-Sync-Secret") + + +@asynccontextmanager +async def lifespan(app: FastAPI): + settings = get_settings() + database = Database(settings.database_path) + database.init() + + bambuddy = BambuddyClient(settings) + inventree = InvenTreeClient(settings) + sync_service = ArchiveSyncService( + settings=settings, + database=database, + bambuddy=bambuddy, + inventree=inventree, + ) + + app.state.settings = settings + app.state.database = database + app.state.bambuddy = bambuddy + app.state.inventree = inventree + app.state.sync_service = sync_service + + stop_event = asyncio.Event() + poll_task: asyncio.Task[Any] | None = None + + async def poll_loop() -> None: + while not stop_event.is_set(): + try: + await sync_service.backfill() + except Exception: + logger.exception("Scheduled backfill failed") + try: + await asyncio.wait_for(stop_event.wait(), timeout=settings.poll_interval_seconds) + except TimeoutError: + continue + + if settings.sync_on_startup: + asyncio.create_task(sync_service.backfill()) + + if settings.poll_interval_seconds > 0: + poll_task = asyncio.create_task(poll_loop()) + + try: + yield + finally: + stop_event.set() + if poll_task: + poll_task.cancel() + await bambuddy.close() + await inventree.close() + + +app = FastAPI( + title="Bambuddy InvenTree Sync", + version="0.1.0", + lifespan=lifespan, +) + + +@app.get("/health") +async def health(request: Request) -> dict[str, Any]: + database: Database = request.app.state.database + return {"status": "ok", "counts": database.counts()} + + +@app.get("/validate", dependencies=[Depends(require_service_token)]) +async def validate(request: Request) -> dict[str, Any]: + sync_service: ArchiveSyncService = request.app.state.sync_service + try: + return await sync_service.validate_targets() + except ExternalApiError as exc: + raise HTTPException(status_code=502, detail=str(exc)) from exc + + +@app.post("/webhooks/bambuddy", response_model=SyncResult, dependencies=[Depends(require_webhook_secret)]) +async def bambuddy_webhook(request: Request, payload: BambuddyWebhook) -> SyncResult: + if payload.event not in {"print_complete", "print_failed"}: + return SyncResult(archive_id=0, status="skipped", message=f"Ignored event {payload.event}") + + archive_id = payload.data.get("archive_id") or payload.data.get("id") + if archive_id is None: + raise HTTPException(status_code=400, detail="Webhook payload does not contain data.archive_id") + + sync_service: ArchiveSyncService = request.app.state.sync_service + try: + return await sync_service.sync_archive_id(int(archive_id), webhook_data=payload.data) + except ExternalApiError as exc: + raise HTTPException(status_code=502, detail=str(exc)) from exc + + +@app.post("/sync/archive/{archive_id}", response_model=SyncResult, dependencies=[Depends(require_service_token)]) +async def sync_archive( + request: Request, + archive_id: int, + force: bool = Query(default=False), +) -> SyncResult: + sync_service: ArchiveSyncService = request.app.state.sync_service + try: + return await sync_service.sync_archive_id(archive_id, force=force) + except ExternalApiError as exc: + raise HTTPException(status_code=502, detail=str(exc)) from exc + + +@app.post("/sync/backfill", dependencies=[Depends(require_service_token)]) +async def sync_backfill( + request: Request, + background_tasks: BackgroundTasks, + background: bool = Query(default=False), + status: str | None = Query(default=None), + max_archives: int | None = Query(default=None, ge=1), +) -> dict[str, Any]: + sync_service: ArchiveSyncService = request.app.state.sync_service + + if background: + background_tasks.add_task(sync_service.backfill, status=status, max_archives=max_archives) + return {"status": "accepted"} + + try: + return await sync_service.backfill(status=status, max_archives=max_archives) + except ExternalApiError as exc: + raise HTTPException(status_code=502, detail=str(exc)) from exc + + +@app.get("/sync/status", dependencies=[Depends(require_service_token)]) +async def sync_status(request: Request) -> dict[str, int]: + database: Database = request.app.state.database + return database.counts() diff --git a/src/bambuddy_inventree_sync/models.py b/src/bambuddy_inventree_sync/models.py new file mode 100644 index 0000000..b497aad --- /dev/null +++ b/src/bambuddy_inventree_sync/models.py @@ -0,0 +1,48 @@ +from typing import Any + +from pydantic import BaseModel, ConfigDict, Field + + +class Archive(BaseModel): + model_config = ConfigDict(extra="allow") + + id: int + name: str | None = None + filename: str | None = None + print_name: str | None = None + content_hash: str | None = None + printer_id: int | None = None + printer_name: str | None = None + created_at: str | None = None + started_at: str | None = None + completed_at: str | None = None + duration: int | float | None = None + print_time_seconds: int | float | None = None + actual_time_seconds: int | float | None = None + status: str | None = None + filament_used: float | None = None + filament_used_grams: float | None = None + filament_type: str | None = None + filament_color: str | None = None + quantity: float | None = None + object_count: int | None = None + cost: float | None = None + notes: str | None = None + tags: list[str] | None = None + + +class BambuddyWebhook(BaseModel): + model_config = ConfigDict(extra="allow") + + event: str + timestamp: str | None = None + data: dict[str, Any] = Field(default_factory=dict) + + +class SyncResult(BaseModel): + archive_id: int + status: str + message: str + part_id: int | None = None + stock_item_id: int | None = None + part_key: str | None = None diff --git a/src/bambuddy_inventree_sync/sync.py b/src/bambuddy_inventree_sync/sync.py new file mode 100644 index 0000000..06f6309 --- /dev/null +++ b/src/bambuddy_inventree_sync/sync.py @@ -0,0 +1,276 @@ +import asyncio +import hashlib +import logging +from pathlib import PurePosixPath, PureWindowsPath +from typing import Any + +from .bambuddy import BambuddyClient +from .config import Settings +from .database import Database +from .http_errors import ExternalApiError +from .inventree import InvenTreeClient +from .models import Archive, SyncResult + +logger = logging.getLogger(__name__) + + +class ArchiveSyncService: + def __init__( + self, + *, + settings: Settings, + database: Database, + bambuddy: BambuddyClient, + inventree: InvenTreeClient, + ) -> None: + self.settings = settings + self.database = database + self.bambuddy = bambuddy + self.inventree = inventree + self._lock = asyncio.Lock() + + async def validate_targets(self) -> dict[str, Any]: + category, location, bambuddy_info = await asyncio.gather( + self.inventree.get_part_category(), + self.inventree.get_stock_location(), + self.bambuddy.get_system_info(), + ) + return {"part_category": category, "stock_location": location, "bambuddy": bambuddy_info} + + async def sync_archive_id( + self, + archive_id: int, + *, + force: bool = False, + webhook_data: dict[str, Any] | None = None, + ) -> SyncResult: + async with self._lock: + existing = self.database.get_record(archive_id) + if existing and existing["sync_status"] == "synced" and not force: + return SyncResult( + archive_id=archive_id, + status="already_synced", + message="Archive was already synced", + part_id=existing["part_id"], + stock_item_id=existing["stock_item_id"], + part_key=existing["part_key"], + ) + + try: + archive = await self._load_archive(archive_id, webhook_data) + return await self._sync_archive(archive) + except Exception as exc: + self.database.upsert_record( + archive_id=archive_id, + sync_status="failed", + error=str(exc), + raw_archive=webhook_data, + ) + raise + + async def backfill(self, *, status: str | None = None, max_archives: int | None = None) -> dict[str, int]: + status_filter = status + if status_filter is None and self.settings.sync_success_only: + status_filter = "success" + + counts = {"seen": 0, "synced": 0, "already_synced": 0, "skipped": 0, "failed": 0} + offset = 0 + limit = self.settings.backfill_page_size + + while True: + total, archives = await self.bambuddy.list_archives(status=status_filter, limit=limit, offset=offset) + if not archives: + break + + for archive in archives: + if max_archives is not None and counts["seen"] >= max_archives: + return counts + + counts["seen"] += 1 + try: + result = await self.sync_archive_id(archive.id, webhook_data=archive.model_dump()) + counts[result.status] = counts.get(result.status, 0) + 1 + except Exception: + logger.exception("Failed to sync Bambuddy archive %s during backfill", archive.id) + counts["failed"] += 1 + + offset += len(archives) + if total is not None and offset >= total: + break + + return counts + + async def _load_archive(self, archive_id: int, webhook_data: dict[str, Any] | None) -> Archive: + try: + return await self.bambuddy.get_archive(archive_id) + except ExternalApiError: + if webhook_data: + data = dict(webhook_data) + data.setdefault("id", archive_id) + return Archive.model_validate(data) + raise + + async def _sync_archive(self, archive: Archive) -> SyncResult: + if self.settings.sync_success_only and not self.is_successful_archive(archive): + self.database.upsert_record( + archive_id=archive.id, + sync_status="skipped", + archive_status=archive.status, + raw_archive=archive.model_dump(), + ) + return SyncResult(archive_id=archive.id, status="skipped", message=f"Archive status is {archive.status}") + + part_key = self.part_key_for_archive(archive) + ipn = self.ipn_for_part_key(part_key) + name = self.name_for_archive(archive) + description = self.description_for_archive(archive) + + part_id = await self._get_or_create_part(part_key=part_key, ipn=ipn, name=name, description=description) + batch = self.inventree.batch_for_archive(archive.id) + existing_stock = await self.inventree.find_stock_by_batch(part_id=part_id, batch=batch) + + if existing_stock: + stock_item_id = int(existing_stock.get("pk") or existing_stock.get("id")) + self.database.upsert_record( + archive_id=archive.id, + sync_status="synced", + part_key=part_key, + part_id=part_id, + stock_item_id=stock_item_id, + archive_status=archive.status, + raw_archive=archive.model_dump(), + ) + return SyncResult( + archive_id=archive.id, + status="already_synced", + message="Stock item already exists in InvenTree", + part_id=part_id, + stock_item_id=stock_item_id, + part_key=part_key, + ) + + stock = await self.inventree.create_stock_item( + part_id=part_id, + archive=archive, + notes=self.stock_notes_for_archive(archive), + ) + stock_item_id = int(stock.get("pk") or stock.get("id")) + + self.database.upsert_record( + archive_id=archive.id, + sync_status="synced", + part_key=part_key, + part_id=part_id, + stock_item_id=stock_item_id, + archive_status=archive.status, + raw_archive=archive.model_dump(), + ) + logger.info( + "Synced Bambuddy archive %s to InvenTree part %s stock item %s", + archive.id, + part_id, + stock_item_id, + ) + return SyncResult( + archive_id=archive.id, + status="synced", + message="Created stock item in InvenTree", + part_id=part_id, + stock_item_id=stock_item_id, + part_key=part_key, + ) + + async def _get_or_create_part(self, *, part_key: str, ipn: str, name: str, description: str) -> int: + cached_part_id = self.database.get_part_id(part_key) + if cached_part_id: + return cached_part_id + + existing = await self.inventree.find_part_by_ipn(ipn) + if existing: + part_id = int(existing.get("pk") or existing.get("id")) + self.database.upsert_part(part_key=part_key, inventree_part_id=part_id, display_name=name) + return part_id + + created = await self.inventree.create_part(name=name, description=description, ipn=ipn) + part_id = int(created.get("pk") or created.get("id")) + self.database.upsert_part(part_key=part_key, inventree_part_id=part_id, display_name=name) + return part_id + + def part_key_for_archive(self, archive: Archive) -> str: + parts: list[str] = [] + raw = archive.model_dump() + for field in self.settings.part_key_field_names: + value = raw.get(field) + if value is not None and str(value).strip(): + parts.append(str(value).strip().lower()) + + if not parts: + parts.append(str(archive.id)) + + source = "|".join(parts) + return hashlib.sha1(source.encode("utf-8")).hexdigest() + + def ipn_for_part_key(self, part_key: str) -> str: + return f"{self.settings.part_ipn_prefix}-{part_key[:12]}" + + @staticmethod + def name_for_archive(archive: Archive) -> str: + if archive.name and archive.name.strip(): + return archive.name.strip() + + if archive.print_name and archive.print_name.strip(): + return archive.print_name.strip() + + if archive.filename and archive.filename.strip(): + filename = archive.filename.strip() + if "\\" in filename: + return PureWindowsPath(filename).stem or filename + return PurePosixPath(filename).stem or filename + + return f"Bambuddy archive {archive.id}" + + def description_for_archive(self, archive: Archive) -> str: + return "\n".join( + line + for line in [ + "Auto-created from Bambuddy archive.", + f"Archive ID: {archive.id}", + f"Print name: {archive.print_name}" if archive.print_name else "", + f"Filename: {archive.filename}" if archive.filename else "", + f"Content hash: {archive.content_hash}" if archive.content_hash else "", + f"Printer: {archive.printer_name}" if archive.printer_name else "", + f"Created at: {archive.created_at}" if archive.created_at else "", + f"Completed at: {archive.completed_at}" if archive.completed_at else "", + ] + if line + ) + + def stock_notes_for_archive(self, archive: Archive) -> str: + duration = archive.actual_time_seconds or archive.print_time_seconds or archive.duration + filament_used = archive.filament_used_grams or archive.filament_used + rows = [ + "Imported from Bambuddy.", + f"Archive ID: {archive.id}", + f"Print name: {archive.print_name}" if archive.print_name else "", + f"Filename: {archive.filename}" if archive.filename else "", + f"Printer: {archive.printer_name}" if archive.printer_name else "", + f"Started at: {archive.started_at}" if archive.started_at else "", + f"Completed at: {archive.completed_at}" if archive.completed_at else "", + f"Created at: {archive.created_at}" if archive.created_at else "", + f"Duration: {duration} seconds" if duration is not None else "", + f"Quantity: {archive.quantity}" if archive.quantity is not None else "", + f"Object count: {archive.object_count}" if archive.object_count is not None else "", + f"Filament used: {filament_used} g" if filament_used is not None else "", + f"Filament type: {archive.filament_type}" if archive.filament_type else "", + f"Filament color: {archive.filament_color}" if archive.filament_color else "", + f"Cost: {archive.cost}" if archive.cost is not None else "", + f"Bambuddy status: {archive.status}" if archive.status else "", + ] + return "\n".join(row for row in rows if row) + + @staticmethod + def is_successful_archive(archive: Archive) -> bool: + status = (archive.status or "").lower() + if status in {"success", "completed", "complete", "done"}: + return True + return bool(archive.completed_at and status not in {"failed", "stopped", "printing", "running"})