Add filament spool tracking sync

This commit is contained in:
2026-04-15 15:16:47 +03:00
parent 791c2235c5
commit ec0a0bf9af
8 changed files with 1157 additions and 2 deletions

View File

@@ -58,6 +58,32 @@ class BambuddyClient:
data = await self._request("PATCH", f"/archives/{archive_id}", json={"external_url": external_url})
return Archive.model_validate(data)
async def list_spools(self, *, include_archived: bool = False) -> list[dict[str, Any]]:
data = await self._request("GET", "/inventory/spools", params={"include_archived": include_archived})
return data if isinstance(data, list) else []
async def create_spool(self, payload: dict[str, Any]) -> dict[str, Any]:
data = await self._request("POST", "/inventory/spools", json=payload)
return data if isinstance(data, dict) else {}
async def update_spool(self, spool_id: int, payload: dict[str, Any]) -> dict[str, Any]:
data = await self._request("PATCH", f"/inventory/spools/{spool_id}", json=payload)
return data if isinstance(data, dict) else {}
async def list_assignments(self, *, printer_id: int | None = None) -> list[dict[str, Any]]:
params: dict[str, Any] = {}
if printer_id is not None:
params["printer_id"] = printer_id
data = await self._request("GET", "/inventory/assignments", params=params)
return data if isinstance(data, list) else []
async def list_usage(self, *, limit: int, printer_id: int | None = None) -> list[dict[str, Any]]:
params: dict[str, Any] = {"limit": limit}
if printer_id is not None:
params["printer_id"] = printer_id
data = await self._request("GET", "/inventory/usage", params=params)
return data if isinstance(data, list) else []
async def list_archives(
self,
*,

View File

@@ -36,6 +36,23 @@ class Settings(BaseSettings):
http_timeout_seconds: Annotated[int, Field(ge=1)] = 30
data_dir: Path = Path("/data")
filament_tracking_enabled: bool = False
filament_dry_run: bool = True
filament_part_category_id: int | None = None
filament_storage_location_id: int | None = None
filament_loaded_location_id: int | None = None
filament_printer_location_map: str = ""
filament_batch_source: str = "tag_uid"
filament_sync_spools: bool = True
filament_sync_locations: bool = True
filament_sync_usage: bool = True
filament_return_unassigned_to_storage: bool = False
filament_usage_limit: Annotated[int, Field(ge=1, le=1000)] = 200
filament_usage_success_statuses: str = "success,completed,complete,done"
filament_default_material: str = "PLA"
filament_default_label_weight: Annotated[int, Field(ge=1)] = 1000
filament_default_core_weight: Annotated[int, Field(ge=0)] = 250
@field_validator("bambuddy_base_url", "inventree_base_url", "inventree_web_url")
@classmethod
def strip_url(cls, value: str | None) -> str | None:
@@ -58,6 +75,31 @@ class Settings(BaseSettings):
fields = [field.strip() for field in self.part_key_fields.split(",") if field.strip()]
return fields or ["filename", "name"]
@property
def filament_printer_locations(self) -> dict[str, int]:
result: dict[str, int] = {}
for item in self.filament_printer_location_map.split(","):
if ":" not in item:
continue
key, value = item.split(":", 1)
key = key.strip().lower()
value = value.strip()
if key and value.isdigit():
result[key] = int(value)
return result
@property
def filament_loaded_location_ids(self) -> set[int]:
locations = set(self.filament_printer_locations.values())
if self.filament_loaded_location_id is not None:
locations.add(self.filament_loaded_location_id)
return locations
@property
def filament_usage_success_status_names(self) -> set[str]:
statuses = [status.strip().lower() for status in self.filament_usage_success_statuses.split(",") if status.strip()]
return set(statuses) or {"success", "completed", "complete", "done"}
@property
def inventree_browser_url(self) -> str:
base_url = (self.inventree_web_url or self.inventree_base_url).rstrip("/")

View File

@@ -35,6 +35,20 @@ class Database:
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS filament_usage_records (
usage_id INTEGER PRIMARY KEY,
spool_id INTEGER,
batch TEXT,
stock_item_id INTEGER,
quantity REAL,
usage_status TEXT,
sync_status TEXT NOT NULL,
error TEXT,
raw_usage TEXT,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
"""
)
@@ -111,6 +125,60 @@ class Database:
(part_key, inventree_part_id, display_name),
)
def get_filament_usage_record(self, usage_id: int) -> dict[str, Any] | None:
with self._connect() as conn:
row = conn.execute(
"SELECT * FROM filament_usage_records WHERE usage_id = ?",
(usage_id,),
).fetchone()
return dict(row) if row else None
def upsert_filament_usage_record(
self,
*,
usage_id: int,
sync_status: str,
spool_id: int | None = None,
batch: str | None = None,
stock_item_id: int | None = None,
quantity: float | None = None,
usage_status: str | None = None,
error: str | None = None,
raw_usage: dict[str, Any] | None = None,
) -> None:
raw_usage_json = json.dumps(raw_usage, ensure_ascii=False) if raw_usage is not None else None
with self._connect() as conn:
conn.execute(
"""
INSERT INTO filament_usage_records (
usage_id, spool_id, batch, stock_item_id, quantity,
usage_status, sync_status, error, raw_usage
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(usage_id) DO UPDATE SET
spool_id = excluded.spool_id,
batch = excluded.batch,
stock_item_id = excluded.stock_item_id,
quantity = excluded.quantity,
usage_status = excluded.usage_status,
sync_status = excluded.sync_status,
error = excluded.error,
raw_usage = COALESCE(excluded.raw_usage, filament_usage_records.raw_usage),
updated_at = CURRENT_TIMESTAMP
""",
(
usage_id,
spool_id,
batch,
stock_item_id,
quantity,
usage_status,
sync_status,
error,
raw_usage_json,
),
)
def counts(self) -> dict[str, int]:
with self._connect() as conn:
rows = conn.execute(
@@ -121,8 +189,17 @@ class Database:
"""
).fetchall()
parts = conn.execute("SELECT COUNT(*) AS count FROM part_map").fetchone()
filament_rows = conn.execute(
"""
SELECT sync_status, COUNT(*) AS count
FROM filament_usage_records
GROUP BY sync_status
"""
).fetchall()
result = {row["sync_status"]: int(row["count"]) for row in rows}
result["known_parts"] = int(parts["count"]) if parts else 0
for row in filament_rows:
result[f"filament_usage_{row['sync_status']}"] = int(row["count"])
return result
def _connect(self) -> sqlite3.Connection:

View File

@@ -0,0 +1,692 @@
import asyncio
import logging
import re
from typing import Any
from .bambuddy import BambuddyClient
from .config import Settings
from .database import Database
from .inventree import InvenTreeClient
logger = logging.getLogger(__name__)
class FilamentTrackingService:
def __init__(
self,
*,
settings: Settings,
database: Database,
bambuddy: BambuddyClient,
inventree: InvenTreeClient,
) -> None:
self.settings = settings
self.database = database
self.bambuddy = bambuddy
self.inventree = inventree
self._lock = asyncio.Lock()
async def status(self) -> dict[str, Any]:
stock_items, spools, assignments, usage = await asyncio.gather(
self._list_filament_stock(),
self.bambuddy.list_spools(),
self.bambuddy.list_assignments(),
self.bambuddy.list_usage(limit=self.settings.filament_usage_limit),
)
stock_by_batch = self._index_stock_by_batch(stock_items)
spools_by_batch = self._index_spools_by_batch(spools)
assignment_batches = {
batch
for batch in (self._batch_for_assignment(assignment, spools) for assignment in assignments)
if batch
}
synced_usage_ids = {
int(usage_record["id"])
for usage_record in usage
if usage_record.get("id") is not None
and (self.database.get_filament_usage_record(int(usage_record["id"])) or {}).get("sync_status") == "synced"
}
matched_batches = sorted(set(stock_by_batch).intersection(spools_by_batch))
missing_in_bambuddy = [self._stock_summary(item) for batch, item in sorted(stock_by_batch.items()) if batch not in spools_by_batch]
missing_in_inventree = [self._spool_summary(item) for batch, item in sorted(spools_by_batch.items()) if batch not in stock_by_batch]
pending_usage = [
self._usage_summary(item)
for item in usage
if item.get("id") is not None and int(item["id"]) not in synced_usage_ids
]
return {
"enabled": self.settings.filament_tracking_enabled,
"dry_run": self.settings.filament_dry_run,
"config": {
"part_category_id": self.settings.filament_part_category_id,
"storage_location_id": self.settings.filament_storage_location_id,
"loaded_location_id": self.settings.filament_loaded_location_id,
"printer_locations": self.settings.filament_printer_locations,
"batch_source": self.settings.filament_batch_source,
"return_unassigned_to_storage": self.settings.filament_return_unassigned_to_storage,
},
"inventree": {
"stock_items": len(stock_items),
"batches": len(stock_by_batch),
"storage_location_id": self.settings.filament_storage_location_id,
"loaded_location_id": self.settings.filament_loaded_location_id,
},
"bambuddy": {
"spools": len(spools),
"spools_with_batch": len(spools_by_batch),
"assignments": len(assignments),
"usage_records": len(usage),
},
"mapping": {
"matched_batches": len(matched_batches),
"missing_in_bambuddy": len(missing_in_bambuddy),
"missing_in_inventree": len(missing_in_inventree),
"assigned_batches": len(assignment_batches),
"pending_usage_records": len(pending_usage),
},
"samples": {
"missing_in_bambuddy": missing_in_bambuddy[:50],
"missing_in_inventree": missing_in_inventree[:50],
"assignments": [self._assignment_summary(item, spools) for item in assignments[:50]],
"pending_usage": pending_usage[:50],
},
"database": self.database.counts(),
}
async def sync_all(self, *, dry_run: bool | None = None) -> dict[str, Any]:
result: dict[str, Any] = {
"enabled": self.settings.filament_tracking_enabled,
"dry_run": self._dry_run(dry_run),
}
if not self.settings.filament_tracking_enabled:
result["status"] = "disabled"
return result
if self.settings.filament_sync_spools:
result["spools"] = await self.sync_spools_from_inventree(dry_run=dry_run)
if self.settings.filament_sync_usage:
result["usage"] = await self.sync_usage_from_bambuddy(dry_run=dry_run)
if self.settings.filament_sync_locations:
result["locations"] = await self.sync_locations_from_assignments(dry_run=dry_run)
return result
async def sync_spools_from_inventree(self, *, dry_run: bool | None = None) -> dict[str, Any]:
if not self.settings.filament_tracking_enabled:
return {"enabled": False, "status": "disabled"}
dry = self._dry_run(dry_run)
async with self._lock:
stock_items, spools = await asyncio.gather(self._list_filament_stock(), self.bambuddy.list_spools())
spools_by_batch = self._index_spools_by_batch(spools)
result: dict[str, Any] = {
"dry_run": dry,
"seen": len(stock_items),
"created": 0,
"updated": 0,
"unchanged": 0,
"skipped": 0,
"failed": 0,
"would_create": [],
"would_update": [],
}
for stock_item in stock_items:
batch = self._stock_batch(stock_item)
if not batch:
result["skipped"] += 1
continue
try:
payload = self._spool_payload_for_stock(stock_item)
existing = spools_by_batch.get(batch)
if not existing:
if dry:
result["would_create"].append({"batch": batch, "stock": self._stock_summary(stock_item), "payload": payload})
else:
await self.bambuddy.create_spool(payload)
result["created"] += 1
continue
patch = self._spool_update_payload(existing, payload)
if not patch:
result["unchanged"] += 1
continue
if dry:
result["would_update"].append({"batch": batch, "spool_id": existing.get("id"), "payload": patch})
else:
await self.bambuddy.update_spool(int(existing["id"]), patch)
result["updated"] += 1
except Exception as exc:
logger.exception("Failed to sync filament spool for batch %s", batch)
result["failed"] += 1
result.setdefault("errors", []).append({"batch": batch, "error": str(exc)})
return result
async def sync_locations_from_assignments(self, *, dry_run: bool | None = None) -> dict[str, Any]:
if not self.settings.filament_tracking_enabled:
return {"enabled": False, "status": "disabled"}
dry = self._dry_run(dry_run)
async with self._lock:
stock_items, spools, assignments = await asyncio.gather(
self._list_filament_stock(),
self.bambuddy.list_spools(),
self.bambuddy.list_assignments(),
)
stock_by_batch = self._index_stock_by_batch(stock_items)
spools_by_batch = self._index_spools_by_batch(spools)
assigned_batches: set[str] = set()
result: dict[str, Any] = {
"dry_run": dry,
"assignments": len(assignments),
"moved_to_printer": 0,
"moved_to_storage": 0,
"unchanged": 0,
"skipped": 0,
"failed": 0,
"would_move": [],
}
for assignment in assignments:
batch = self._batch_for_assignment(assignment, spools)
if not batch:
result["skipped"] += 1
continue
assigned_batches.add(batch)
stock_item = stock_by_batch.get(batch) or await self.inventree.find_stock_by_batch_code(
batch,
part_category_id=self.settings.filament_part_category_id,
)
target_location_id = self._target_location_for_assignment(assignment)
if not stock_item or target_location_id is None:
result["skipped"] += 1
continue
await self._move_stock_if_needed(
stock_item=stock_item,
target_location_id=target_location_id,
dry_run=dry,
result=result,
counter="moved_to_printer",
reason=f"Loaded in Bambuddy printer {assignment.get('printer_name') or assignment.get('printer_id')}",
)
storage_location_id = self.settings.filament_storage_location_id
if storage_location_id is not None and self.settings.filament_return_unassigned_to_storage:
for stock_item in stock_items:
batch = self._stock_batch(stock_item)
current_location = self._stock_location(stock_item)
if (
not batch
or batch not in spools_by_batch
or batch in assigned_batches
or current_location not in self.settings.filament_loaded_location_ids
):
continue
await self._move_stock_if_needed(
stock_item=stock_item,
target_location_id=storage_location_id,
dry_run=dry,
result=result,
counter="moved_to_storage",
reason="No active Bambuddy assignment",
)
return result
async def sync_usage_from_bambuddy(self, *, dry_run: bool | None = None, force: bool = False) -> dict[str, Any]:
if not self.settings.filament_tracking_enabled:
return {"enabled": False, "status": "disabled"}
dry = self._dry_run(dry_run)
async with self._lock:
usage_records, spools = await asyncio.gather(
self.bambuddy.list_usage(limit=self.settings.filament_usage_limit),
self.bambuddy.list_spools(include_archived=True),
)
spools_by_id = {int(spool["id"]): spool for spool in spools if spool.get("id") is not None}
result: dict[str, Any] = {
"dry_run": dry,
"seen": len(usage_records),
"removed": 0,
"already_synced": 0,
"skipped": 0,
"pending": 0,
"failed": 0,
"would_remove": [],
}
for usage in sorted(usage_records, key=lambda item: str(item.get("created_at") or "")):
usage_id = usage.get("id")
if usage_id is None:
result["skipped"] += 1
continue
existing = self.database.get_filament_usage_record(int(usage_id))
if existing and existing.get("sync_status") == "synced" and not force:
result["already_synced"] += 1
continue
try:
sync_result, detail = await self._sync_usage_record(usage, spools_by_id=spools_by_id, dry_run=dry)
if sync_result == "would_remove" and detail:
result["would_remove"].append(detail)
else:
result[sync_result] = result.get(sync_result, 0) + 1
except Exception as exc:
logger.exception("Failed to sync Bambuddy filament usage %s", usage_id)
result["failed"] += 1
if not dry:
self.database.upsert_filament_usage_record(
usage_id=int(usage_id),
spool_id=self._usage_spool_id(usage),
quantity=self._usage_weight(usage),
usage_status=str(usage.get("status") or ""),
sync_status="failed",
error=str(exc),
raw_usage=usage,
)
return result
async def _sync_usage_record(
self,
usage: dict[str, Any],
*,
spools_by_id: dict[int, dict[str, Any]],
dry_run: bool,
) -> tuple[str, dict[str, Any] | None]:
usage_id = int(usage["id"])
usage_status = str(usage.get("status") or "").lower()
if usage_status not in self.settings.filament_usage_success_status_names:
if not dry_run:
self.database.upsert_filament_usage_record(
usage_id=usage_id,
spool_id=self._usage_spool_id(usage),
quantity=self._usage_weight(usage),
usage_status=usage_status,
sync_status="skipped",
raw_usage=usage,
)
return "skipped", None
spool_id = self._usage_spool_id(usage)
spool = spools_by_id.get(spool_id) if spool_id is not None else None
batch = self._batch_from_spool(spool) if spool else None
quantity = self._usage_weight(usage)
if not spool or not batch or quantity is None or quantity <= 0:
if not dry_run:
self.database.upsert_filament_usage_record(
usage_id=usage_id,
spool_id=spool_id,
batch=batch,
quantity=quantity,
usage_status=usage_status,
sync_status="pending",
error="Usage record cannot be matched to an InvenTree batch",
raw_usage=usage,
)
return "pending", None
stock_item = await self.inventree.find_stock_by_batch_code(batch, part_category_id=self.settings.filament_part_category_id)
if not stock_item:
if not dry_run:
self.database.upsert_filament_usage_record(
usage_id=usage_id,
spool_id=spool_id,
batch=batch,
quantity=quantity,
usage_status=usage_status,
sync_status="pending",
error="No InvenTree stock item found for batch",
raw_usage=usage,
)
return "pending", None
stock_item_id = int(stock_item.get("pk") or stock_item.get("id"))
current_quantity = self._stock_quantity(stock_item)
remove_quantity = min(quantity, current_quantity)
if remove_quantity <= 0:
if not dry_run:
self.database.upsert_filament_usage_record(
usage_id=usage_id,
spool_id=spool_id,
batch=batch,
stock_item_id=stock_item_id,
quantity=quantity,
usage_status=usage_status,
sync_status="skipped",
error="InvenTree stock quantity is already zero",
raw_usage=usage,
)
return "skipped", None
notes = (
f"Bambuddy filament usage {usage_id}; "
f"spool {spool_id}; print {usage.get('print_name') or '-'}; "
f"reported {self._format_number(quantity)} g"
)
if remove_quantity < quantity:
notes += f"; capped to available {self._format_number(remove_quantity)} g"
if dry_run:
return "would_remove", {
"usage_id": usage_id,
"spool_id": spool_id,
"batch": batch,
"stock_item_id": stock_item_id,
"quantity": remove_quantity,
"reported_quantity": quantity,
"print_name": usage.get("print_name"),
}
await self.inventree.remove_stock_quantity(
stock_item_id=stock_item_id,
quantity=remove_quantity,
notes=notes,
)
self.database.upsert_filament_usage_record(
usage_id=usage_id,
spool_id=spool_id,
batch=batch,
stock_item_id=stock_item_id,
quantity=remove_quantity,
usage_status=usage_status,
sync_status="synced",
raw_usage=usage,
)
logger.info(
"Removed %s g from InvenTree filament batch %s using Bambuddy usage %s",
self._format_number(remove_quantity),
batch,
usage_id,
)
return "removed", None
async def _move_stock_if_needed(
self,
*,
stock_item: dict[str, Any],
target_location_id: int,
dry_run: bool,
result: dict[str, Any],
counter: str,
reason: str,
) -> None:
stock_item_id = int(stock_item.get("pk") or stock_item.get("id"))
current_location = self._stock_location(stock_item)
quantity = self._stock_quantity(stock_item)
batch = self._stock_batch(stock_item)
if current_location == target_location_id:
result["unchanged"] += 1
return
if quantity <= 0:
result["skipped"] += 1
return
move = {
"batch": batch,
"stock_item_id": stock_item_id,
"from_location_id": current_location,
"to_location_id": target_location_id,
"quantity": quantity,
"reason": reason,
}
if dry_run:
result["would_move"].append(move)
return
try:
await self.inventree.transfer_stock_item(
stock_item_id=stock_item_id,
location_id=target_location_id,
quantity=quantity,
notes=f"Bambuddy filament tracking: {reason}",
)
result[counter] += 1
logger.info("Moved filament batch %s stock item %s to location %s", batch, stock_item_id, target_location_id)
except Exception as exc:
logger.exception("Failed to move filament batch %s stock item %s", batch, stock_item_id)
result["failed"] += 1
result.setdefault("errors", []).append({"batch": batch, "stock_item_id": stock_item_id, "error": str(exc)})
async def _list_filament_stock(self) -> list[dict[str, Any]]:
params_base: dict[str, Any] = {}
if self.settings.filament_part_category_id is not None:
params_base["part_category"] = self.settings.filament_part_category_id
locations = [
location_id
for location_id in (self.settings.filament_storage_location_id, self.settings.filament_loaded_location_id)
if location_id is not None
]
by_pk: dict[int, dict[str, Any]] = {}
if locations:
for location_id in locations:
params = dict(params_base)
params["location"] = location_id
for item in await self.inventree.list_stock_items(params=params):
if bool(item.get("in_stock", True)):
by_pk[int(item.get("pk") or item.get("id"))] = item
return list(by_pk.values())
if params_base:
for item in await self.inventree.list_stock_items(params=params_base):
if bool(item.get("in_stock", True)):
by_pk[int(item.get("pk") or item.get("id"))] = item
return list(by_pk.values())
def _target_location_for_assignment(self, assignment: dict[str, Any]) -> int | None:
printer_locations = self.settings.filament_printer_locations
printer_name = str(assignment.get("printer_name") or "").strip().lower()
printer_id = str(assignment.get("printer_id") or "").strip().lower()
for key, location_id in printer_locations.items():
normalized_key = key.strip().lower()
if not normalized_key:
continue
if printer_name == normalized_key or printer_name.startswith(normalized_key):
return location_id
if printer_id == normalized_key:
return location_id
return self.settings.filament_loaded_location_id
def _batch_for_assignment(self, assignment: dict[str, Any], spools: list[dict[str, Any]]) -> str | None:
spool = assignment.get("spool")
if not spool:
spool_id = assignment.get("spool_id")
spool = next((item for item in spools if item.get("id") == spool_id), None)
return self._batch_from_spool(spool) if spool else None
def _spool_payload_for_stock(self, stock_item: dict[str, Any]) -> dict[str, Any]:
part = stock_item.get("part_detail") or {}
part_name = str(part.get("full_name") or part.get("name") or f"InvenTree stock {stock_item.get('pk')}")
material, color_name, brand = self._parse_part_name(part_name)
remaining = self._stock_quantity(stock_item)
label_weight = self._label_weight_for_stock(remaining)
weight_used = max(label_weight - remaining, 0)
batch = self._stock_batch(stock_item)
return {
"material": material[:50] or self.settings.filament_default_material,
"color_name": color_name,
"brand": brand,
"label_weight": label_weight,
"core_weight": self.settings.filament_default_core_weight,
"weight_used": round(weight_used, 3),
"slicer_filament": material[:50] or self.settings.filament_default_material,
"slicer_filament_name": part_name[:120],
"note": f"Synced from InvenTree stock item {stock_item.get('pk')}; batch {batch}",
"tag_uid": batch,
"data_origin": "inventree",
"tag_type": "batch",
"weight_locked": True,
}
def _spool_update_payload(self, spool: dict[str, Any], wanted: dict[str, Any]) -> dict[str, Any]:
patch: dict[str, Any] = {}
for field in (
"material",
"color_name",
"brand",
"label_weight",
"core_weight",
"weight_used",
"slicer_filament",
"slicer_filament_name",
"note",
"tag_uid",
"data_origin",
"tag_type",
"weight_locked",
):
current = spool.get(field)
expected = wanted.get(field)
if self._values_differ(current, expected):
patch[field] = expected
return patch
def _index_stock_by_batch(self, stock_items: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
result: dict[str, dict[str, Any]] = {}
for item in stock_items:
batch = self._stock_batch(item)
if batch and batch not in result:
result[batch] = item
return result
def _index_spools_by_batch(self, spools: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
result: dict[str, dict[str, Any]] = {}
for spool in spools:
batch = self._batch_from_spool(spool)
if batch and batch not in result:
result[batch] = spool
return result
def _batch_from_spool(self, spool: dict[str, Any] | None) -> str | None:
if not spool:
return None
source = self.settings.filament_batch_source.strip() or "tag_uid"
value = spool.get(source)
if value is None:
return None
batch = str(value).strip()
return batch or None
@staticmethod
def _stock_batch(stock_item: dict[str, Any]) -> str | None:
value = stock_item.get("batch")
if value is None:
return None
batch = str(value).strip()
return batch or None
@staticmethod
def _stock_location(stock_item: dict[str, Any]) -> int | None:
value = stock_item.get("location")
return int(value) if value is not None else None
@staticmethod
def _stock_quantity(stock_item: dict[str, Any]) -> float:
try:
return float(stock_item.get("quantity") or 0)
except (TypeError, ValueError):
return 0.0
@staticmethod
def _usage_spool_id(usage: dict[str, Any]) -> int | None:
value = usage.get("spool_id")
return int(value) if value is not None else None
@staticmethod
def _usage_weight(usage: dict[str, Any]) -> float | None:
value = usage.get("weight_used")
if value is None:
return None
try:
return float(value)
except (TypeError, ValueError):
return None
def _label_weight_for_stock(self, remaining: float) -> int:
default_weight = self.settings.filament_default_label_weight
if remaining <= default_weight:
return default_weight
return int(((remaining + 99) // 100) * 100)
def _parse_part_name(self, part_name: str) -> tuple[str, str | None, str | None]:
tokens = [token for token in re.split(r"[_\-\s]+", part_name.strip()) if token]
material = tokens[0] if tokens else self.settings.filament_default_material
brand = tokens[-1] if len(tokens) >= 3 else None
color_tokens = tokens[1:-1] if brand else tokens[1:]
color_name = " ".join(color_tokens).title() if color_tokens else None
return material[:50], color_name, brand
@staticmethod
def _values_differ(current: Any, expected: Any) -> bool:
if isinstance(expected, float):
try:
return abs(float(current or 0) - expected) > 0.001
except (TypeError, ValueError):
return True
return current != expected
def _dry_run(self, dry_run: bool | None) -> bool:
return self.settings.filament_dry_run if dry_run is None else dry_run
@staticmethod
def _format_number(value: float) -> str:
return f"{value:.3f}".rstrip("0").rstrip(".")
def _stock_summary(self, stock_item: dict[str, Any]) -> dict[str, Any]:
part = stock_item.get("part_detail") or {}
return {
"stock_item_id": stock_item.get("pk") or stock_item.get("id"),
"part_id": stock_item.get("part"),
"part_name": part.get("full_name") or part.get("name"),
"batch": self._stock_batch(stock_item),
"quantity": self._stock_quantity(stock_item),
"location_id": self._stock_location(stock_item),
}
def _spool_summary(self, spool: dict[str, Any]) -> dict[str, Any]:
return {
"spool_id": spool.get("id"),
"batch": self._batch_from_spool(spool),
"material": spool.get("material"),
"brand": spool.get("brand"),
"color_name": spool.get("color_name"),
"weight_used": spool.get("weight_used"),
}
def _assignment_summary(self, assignment: dict[str, Any], spools: list[dict[str, Any]]) -> dict[str, Any]:
return {
"assignment_id": assignment.get("id"),
"spool_id": assignment.get("spool_id"),
"batch": self._batch_for_assignment(assignment, spools),
"printer_id": assignment.get("printer_id"),
"printer_name": assignment.get("printer_name"),
"ams_id": assignment.get("ams_id"),
"tray_id": assignment.get("tray_id"),
"target_location_id": self._target_location_for_assignment(assignment),
}
def _usage_summary(self, usage: dict[str, Any]) -> dict[str, Any]:
return {
"usage_id": usage.get("id"),
"spool_id": usage.get("spool_id"),
"print_name": usage.get("print_name"),
"weight_used": usage.get("weight_used"),
"status": usage.get("status"),
"created_at": usage.get("created_at"),
}

View File

@@ -35,6 +35,9 @@ class InvenTreeClient:
async def get_stock_location(self) -> dict[str, Any]:
return await self._request("GET", f"/stock/location/{self.settings.inventree_stock_location_id}/")
async def get_stock_location_by_id(self, location_id: int) -> dict[str, Any]:
return await self._request("GET", f"/stock/location/{location_id}/")
async def get_part(self, part_id: int) -> dict[str, Any]:
return await self._request("GET", f"/part/{part_id}/")
@@ -69,6 +72,39 @@ class InvenTreeClient:
return item
return None
async def find_stock_by_batch_code(self, batch: str, *, part_category_id: int | None = None) -> dict[str, Any] | None:
params: dict[str, Any] = {"batch": batch, "limit": 100}
if part_category_id is not None:
params["part_category"] = part_category_id
data = await self._request("GET", "/stock/", params=params)
for item in self._items(data):
if str(item.get("batch") or "") == batch and bool(item.get("in_stock", True)):
return item
return None
async def list_stock_items(self, *, params: dict[str, Any] | None = None, limit: int = 100) -> list[dict[str, Any]]:
query = dict(params or {})
query["limit"] = limit
offset = int(query.get("offset") or 0)
items: list[dict[str, Any]] = []
while True:
query["offset"] = offset
data = await self._request("GET", "/stock/", params=query)
page_items = self._items(data)
items.extend(page_items)
if isinstance(data, dict):
total = data.get("count")
if total is not None and len(items) >= int(total):
break
if len(page_items) < limit:
break
offset += len(page_items)
return items
async def create_stock_item(self, *, part_id: int, archive: Archive, notes: str) -> dict[str, Any]:
quantity = archive.quantity or self.settings.default_stock_quantity
payload: dict[str, Any] = {
@@ -81,6 +117,36 @@ class InvenTreeClient:
}
return await self._request("POST", "/stock/", json=payload)
async def transfer_stock_item(
self,
*,
stock_item_id: int,
location_id: int,
quantity: float,
notes: str,
) -> dict[str, Any]:
payload = {
"items": [{"pk": stock_item_id, "quantity": quantity}],
"location": location_id,
"notes": notes,
}
data = await self._request("POST", "/stock/transfer/", json=payload)
return data if isinstance(data, dict) else {}
async def remove_stock_quantity(
self,
*,
stock_item_id: int,
quantity: float,
notes: str,
) -> dict[str, Any]:
payload = {
"items": [{"pk": stock_item_id, "quantity": quantity}],
"notes": notes,
}
data = await self._request("POST", "/stock/remove/", json=payload)
return data if isinstance(data, dict) else {}
async def upload_part_image(self, *, part_id: int, content: bytes, filename: str, content_type: str) -> dict[str, Any]:
response = await self.client.patch(
f"/part/{part_id}/",

View File

@@ -8,6 +8,7 @@ from fastapi import BackgroundTasks, Depends, FastAPI, Header, HTTPException, Qu
from .bambuddy import BambuddyClient
from .config import Settings, get_settings
from .database import Database
from .filament import FilamentTrackingService
from .http_errors import ExternalApiError
from .inventree import InvenTreeClient
from .models import BambuddyWebhook, SyncResult
@@ -48,12 +49,19 @@ async def lifespan(app: FastAPI):
bambuddy=bambuddy,
inventree=inventree,
)
filament_service = FilamentTrackingService(
settings=settings,
database=database,
bambuddy=bambuddy,
inventree=inventree,
)
app.state.settings = settings
app.state.database = database
app.state.bambuddy = bambuddy
app.state.inventree = inventree
app.state.sync_service = sync_service
app.state.filament_service = filament_service
stop_event = asyncio.Event()
poll_task: asyncio.Task[Any] | None = None
@@ -62,8 +70,9 @@ async def lifespan(app: FastAPI):
while not stop_event.is_set():
try:
await sync_service.backfill()
await filament_service.sync_all()
except Exception:
logger.exception("Scheduled backfill failed")
logger.exception("Scheduled sync failed")
try:
await asyncio.wait_for(stop_event.wait(), timeout=settings.poll_interval_seconds)
except TimeoutError:
@@ -71,6 +80,8 @@ async def lifespan(app: FastAPI):
if settings.sync_on_startup:
asyncio.create_task(sync_service.backfill())
if settings.filament_tracking_enabled:
asyncio.create_task(filament_service.sync_all())
if settings.poll_interval_seconds > 0:
poll_task = asyncio.create_task(poll_loop())
@@ -160,3 +171,61 @@ async def sync_backfill(
async def sync_status(request: Request) -> dict[str, int]:
database: Database = request.app.state.database
return database.counts()
@app.get("/filament/status", dependencies=[Depends(require_service_token)])
async def filament_status(request: Request) -> dict[str, Any]:
filament_service: FilamentTrackingService = request.app.state.filament_service
try:
return await filament_service.status()
except ExternalApiError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
@app.post("/sync/filament", dependencies=[Depends(require_service_token)])
async def sync_filament(
request: Request,
dry_run: bool | None = Query(default=None),
) -> dict[str, Any]:
filament_service: FilamentTrackingService = request.app.state.filament_service
try:
return await filament_service.sync_all(dry_run=dry_run)
except ExternalApiError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
@app.post("/sync/filament/spools", dependencies=[Depends(require_service_token)])
async def sync_filament_spools(
request: Request,
dry_run: bool | None = Query(default=None),
) -> dict[str, Any]:
filament_service: FilamentTrackingService = request.app.state.filament_service
try:
return await filament_service.sync_spools_from_inventree(dry_run=dry_run)
except ExternalApiError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
@app.post("/sync/filament/locations", dependencies=[Depends(require_service_token)])
async def sync_filament_locations(
request: Request,
dry_run: bool | None = Query(default=None),
) -> dict[str, Any]:
filament_service: FilamentTrackingService = request.app.state.filament_service
try:
return await filament_service.sync_locations_from_assignments(dry_run=dry_run)
except ExternalApiError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
@app.post("/sync/filament/usage", dependencies=[Depends(require_service_token)])
async def sync_filament_usage(
request: Request,
dry_run: bool | None = Query(default=None),
force: bool = Query(default=False),
) -> dict[str, Any]:
filament_service: FilamentTrackingService = request.app.state.filament_service
try:
return await filament_service.sync_usage_from_bambuddy(dry_run=dry_run, force=force)
except ExternalApiError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc