Files
watch-watch-server/serial_heartbeat.py

226 lines
8.2 KiB
Python

#!/usr/bin/env python3
"""Simple heartbeat relay for a serial port."""
from __future__ import annotations
import argparse
import json
import logging
import os
import socket
import subprocess
import sys
import time
from typing import Generator, Optional
try:
import serial # type: ignore
except ImportError as exc: # pragma: no cover - fail fast when dependency missing
raise SystemExit(
"pyserial is required. Install dependencies via `pip install -r requirements.txt`."
) from exc
try:
from sdnotify import SystemdNotifier
except ImportError:
SystemdNotifier = None # type: ignore[assignment]
class SystemdIntegration:
"""Minimal helper around sdnotify so the service can run under systemd."""
def __init__(self) -> None:
self._notifier = self._init_notifier()
self._watchdog_interval = self._resolve_watchdog_interval()
self._last_watchdog = time.monotonic()
self._ready_sent = False
def _init_notifier(self) -> Optional["SystemdNotifier"]:
if "NOTIFY_SOCKET" not in os.environ:
return None
if SystemdNotifier is None:
logging.warning("sdnotify is not installed but systemd notification is requested.")
return None
try:
return SystemdNotifier()
except Exception as exc: # pragma: no cover - defensive
logging.warning("Unable to initialize systemd notifier: %s", exc)
return None
def _resolve_watchdog_interval(self) -> Optional[float]:
if not self._notifier:
return None
watchdog_usec = os.environ.get("WATCHDOG_USEC")
if not watchdog_usec:
return None
try:
interval_seconds = int(watchdog_usec) / 1_000_000
except (TypeError, ValueError):
logging.warning("Invalid WATCHDOG_USEC value: %s", watchdog_usec)
return None
# systemd expects to receive WATCHDOG=1 at least every WATCHDOG_USEC/2
return max(interval_seconds / 2, 0.5)
def status(self, message: str) -> None:
self._send(f"STATUS={message}")
def ready(self, message: str | None = None) -> None:
self._ready_sent = True
self._last_watchdog = time.monotonic()
self._send("READY=1", message)
def watchdog_ping(self) -> None:
if not self._ready_sent or not self._watchdog_interval or not self._notifier:
return
now = time.monotonic()
if now - self._last_watchdog >= self._watchdog_interval:
self._notifier.notify("WATCHDOG=1")
self._last_watchdog = now
def stopping(self, message: str | None = None) -> None:
if not self._ready_sent:
return
self._send("STOPPING=1", message)
def _send(self, primary: str, message: str | None = None) -> None:
if not self._notifier:
return
payload = primary
if message:
payload = f"{payload}\nSTATUS={message}"
self._notifier.notify(payload)
def iter_json_objects(port: serial.Serial, systemd: Optional[SystemdIntegration] = None) -> Generator[dict, None, None]:
"""Yield JSON objects that appear on the serial stream."""
decoder = json.JSONDecoder()
buffer = ""
while True:
chunk = port.read(port.in_waiting or 1)
if systemd:
systemd.watchdog_ping()
if not chunk:
continue
buffer += chunk.decode("utf-8", errors="ignore")
while True:
start = buffer.find("{")
if start == -1:
buffer = buffer[-1:]
break
if start:
buffer = buffer[start:]
try:
payload, idx = decoder.raw_decode(buffer)
except json.JSONDecodeError:
break
buffer = buffer[idx:]
if isinstance(payload, dict):
yield payload
def vpn_status(interface: str = "tun0") -> int:
"""Return 1 if the interface exists (and is up when operstate is available), else 0."""
try:
if interface in {name for _, name in socket.if_nameindex()}:
operstate_path = f"/sys/class/net/{interface}/operstate"
carrier_path = f"/sys/class/net/{interface}/carrier"
if os.path.exists(operstate_path):
try:
with open(operstate_path, "r", encoding="utf-8") as fh:
state = fh.read().strip().lower()
if state == "up":
return 1
if state == "down":
return 0
# Some tun devices report "unknown"; fall back to carrier/exists.
except OSError:
pass
if os.path.exists(carrier_path):
try:
with open(carrier_path, "r", encoding="utf-8") as fh:
return 1 if fh.read().strip() == "1" else 0
except OSError:
pass
return 1
except OSError:
logging.debug("Unable to determine VPN interface state.", exc_info=True)
return 0
def app_status(service: str = "watcher_application.service") -> int:
"""Return 1 if the systemd service is active, else 0."""
try:
result = subprocess.run(
["systemctl", "is-active", "--quiet", service],
check=False,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return 1 if result.returncode == 0 else 0
except (FileNotFoundError, OSError):
logging.debug("Unable to query systemd service state for %s.", service, exc_info=True)
return 0
def relay_heartbeat(port_name: str, baudrate: int, systemd: Optional[SystemdIntegration] = None) -> None:
"""Read heartbeats from ``port_name`` and reply with {"hb": 2, "VPN": <0|1>, "APP": <0|1>}."""
try:
if systemd:
systemd.status(f"Opening serial port {port_name} @ {baudrate} baud")
with serial.Serial(port=port_name, baudrate=baudrate, timeout=1) as ser:
logging.info("Listening on %s @ %s baud", port_name, baudrate)
if systemd:
systemd.ready(f"Listening on {port_name} @ {baudrate} baud")
for message in iter_json_objects(ser, systemd):
hb_value = message.get("hb")
cmd_value = message.get("cmd")
logging.info("Received payload %s", message)
if hb_value == 1 or cmd_value == "status":
ack_payload = {"hb": 2, "VPN": vpn_status(), "APP": app_status()}
ack = json.dumps(ack_payload, separators=(",", ":")).encode("utf-8") + b"\n"
ser.write(ack)
ser.flush()
logging.info("Sent reply %s", ack.strip())
except serial.SerialException as exc:
logging.error("Serial communication error on %s: %s", port_name, exc)
raise
def parse_args(argv: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Serial heartbeat responder.")
parser.add_argument("--port", required=True, help="Serial port path (e.g., /dev/ttyUSB0).")
parser.add_argument("--baudrate", type=int, default=9600, help="Serial baudrate.")
parser.add_argument("--log-level", default="INFO", help="Logging level (INFO, DEBUG ...).")
return parser.parse_args(argv)
def main(argv: list[str]) -> int:
args = parse_args(argv)
logging.basicConfig(level=getattr(logging, args.log_level.upper(), logging.INFO), format="%(message)s")
systemd = SystemdIntegration()
systemd.status("Initializing serial heartbeat responder")
try:
relay_heartbeat(args.port, args.baudrate, systemd=systemd)
except serial.SerialException:
systemd.stopping("Serial communication failure.")
return 1
except KeyboardInterrupt:
logging.info("Interrupted by user, shutting down.")
systemd.stopping("Interrupted by user, shutting down.")
return 130
except Exception:
systemd.stopping("Unexpected failure, see logs.")
raise
systemd.stopping("Exiting cleanly.")
return 0
if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))