#!/usr/bin/env python3
"""Simple heartbeat relay for a serial port."""

from __future__ import annotations

import argparse
import codecs
import json
import logging
import os
import sys
import time
from typing import Generator, Optional

try:
    import serial  # type: ignore
except ImportError as exc:  # pragma: no cover - fail fast when dependency missing
    raise SystemExit(
        "pyserial is required. Install dependencies via `pip install -r requirements.txt`."
    ) from exc

try:
    from sdnotify import SystemdNotifier
except ImportError:
    SystemdNotifier = None  # type: ignore[assignment]


# Upper bound (in decoded characters) on how much undecodable data
# ``iter_json_objects`` keeps while waiting for a JSON object to complete.
DEFAULT_MAX_BUFFER_CHARS = 65536


class SystemdIntegration:
    """Minimal helper around sdnotify so the service can run under systemd.

    All public methods silently do nothing when no notifier is available
    (``NOTIFY_SOCKET`` unset, ``sdnotify`` missing, or notifier construction
    failed), so callers never need to check whether systemd is in use.
    """

    def __init__(self) -> None:
        self._notifier = self._init_notifier()
        # Depends on ``self._notifier`` and must therefore be resolved after it.
        self._watchdog_interval = self._resolve_watchdog_interval()
        self._last_watchdog = time.monotonic()
        self._ready_sent = False

    def _init_notifier(self) -> Optional["SystemdNotifier"]:
        """Return a notifier, or ``None`` when notification is unavailable."""
        if "NOTIFY_SOCKET" not in os.environ:
            return None
        if SystemdNotifier is None:
            logging.warning("sdnotify is not installed but systemd notification is requested.")
            return None
        try:
            return SystemdNotifier()
        except Exception as exc:  # pragma: no cover - defensive
            logging.warning("Unable to initialize systemd notifier: %s", exc)
            return None

    def _resolve_watchdog_interval(self) -> Optional[float]:
        """Return the seconds to wait between watchdog pings, or ``None``.

        ``None`` is returned when there is no notifier, when ``WATCHDOG_USEC``
        is unset or empty, or when it is not an integer.
        """
        if not self._notifier:
            return None
        watchdog_usec = os.environ.get("WATCHDOG_USEC")
        if not watchdog_usec:
            return None
        try:
            interval_seconds = int(watchdog_usec) / 1_000_000
        except (TypeError, ValueError):
            logging.warning("Invalid WATCHDOG_USEC value: %s", watchdog_usec)
            return None
        # systemd expects to receive WATCHDOG=1 at least every WATCHDOG_USEC/2
        return max(interval_seconds / 2, 0.5)

    def status(self, message: str) -> None:
        """Send a free-form ``STATUS=`` line."""
        self._send(f"STATUS={message}")

    def ready(self, message: str | None = None) -> None:
        """Send ``READY=1`` (plus an optional status) and arm the watchdog timer."""
        self._ready_sent = True
        self._last_watchdog = time.monotonic()
        self._send("READY=1", message)

    def watchdog_ping(self) -> None:
        """Send ``WATCHDOG=1`` if the ping interval has elapsed since the last one.

        Does nothing before ``ready()`` has been called, or when no watchdog
        interval or notifier is configured.
        """
        if not self._ready_sent or not self._watchdog_interval or not self._notifier:
            return
        now = time.monotonic()
        if now - self._last_watchdog >= self._watchdog_interval:
            self._notifier.notify("WATCHDOG=1")
            self._last_watchdog = now

    def stopping(self, message: str | None = None) -> None:
        """Send ``STOPPING=1`` (plus an optional status) if ``ready()`` was called."""
        if not self._ready_sent:
            return
        self._send("STOPPING=1", message)

    def _send(self, primary: str, message: str | None = None) -> None:
        """Send ``primary``, with ``STATUS=message`` appended on its own line if given."""
        if not self._notifier:
            return
        payload = primary
        if message:
            payload = f"{payload}\nSTATUS={message}"
        self._notifier.notify(payload)


def iter_json_objects(
    port: serial.Serial,
    systemd: Optional[SystemdIntegration] = None,
    *,
    max_buffer: int = DEFAULT_MAX_BUFFER_CHARS,
) -> Generator[dict, None, None]:
    """Yield JSON objects that appear on the serial stream.

    Bytes are read from ``port`` indefinitely, decoded as UTF-8 (undecodable
    bytes are dropped) and scanned for top-level JSON objects. Text outside
    of objects is discarded.

    Args:
        port: Object providing ``read(size)`` and ``in_waiting``.
        systemd: Optional integration whose ``watchdog_ping()`` is called
            after every read, including reads that return no data.
        max_buffer: Maximum number of buffered characters tolerated while the
            data starting at the current ``{`` cannot be decoded. Beyond this
            the leading ``{`` is dropped and scanning resumes at the next
            one, so line noise cannot stall the stream or grow memory without
            bound. An object larger than this limit is therefore not
            delivered intact.

    Yields:
        Each decoded JSON object as a ``dict``, in arrival order.
    """
    json_decoder = json.JSONDecoder()
    # Incremental decoding keeps a multi-byte character that is split across
    # two reads intact; decoding each chunk independently would drop it.
    text_decoder = codecs.getincrementaldecoder("utf-8")(errors="ignore")
    buffer = ""
    while True:
        chunk = port.read(port.in_waiting or 1)
        if systemd:
            systemd.watchdog_ping()
        if not chunk:
            continue
        buffer += text_decoder.decode(chunk)
        overflow_logged = False
        while True:
            start = buffer.find("{")
            if start == -1:
                # Nothing here can begin an object.
                buffer = ""
                break
            if start:
                buffer = buffer[start:]
            try:
                payload, idx = json_decoder.raw_decode(buffer)
            except json.JSONDecodeError:
                if len(buffer) <= max_buffer:
                    # Most likely an incomplete object: wait for more data.
                    break
                if not overflow_logged:
                    logging.warning(
                        "Discarding undecodable serial data: buffer exceeded %d characters",
                        max_buffer,
                    )
                    overflow_logged = True
                # Give up on this opening brace and resynchronise on the next.
                buffer = buffer[1:]
                continue
            buffer = buffer[idx:]
            if isinstance(payload, dict):
                yield payload


def relay_heartbeat(
    port_name: str,
    baudrate: int,
    systemd: Optional[SystemdIntegration] = None,
) -> None:
    """Read heartbeats from ``port_name`` and reply with {"hb": 2}.

    Opens the port with a one second read timeout and, for every received
    object whose ``"hb"`` value equals ``1``, writes the compact reply
    followed by a newline. Runs until an exception interrupts it.

    Args:
        port_name: Serial port path passed to ``serial.Serial``.
        baudrate: Serial baudrate passed to ``serial.Serial``.
        systemd: Optional integration that receives status, readiness and
            watchdog notifications.

    Raises:
        serial.SerialException: Logged here and then re-raised.
    """
    try:
        if systemd:
            systemd.status(f"Opening serial port {port_name} @ {baudrate} baud")
        with serial.Serial(port=port_name, baudrate=baudrate, timeout=1) as ser:
            logging.info("Listening on %s @ %s baud", port_name, baudrate)
            if systemd:
                systemd.ready(f"Listening on {port_name} @ {baudrate} baud")
            for message in iter_json_objects(ser, systemd):
                hb_value = message.get("hb")
                logging.debug("Received payload %s", message)
                if hb_value == 1:
                    ack = json.dumps({"hb": 2}, separators=(",", ":")).encode("utf-8") + b"\n"
                    ser.write(ack)
                    ser.flush()
                    logging.info("Sent reply %s", ack.strip())
    except serial.SerialException as exc:
        logging.error("Serial communication error on %s: %s", port_name, exc)
        raise


def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the command-line arguments in ``argv`` (program name excluded)."""
    parser = argparse.ArgumentParser(description="Serial heartbeat responder.")
    parser.add_argument("--port", required=True, help="Serial port path (e.g., /dev/ttyUSB0).")
    parser.add_argument("--baudrate", type=int, default=9600, help="Serial baudrate.")
    parser.add_argument("--log-level", default="INFO", help="Logging level (INFO, DEBUG ...).")
    return parser.parse_args(argv)


def _resolve_log_level(name: str) -> int:
    """Map a level name to its numeric value, falling back to INFO."""
    level = getattr(logging, name.upper(), None)
    # ``logging`` also exposes upper-case attributes that are not levels
    # (e.g. BASIC_FORMAT); passing one to basicConfig would raise.
    return level if isinstance(level, int) else logging.INFO


def main(argv: list[str]) -> int:
    """Run the responder and return the process exit status.

    Returns:
        ``1`` after a serial communication failure, ``130`` after a keyboard
        interrupt, and ``0`` if the relay returns normally. Any other
        exception is re-raised after systemd has been told the service is
        stopping.
    """
    args = parse_args(argv)
    logging.basicConfig(level=_resolve_log_level(args.log_level), format="%(message)s")
    systemd = SystemdIntegration()
    systemd.status("Initializing serial heartbeat responder")
    try:
        relay_heartbeat(args.port, args.baudrate, systemd=systemd)
    except serial.SerialException:
        # Already logged by relay_heartbeat.
        systemd.stopping("Serial communication failure.")
        return 1
    except KeyboardInterrupt:
        logging.info("Interrupted by user, shutting down.")
        systemd.stopping("Interrupted by user, shutting down.")
        return 130
    except Exception:
        systemd.stopping("Unexpected failure, see logs.")
        raise
    systemd.stopping("Exiting cleanly.")
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))