From 123100150b02c1ecf54144ce4302ddedff433625 Mon Sep 17 00:00:00 2001 From: tcomlab Date: Wed, 15 Apr 2026 14:16:52 +0300 Subject: [PATCH] Sync print weight and time parameters --- README.md | 7 +++ src/bambuddy_inventree_sync/inventree.py | 57 ++++++++++++++++++++++++ src/bambuddy_inventree_sync/sync.py | 53 ++++++++++++++++++++++ 3 files changed, 117 insertions(+) diff --git a/README.md b/README.md index 88e195f..ee99a2e 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,13 @@ BMB- This means repeat prints of the same file/name reuse the same `Part` and create new `StockItem` rows. To change matching behavior later, edit `PART_KEY_FIELDS`. +## InvenTree Part Parameters + +For each synced `Part`, the service updates these InvenTree parameters when matching parameter templates exist: + +- `Weight`: filament weight per printed item in grams. If a Bambuddy archive has `quantity=2`, the total filament weight is divided by 2. +- `PrintTime`: print duration from Bambuddy. The visible value is formatted as `1h 6m 1s`; `data_numeric` stores the duration in seconds. + ## Useful Endpoints ```text diff --git a/src/bambuddy_inventree_sync/inventree.py b/src/bambuddy_inventree_sync/inventree.py index 6cc169a..e9db2d1 100644 --- a/src/bambuddy_inventree_sync/inventree.py +++ b/src/bambuddy_inventree_sync/inventree.py @@ -15,6 +15,7 @@ class InvenTreeClient: self.settings = settings self.base_url = base_url + self._parameter_template_cache: dict[str, int] = {} self.client = httpx.AsyncClient( base_url=self.base_url, headers={ @@ -78,6 +79,62 @@ class InvenTreeClient: } return await self._request("POST", "/stock/", json=payload) + async def upsert_part_parameter( + self, + *, + part_id: int, + template_name: str, + data: str, + data_numeric: float | None, + note: str, + ) -> dict[str, Any]: + template_id = await self.get_parameter_template_id(template_name) + existing = await self.find_part_parameter(part_id=part_id, template_id=template_id) + payload: dict[str, Any] = { + "template": template_id, + "model_type": "part.part", + "model_id": part_id, + "data": data[:500], + "data_numeric": data_numeric, + "note": note[:500], + } + + if existing: + parameter_id = int(existing.get("pk") or existing.get("id")) + return await self._request("PATCH", f"/parameter/{parameter_id}/", json=payload) + + return await self._request("POST", "/parameter/", json=payload) + + async def get_parameter_template_id(self, name: str) -> int: + cache_key = name.lower() + if cache_key in self._parameter_template_cache: + return self._parameter_template_cache[cache_key] + + data = await self._request("GET", "/parameter/template/", params={"search": name, "limit": 100}) + for item in self._items(data): + if str(item.get("name", "")).lower() == cache_key: + template_id = int(item.get("pk") or item.get("id")) + self._parameter_template_cache[cache_key] = template_id + return template_id + + raise ExternalApiError(f"InvenTree parameter template '{name}' was not found") + + async def find_part_parameter(self, *, part_id: int, template_id: int) -> dict[str, Any] | None: + data = await self._request( + "GET", + "/parameter/", + params={ + "model_type": "part.part", + "model_id": part_id, + "template": template_id, + "limit": 100, + }, + ) + for item in self._items(data): + if item.get("model_type") == "part.part" and item.get("model_id") == part_id and item.get("template") == template_id: + return item + return None + @staticmethod def batch_for_archive(archive_id: int) -> str: return f"bambuddy-{archive_id}" diff --git a/src/bambuddy_inventree_sync/sync.py b/src/bambuddy_inventree_sync/sync.py index 7d1129c..86b8bc8 100644 --- a/src/bambuddy_inventree_sync/sync.py +++ b/src/bambuddy_inventree_sync/sync.py @@ -131,6 +131,7 @@ class ArchiveSyncService: description = self.description_for_archive(archive) part_id = await self._get_or_create_part(part_key=part_key, ipn=ipn, name=name, description=description) + await self._sync_part_parameters(part_id=part_id, archive=archive) batch = self.inventree.batch_for_archive(archive.id) existing_stock = await self.inventree.find_stock_by_batch(part_id=part_id, batch=batch) @@ -201,6 +202,42 @@ class ArchiveSyncService: self.database.upsert_part(part_key=part_key, inventree_part_id=part_id, display_name=name) return part_id + async def _sync_part_parameters(self, *, part_id: int, archive: Archive) -> None: + for parameter in self.part_parameters_for_archive(archive): + await self.inventree.upsert_part_parameter(part_id=part_id, **parameter) + + def part_parameters_for_archive(self, archive: Archive) -> list[dict[str, str | float | None]]: + parameters: list[dict[str, str | float | None]] = [] + + weight = archive.filament_used_grams or archive.filament_used + if weight is not None: + quantity = archive.quantity or 1 + if quantity <= 0: + quantity = 1 + weight_per_item = float(weight) / float(quantity) + parameters.append( + { + "template_name": "Weight", + "data": self.format_number(weight_per_item), + "data_numeric": weight_per_item, + "note": f"Imported from Bambuddy archive {archive.id}; total print weight {self.format_number(float(weight))} g", + } + ) + + duration = archive.actual_time_seconds or archive.print_time_seconds or archive.duration + if duration is not None: + duration_seconds = float(duration) + parameters.append( + { + "template_name": "PrintTime", + "data": self.format_duration(duration_seconds), + "data_numeric": duration_seconds, + "note": f"Imported from Bambuddy archive {archive.id}; value stored as seconds in data_numeric", + } + ) + + return parameters + def part_key_for_archive(self, archive: Archive) -> str: parts: list[str] = [] raw = archive.model_dump() @@ -270,6 +307,22 @@ class ArchiveSyncService: ] return "\n".join(row for row in rows if row) + @staticmethod + def format_number(value: float) -> str: + return f"{value:.3f}".rstrip("0").rstrip(".") + + @staticmethod + def format_duration(seconds: float) -> str: + total_seconds = int(round(seconds)) + hours, remainder = divmod(total_seconds, 3600) + minutes, secs = divmod(remainder, 60) + + if hours: + return f"{hours}h {minutes}m {secs}s" + if minutes: + return f"{minutes}m {secs}s" + return f"{secs}s" + @staticmethod def is_successful_archive(archive: Archive) -> bool: status = (archive.status or "").lower()