# Lab8DATAPROCESSOR/src/bambuddy_inventree_sync/inventree.py
# (file-viewer header from the original paste: 173 lines, 6.6 KiB, Python)

from typing import Any
import httpx
from .config import Settings
from .http_errors import ExternalApiError
from .models import Archive
class InvenTreeClient:
def __init__(self, settings: Settings) -> None:
base_url = settings.inventree_base_url.rstrip("/")
if not base_url.endswith("/api"):
base_url = f"{base_url}/api"
self.settings = settings
self.base_url = base_url
self._parameter_template_cache: dict[str, int] = {}
self.client = httpx.AsyncClient(
base_url=self.base_url,
headers={
"Authorization": f"Token {settings.inventree_token}",
"Accept": "application/json",
},
timeout=settings.http_timeout_seconds,
trust_env=False,
)
async def close(self) -> None:
await self.client.aclose()
async def get_part_category(self) -> dict[str, Any]:
return await self._request("GET", f"/part/category/{self.settings.inventree_part_category_id}/")
async def get_stock_location(self) -> dict[str, Any]:
return await self._request("GET", f"/stock/location/{self.settings.inventree_stock_location_id}/")
async def get_part(self, part_id: int) -> dict[str, Any]:
return await self._request("GET", f"/part/{part_id}/")
async def find_part_by_ipn(self, ipn: str) -> dict[str, Any] | None:
for params in ({"IPN": ipn, "limit": 100}, {"search": ipn, "limit": 100}):
data = await self._request("GET", "/part/", params=params)
for item in self._items(data):
if item.get("IPN") == ipn:
return item
return None
async def create_part(self, *, name: str, description: str, ipn: str) -> dict[str, Any]:
payload = {
"name": name[:100],
"description": description[:250],
"category": self.settings.inventree_part_category_id,
"IPN": ipn[:100],
"active": True,
"component": True,
"purchaseable": False,
"salable": False,
"assembly": False,
"trackable": False,
"virtual": False,
}
return await self._request("POST", "/part/", json=payload)
async def find_stock_by_batch(self, *, part_id: int, batch: str) -> dict[str, Any] | None:
data = await self._request("GET", "/stock/", params={"part": part_id, "batch": batch, "limit": 100})
for item in self._items(data):
if item.get("part") == part_id and item.get("batch") == batch:
return item
return None
async def create_stock_item(self, *, part_id: int, archive: Archive, notes: str) -> dict[str, Any]:
quantity = archive.quantity or self.settings.default_stock_quantity
payload: dict[str, Any] = {
"part": part_id,
"location": self.settings.inventree_stock_location_id,
"quantity": quantity,
"status": self.settings.inventree_stock_status,
"batch": self.batch_for_archive(archive.id),
"notes": notes,
}
return await self._request("POST", "/stock/", json=payload)
async def upload_part_image(self, *, part_id: int, content: bytes, filename: str, content_type: str) -> dict[str, Any]:
response = await self.client.patch(
f"/part/{part_id}/",
files={"image": (filename, content, content_type)},
)
if response.status_code >= 400:
body = response.text[:1000]
raise ExternalApiError(f"InvenTree PATCH /part/{part_id}/ image failed: HTTP {response.status_code}: {body}")
return response.json()
async def upsert_part_parameter(
self,
*,
part_id: int,
template_name: str,
data: str,
data_numeric: float | None,
note: str,
) -> dict[str, Any]:
template_id = await self.get_parameter_template_id(template_name)
existing = await self.find_part_parameter(part_id=part_id, template_id=template_id)
payload: dict[str, Any] = {
"template": template_id,
"model_type": "part.part",
"model_id": part_id,
"data": data[:500],
"data_numeric": data_numeric,
"note": note[:500],
}
if existing:
parameter_id = int(existing.get("pk") or existing.get("id"))
return await self._request("PATCH", f"/parameter/{parameter_id}/", json=payload)
return await self._request("POST", "/parameter/", json=payload)
async def get_parameter_template_id(self, name: str) -> int:
cache_key = name.lower()
if cache_key in self._parameter_template_cache:
return self._parameter_template_cache[cache_key]
data = await self._request("GET", "/parameter/template/", params={"search": name, "limit": 100})
for item in self._items(data):
if str(item.get("name", "")).lower() == cache_key:
template_id = int(item.get("pk") or item.get("id"))
self._parameter_template_cache[cache_key] = template_id
return template_id
raise ExternalApiError(f"InvenTree parameter template '{name}' was not found")
async def find_part_parameter(self, *, part_id: int, template_id: int) -> dict[str, Any] | None:
data = await self._request(
"GET",
"/parameter/",
params={
"model_type": "part.part",
"model_id": part_id,
"template": template_id,
"limit": 100,
},
)
for item in self._items(data):
if item.get("model_type") == "part.part" and item.get("model_id") == part_id and item.get("template") == template_id:
return item
return None
@staticmethod
def batch_for_archive(archive_id: int) -> str:
return f"bambuddy-{archive_id}"
async def _request(self, method: str, path: str, **kwargs: Any) -> Any:
response = await self.client.request(method, path, **kwargs)
if response.status_code >= 400:
body = response.text[:1000]
raise ExternalApiError(f"InvenTree {method} {path} failed: HTTP {response.status_code}: {body}")
if response.content:
return response.json()
return None
@staticmethod
def _items(data: Any) -> list[dict[str, Any]]:
if isinstance(data, list):
return data
if isinstance(data, dict):
results = data.get("results")
if isinstance(results, list):
return results
return []