Files
Lab8DATAPROCESSOR/src/bambuddy_inventree_sync/sync.py

286 lines
12 KiB
Python

import asyncio
import hashlib
import logging
from pathlib import PurePosixPath, PureWindowsPath
from typing import Any
from .bambuddy import BambuddyClient
from .config import Settings
from .database import Database
from .http_errors import ExternalApiError
from .inventree import InvenTreeClient
from .models import Archive, SyncResult
logger = logging.getLogger(__name__)
class ArchiveSyncService:
def __init__(
self,
*,
settings: Settings,
database: Database,
bambuddy: BambuddyClient,
inventree: InvenTreeClient,
) -> None:
self.settings = settings
self.database = database
self.bambuddy = bambuddy
self.inventree = inventree
self._lock = asyncio.Lock()
async def validate_targets(self) -> dict[str, Any]:
category, location, bambuddy_info = await asyncio.gather(
self.inventree.get_part_category(),
self.inventree.get_stock_location(),
self.bambuddy.get_system_info(),
)
return {"part_category": category, "stock_location": location, "bambuddy": bambuddy_info}
async def sync_archive_id(
self,
archive_id: int,
*,
force: bool = False,
webhook_data: dict[str, Any] | None = None,
) -> SyncResult:
async with self._lock:
existing = self.database.get_record(archive_id)
if existing and existing["sync_status"] == "synced" and not force:
return SyncResult(
archive_id=archive_id,
status="already_synced",
message="Archive was already synced",
part_id=existing["part_id"],
stock_item_id=existing["stock_item_id"],
part_key=existing["part_key"],
)
try:
archive = await self._load_archive(archive_id, webhook_data)
return await self._sync_archive(archive)
except Exception as exc:
self.database.upsert_record(
archive_id=archive_id,
sync_status="failed",
error=str(exc),
raw_archive=webhook_data,
)
raise
async def backfill(self, *, status: str | None = None, max_archives: int | None = None) -> dict[str, int]:
status_filter = status
target_status = status.lower() if status else None
counts = {"seen": 0, "synced": 0, "already_synced": 0, "skipped": 0, "failed": 0}
offset = 0
limit = self.settings.backfill_page_size
while True:
total, archives = await self.bambuddy.list_archives(status=status_filter, limit=limit, offset=offset)
if not archives:
break
for archive in archives:
if target_status and (archive.status or "").lower() != target_status:
continue
if max_archives is not None and self._backfill_limit_reached(counts, max_archives):
return counts
counts["seen"] += 1
try:
result = await self.sync_archive_id(archive.id, webhook_data=archive.model_dump())
counts[result.status] = counts.get(result.status, 0) + 1
except Exception:
logger.exception("Failed to sync Bambuddy archive %s during backfill", archive.id)
counts["failed"] += 1
if max_archives is not None and self._backfill_limit_reached(counts, max_archives):
return counts
offset += len(archives)
if total is not None and offset >= total:
break
return counts
async def _load_archive(self, archive_id: int, webhook_data: dict[str, Any] | None) -> Archive:
try:
return await self.bambuddy.get_archive(archive_id)
except ExternalApiError:
if webhook_data:
data = dict(webhook_data)
data.setdefault("id", archive_id)
return Archive.model_validate(data)
raise
async def _sync_archive(self, archive: Archive) -> SyncResult:
if self.settings.sync_success_only and not self.is_successful_archive(archive):
self.database.upsert_record(
archive_id=archive.id,
sync_status="skipped",
archive_status=archive.status,
raw_archive=archive.model_dump(),
)
return SyncResult(archive_id=archive.id, status="skipped", message=f"Archive status is {archive.status}")
part_key = self.part_key_for_archive(archive)
ipn = self.ipn_for_part_key(part_key)
name = self.name_for_archive(archive)
description = self.description_for_archive(archive)
part_id = await self._get_or_create_part(part_key=part_key, ipn=ipn, name=name, description=description)
batch = self.inventree.batch_for_archive(archive.id)
existing_stock = await self.inventree.find_stock_by_batch(part_id=part_id, batch=batch)
if existing_stock:
stock_item_id = int(existing_stock.get("pk") or existing_stock.get("id"))
self.database.upsert_record(
archive_id=archive.id,
sync_status="synced",
part_key=part_key,
part_id=part_id,
stock_item_id=stock_item_id,
archive_status=archive.status,
raw_archive=archive.model_dump(),
)
return SyncResult(
archive_id=archive.id,
status="already_synced",
message="Stock item already exists in InvenTree",
part_id=part_id,
stock_item_id=stock_item_id,
part_key=part_key,
)
stock = await self.inventree.create_stock_item(
part_id=part_id,
archive=archive,
notes=self.stock_notes_for_archive(archive),
)
stock_item_id = int(stock.get("pk") or stock.get("id"))
self.database.upsert_record(
archive_id=archive.id,
sync_status="synced",
part_key=part_key,
part_id=part_id,
stock_item_id=stock_item_id,
archive_status=archive.status,
raw_archive=archive.model_dump(),
)
logger.info(
"Synced Bambuddy archive %s to InvenTree part %s stock item %s",
archive.id,
part_id,
stock_item_id,
)
return SyncResult(
archive_id=archive.id,
status="synced",
message="Created stock item in InvenTree",
part_id=part_id,
stock_item_id=stock_item_id,
part_key=part_key,
)
async def _get_or_create_part(self, *, part_key: str, ipn: str, name: str, description: str) -> int:
cached_part_id = self.database.get_part_id(part_key)
if cached_part_id:
return cached_part_id
existing = await self.inventree.find_part_by_ipn(ipn)
if existing:
part_id = int(existing.get("pk") or existing.get("id"))
self.database.upsert_part(part_key=part_key, inventree_part_id=part_id, display_name=name)
return part_id
created = await self.inventree.create_part(name=name, description=description, ipn=ipn)
part_id = int(created.get("pk") or created.get("id"))
self.database.upsert_part(part_key=part_key, inventree_part_id=part_id, display_name=name)
return part_id
def part_key_for_archive(self, archive: Archive) -> str:
parts: list[str] = []
raw = archive.model_dump()
for field in self.settings.part_key_field_names:
value = raw.get(field)
if value is not None and str(value).strip():
parts.append(str(value).strip().lower())
if not parts:
parts.append(str(archive.id))
source = "|".join(parts)
return hashlib.sha1(source.encode("utf-8")).hexdigest()
def ipn_for_part_key(self, part_key: str) -> str:
return f"{self.settings.part_ipn_prefix}-{part_key[:12]}"
@staticmethod
def name_for_archive(archive: Archive) -> str:
if archive.name and archive.name.strip():
return archive.name.strip()
if archive.print_name and archive.print_name.strip():
return archive.print_name.strip()
if archive.filename and archive.filename.strip():
filename = archive.filename.strip()
if "\\" in filename:
return PureWindowsPath(filename).stem or filename
return PurePosixPath(filename).stem or filename
return f"Bambuddy archive {archive.id}"
def description_for_archive(self, archive: Archive) -> str:
return "\n".join(
line
for line in [
"Auto-created from Bambuddy archive.",
f"Archive ID: {archive.id}",
f"Print name: {archive.print_name}" if archive.print_name else "",
f"Filename: {archive.filename}" if archive.filename else "",
f"Content hash: {archive.content_hash}" if archive.content_hash else "",
f"Printer: {archive.printer_name}" if archive.printer_name else "",
f"Created at: {archive.created_at}" if archive.created_at else "",
f"Completed at: {archive.completed_at}" if archive.completed_at else "",
]
if line
)
def stock_notes_for_archive(self, archive: Archive) -> str:
duration = archive.actual_time_seconds or archive.print_time_seconds or archive.duration
filament_used = archive.filament_used_grams or archive.filament_used
rows = [
"Imported from Bambuddy.",
f"Archive ID: {archive.id}",
f"Print name: {archive.print_name}" if archive.print_name else "",
f"Filename: {archive.filename}" if archive.filename else "",
f"Printer: {archive.printer_name}" if archive.printer_name else "",
f"Started at: {archive.started_at}" if archive.started_at else "",
f"Completed at: {archive.completed_at}" if archive.completed_at else "",
f"Created at: {archive.created_at}" if archive.created_at else "",
f"Duration: {duration} seconds" if duration is not None else "",
f"Quantity: {archive.quantity}" if archive.quantity is not None else "",
f"Object count: {archive.object_count}" if archive.object_count is not None else "",
f"Filament used: {filament_used} g" if filament_used is not None else "",
f"Filament type: {archive.filament_type}" if archive.filament_type else "",
f"Filament color: {archive.filament_color}" if archive.filament_color else "",
f"Cost: {archive.cost}" if archive.cost is not None else "",
f"Bambuddy status: {archive.status}" if archive.status else "",
]
return "\n".join(row for row in rows if row)
@staticmethod
def is_successful_archive(archive: Archive) -> bool:
status = (archive.status or "").lower()
if status in {"success", "completed", "complete", "done"}:
return True
return bool(archive.completed_at and status not in {"failed", "stopped", "printing", "running"})
@staticmethod
def _backfill_limit_reached(counts: dict[str, int], max_archives: int) -> bool:
return counts["synced"] + counts["already_synced"] + counts["failed"] >= max_archives