Create Bambuddy assignments from InvenTree spools

This commit is contained in:
2026-04-15 20:38:26 +03:00
parent f861db762b
commit 130308a8d9
6 changed files with 279 additions and 2 deletions

View File

@@ -70,6 +70,10 @@ class BambuddyClient:
data = await self._request("PATCH", f"/inventory/spools/{spool_id}", json=payload)
return data if isinstance(data, dict) else {}
async def list_printers(self) -> list[dict[str, Any]]:
data = await self._request("GET", "/printers/")
return data if isinstance(data, list) else []
async def list_assignments(self, *, printer_id: int | None = None) -> list[dict[str, Any]]:
params: dict[str, Any] = {}
if printer_id is not None:
@@ -77,6 +81,16 @@ class BambuddyClient:
data = await self._request("GET", "/inventory/assignments", params=params)
return data if isinstance(data, list) else []
async def assign_spool(self, *, spool_id: int, printer_id: int, ams_id: int, tray_id: int) -> dict[str, Any]:
payload = {
"spool_id": spool_id,
"printer_id": printer_id,
"ams_id": ams_id,
"tray_id": tray_id,
}
data = await self._request("POST", "/inventory/assignments", json=payload)
return data if isinstance(data, dict) else {}
async def list_usage(self, *, limit: int, printer_id: int | None = None) -> list[dict[str, Any]]:
params: dict[str, Any] = {"limit": limit}
if printer_id is not None:

View File

@@ -44,9 +44,13 @@ class Settings(BaseSettings):
filament_printer_location_map: str = ""
filament_batch_source: str = "tag_uid"
filament_sync_spools: bool = True
filament_sync_assignments: bool = True
filament_sync_locations: bool = True
filament_sync_usage: bool = True
filament_return_unassigned_to_storage: bool = False
filament_printer_id_map: str = ""
filament_assignment_default_ams_id: Annotated[int, Field(ge=0)] = 0
filament_assignment_start_tray_id: Annotated[int, Field(ge=0)] = 0
filament_usage_limit: Annotated[int, Field(ge=1, le=1000)] = 200
filament_usage_success_statuses: str = "success,completed,complete,done"
filament_default_material: str = "PLA"
@@ -88,6 +92,19 @@ class Settings(BaseSettings):
result[key] = int(value)
return result
@property
def filament_printer_ids(self) -> dict[str, int]:
result: dict[str, int] = {}
for item in self.filament_printer_id_map.split(","):
if ":" not in item:
continue
key, value = item.split(":", 1)
key = key.strip().lower()
value = value.strip()
if key and value.isdigit():
result[key] = int(value)
return result
@property
def filament_loaded_location_ids(self) -> set[int]:
locations = set(self.filament_printer_locations.values())

View File

@@ -27,11 +27,12 @@ class FilamentTrackingService:
self._lock = asyncio.Lock()
async def status(self) -> dict[str, Any]:
stock_items, spools, assignments, usage = await asyncio.gather(
stock_items, spools, assignments, usage, printers = await asyncio.gather(
self._list_filament_stock(),
self.bambuddy.list_spools(),
self.bambuddy.list_assignments(),
self.bambuddy.list_usage(limit=self.settings.filament_usage_limit),
self.bambuddy.list_printers(),
)
stock_by_batch = self._index_stock_by_batch(stock_items)
@@ -65,8 +66,11 @@ class FilamentTrackingService:
"storage_location_id": self.settings.filament_storage_location_id,
"loaded_location_id": self.settings.filament_loaded_location_id,
"printer_locations": self.settings.filament_printer_locations,
"printer_ids": self.settings.filament_printer_ids,
"batch_source": self.settings.filament_batch_source,
"return_unassigned_to_storage": self.settings.filament_return_unassigned_to_storage,
"assignment_default_ams_id": self.settings.filament_assignment_default_ams_id,
"assignment_start_tray_id": self.settings.filament_assignment_start_tray_id,
},
"inventree": {
"stock_items": len(stock_items),
@@ -79,6 +83,7 @@ class FilamentTrackingService:
"spools_with_batch": len(spools_by_batch),
"assignments": len(assignments),
"usage_records": len(usage),
"printers": len(printers),
},
"mapping": {
"matched_batches": len(matched_batches),
@@ -91,6 +96,12 @@ class FilamentTrackingService:
"missing_in_bambuddy": missing_in_bambuddy[:50],
"missing_in_inventree": missing_in_inventree[:50],
"assignments": [self._assignment_summary(item, spools) for item in assignments[:50]],
"inventree_loaded_assignments": self._planned_assignment_summaries(
stock_items=stock_items,
spools_by_batch=spools_by_batch,
assignments=assignments,
printers=printers,
)[:50],
"pending_usage": pending_usage[:50],
},
"database": self.database.counts(),
@@ -107,6 +118,8 @@ class FilamentTrackingService:
if self.settings.filament_sync_spools:
result["spools"] = await self.sync_spools_from_inventree(dry_run=dry_run)
if self.settings.filament_sync_assignments:
result["assignments"] = await self.sync_assignments_from_inventree(dry_run=dry_run)
if self.settings.filament_sync_usage:
result["usage"] = await self.sync_usage_from_bambuddy(dry_run=dry_run)
if self.settings.filament_sync_locations:
@@ -168,6 +181,99 @@ class FilamentTrackingService:
return result
async def sync_assignments_from_inventree(self, *, dry_run: bool | None = None) -> dict[str, Any]:
if not self.settings.filament_tracking_enabled:
return {"enabled": False, "status": "disabled"}
dry = self._dry_run(dry_run)
async with self._lock:
stock_items, spools, assignments, printers = await asyncio.gather(
self._list_filament_stock(),
self.bambuddy.list_spools(),
self.bambuddy.list_assignments(),
self.bambuddy.list_printers(),
)
spools_by_batch = self._index_spools_by_batch(spools)
assigned_by_spool_id = {
int(assignment["spool_id"]): assignment
for assignment in assignments
if assignment.get("spool_id") is not None
}
assigned_by_slot = {
self._assignment_slot_key(assignment): assignment
for assignment in assignments
if self._assignment_slot_key(assignment) is not None
}
printer_ids = self._printer_ids_by_key(printers)
planned = self._planned_assignments(
stock_items=stock_items,
spools_by_batch=spools_by_batch,
printer_ids=printer_ids,
)
result: dict[str, Any] = {
"dry_run": dry,
"seen": len(planned),
"created": 0,
"unchanged": 0,
"skipped": 0,
"failed": 0,
"would_create": [],
"missing_printers": self._missing_printer_keys(printer_ids),
}
for item in planned:
spool_id = int(item["spool_id"])
slot_key = (int(item["printer_id"]), int(item["ams_id"]), int(item["tray_id"]))
existing_for_spool = assigned_by_spool_id.get(spool_id)
existing_for_slot = assigned_by_slot.get(slot_key)
if existing_for_spool:
if self._assignment_slot_key(existing_for_spool) == slot_key:
result["unchanged"] += 1
else:
result["skipped"] += 1
result.setdefault("skipped_items", []).append(
{**item, "reason": "spool_already_assigned_to_another_slot"}
)
continue
if existing_for_slot:
result["skipped"] += 1
result.setdefault("skipped_items", []).append(
{
**item,
"reason": "target_slot_already_occupied",
"current_spool_id": existing_for_slot.get("spool_id"),
}
)
continue
if dry:
result["would_create"].append(item)
continue
try:
created = await self.bambuddy.assign_spool(
spool_id=spool_id,
printer_id=int(item["printer_id"]),
ams_id=int(item["ams_id"]),
tray_id=int(item["tray_id"]),
)
result["created"] += 1
assigned_by_spool_id[spool_id] = created or {
"spool_id": spool_id,
"printer_id": item["printer_id"],
"ams_id": item["ams_id"],
"tray_id": item["tray_id"],
}
assigned_by_slot[slot_key] = assigned_by_spool_id[spool_id]
except Exception as exc:
logger.exception("Failed to assign Bambuddy spool %s to printer %s", spool_id, item["printer_key"])
result["failed"] += 1
result.setdefault("errors", []).append({**item, "error": str(exc)})
return result
async def sync_locations_from_assignments(self, *, dry_run: bool | None = None) -> dict[str, Any]:
if not self.settings.filament_tracking_enabled:
return {"enabled": False, "status": "disabled"}
@@ -486,6 +592,100 @@ class FilamentTrackingService:
by_pk[int(item.get("pk") or item.get("id"))] = item
return list(by_pk.values())
def _planned_assignment_summaries(
self,
*,
stock_items: list[dict[str, Any]],
spools_by_batch: dict[str, dict[str, Any]],
assignments: list[dict[str, Any]],
printers: list[dict[str, Any]],
) -> list[dict[str, Any]]:
printer_ids = self._printer_ids_by_key(printers)
existing_spool_ids = {
int(assignment["spool_id"])
for assignment in assignments
if assignment.get("spool_id") is not None
}
planned = self._planned_assignments(
stock_items=stock_items,
spools_by_batch=spools_by_batch,
printer_ids=printer_ids,
)
for item in planned:
item["already_assigned"] = int(item["spool_id"]) in existing_spool_ids
return planned
def _planned_assignments(
self,
*,
stock_items: list[dict[str, Any]],
spools_by_batch: dict[str, dict[str, Any]],
printer_ids: dict[str, int],
) -> list[dict[str, Any]]:
location_to_printer = {
location_id: printer_key
for printer_key, location_id in self.settings.filament_printer_locations.items()
}
by_printer: dict[str, list[dict[str, Any]]] = {}
for stock_item in stock_items:
location_id = self._stock_location(stock_item)
printer_key = location_to_printer.get(location_id)
if not printer_key:
continue
by_printer.setdefault(printer_key, []).append(stock_item)
planned: list[dict[str, Any]] = []
for printer_key, items in sorted(by_printer.items()):
printer_id = printer_ids.get(printer_key)
if printer_id is None:
continue
sorted_items = sorted(items, key=lambda item: int(item.get("pk") or item.get("id") or 0))
for index, stock_item in enumerate(sorted_items):
batch = self._stock_batch(stock_item)
spool = spools_by_batch.get(batch or "")
if not batch or not spool or spool.get("id") is None:
continue
planned.append(
{
"batch": batch,
"spool_id": int(spool["id"]),
"stock_item_id": int(stock_item.get("pk") or stock_item.get("id")),
"printer_key": printer_key,
"printer_id": printer_id,
"ams_id": self.settings.filament_assignment_default_ams_id,
"tray_id": self.settings.filament_assignment_start_tray_id + index,
"location_id": self._stock_location(stock_item),
"quantity": self._stock_quantity(stock_item),
}
)
return planned
def _printer_ids_by_key(self, printers: list[dict[str, Any]]) -> dict[str, int]:
result = dict(self.settings.filament_printer_ids)
for printer_key in self.settings.filament_printer_locations:
if printer_key in result:
continue
matched = self._find_printer_for_key(printer_key, printers)
if matched and matched.get("id") is not None:
result[printer_key] = int(matched["id"])
return result
def _missing_printer_keys(self, printer_ids: dict[str, int]) -> list[str]:
return sorted(key for key in self.settings.filament_printer_locations if key not in printer_ids)
@staticmethod
def _find_printer_for_key(printer_key: str, printers: list[dict[str, Any]]) -> dict[str, Any] | None:
normalized_key = printer_key.strip().lower()
for printer in printers:
name = str(printer.get("name") or "").strip().lower()
if name == normalized_key or name.startswith(normalized_key) or normalized_key.startswith(name):
return printer
return None
def _target_location_for_assignment(self, assignment: dict[str, Any]) -> int | None:
printer_locations = self.settings.filament_printer_locations
printer_name = str(assignment.get("printer_name") or "").strip().lower()
@@ -509,6 +709,15 @@ class FilamentTrackingService:
spool = next((item for item in spools if item.get("id") == spool_id), None)
return self._batch_from_spool(spool) if spool else None
@staticmethod
def _assignment_slot_key(assignment: dict[str, Any]) -> tuple[int, int, int] | None:
printer_id = assignment.get("printer_id")
ams_id = assignment.get("ams_id")
tray_id = assignment.get("tray_id")
if printer_id is None or ams_id is None or tray_id is None:
return None
return int(printer_id), int(ams_id), int(tray_id)
def _spool_payload_for_stock(self, stock_item: dict[str, Any]) -> dict[str, Any]:
part = stock_item.get("part_detail") or {}
part_name = str(part.get("full_name") or part.get("name") or f"InvenTree stock {stock_item.get('pk')}")

View File

@@ -206,6 +206,18 @@ async def sync_filament_spools(
raise HTTPException(status_code=502, detail=str(exc)) from exc
@app.post("/sync/filament/assignments", dependencies=[Depends(require_service_token)])
async def sync_filament_assignments(
request: Request,
dry_run: bool | None = Query(default=None),
) -> dict[str, Any]:
filament_service: FilamentTrackingService = request.app.state.filament_service
try:
return await filament_service.sync_assignments_from_inventree(dry_run=dry_run)
except ExternalApiError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
@app.post("/sync/filament/locations", dependencies=[Depends(require_service_token)])
async def sync_filament_locations(
request: Request,