226 lines
8.2 KiB
Python
226 lines
8.2 KiB
Python
#!/usr/bin/env python3
|
|
"""Simple heartbeat relay for a serial port."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from typing import Generator, Optional
|
|
|
|
try:
|
|
import serial # type: ignore
|
|
except ImportError as exc: # pragma: no cover - fail fast when dependency missing
|
|
raise SystemExit(
|
|
"pyserial is required. Install dependencies via `pip install -r requirements.txt`."
|
|
) from exc
|
|
|
|
try:
|
|
from sdnotify import SystemdNotifier
|
|
except ImportError:
|
|
SystemdNotifier = None # type: ignore[assignment]
|
|
|
|
|
|
class SystemdIntegration:
    """Minimal helper around sdnotify so the service can run under systemd.

    Every public method is a silent no-op when no notifier is available
    (``NOTIFY_SOCKET`` is not set, ``sdnotify`` is not installed, or the
    notifier could not be created), so callers never need to check whether
    the process is actually supervised.
    """

    def __init__(self) -> None:
        self._notifier = self._init_notifier()
        # Seconds between WATCHDOG=1 pings, or None when the watchdog is off.
        self._watchdog_interval = self._resolve_watchdog_interval()
        self._last_watchdog = time.monotonic()
        # Watchdog pings and STOPPING=1 are only emitted after ready().
        self._ready_sent = False

    def _init_notifier(self) -> Optional["SystemdNotifier"]:
        """Return a notifier when ``NOTIFY_SOCKET`` is set, otherwise ``None``.

        A warning is logged when notification is requested but ``sdnotify``
        is missing or its notifier cannot be constructed.
        """
        if "NOTIFY_SOCKET" not in os.environ:
            return None
        if SystemdNotifier is None:
            logging.warning("sdnotify is not installed but systemd notification is requested.")
            return None
        try:
            return SystemdNotifier()
        except Exception as exc:  # pragma: no cover - defensive
            logging.warning("Unable to initialize systemd notifier: %s", exc)
            return None

    def _resolve_watchdog_interval(self) -> Optional[float]:
        """Return the watchdog ping interval in seconds, or ``None`` if disabled.

        The interval is half of ``WATCHDOG_USEC``. ``None`` is returned when
        there is no notifier, the variable is unset or empty, or its value is
        not a positive integer.
        """
        if not self._notifier:
            return None
        watchdog_usec = os.environ.get("WATCHDOG_USEC")
        if not watchdog_usec:
            return None
        try:
            timeout_seconds = int(watchdog_usec) / 1_000_000
        except (TypeError, ValueError):
            logging.warning("Invalid WATCHDOG_USEC value: %s", watchdog_usec)
            return None
        if timeout_seconds <= 0:
            logging.warning("Invalid WATCHDOG_USEC value: %s", watchdog_usec)
            return None
        # systemd requires a WATCHDOG=1 within every WATCHDOG_USEC window and
        # recommends pinging at half that period. No lower bound is applied:
        # clamping the interval upwards could push it past a short timeout and
        # get the service killed.
        return timeout_seconds / 2

    def status(self, message: str) -> None:
        """Send a free-form ``STATUS=`` line."""
        self._send(f"STATUS={message}")

    def ready(self, message: str | None = None) -> None:
        """Send ``READY=1`` (optionally with a status) and restart the watchdog timer."""
        self._ready_sent = True
        self._last_watchdog = time.monotonic()
        self._send("READY=1", message)

    def watchdog_ping(self) -> None:
        """Send ``WATCHDOG=1`` if at least one ping interval has elapsed.

        Does nothing before ``ready()`` has been called, when the watchdog is
        disabled, or when there is no notifier. Intended to be called often;
        the call itself rate-limits the notifications.
        """
        if not self._ready_sent or not self._watchdog_interval or not self._notifier:
            return
        now = time.monotonic()
        if now - self._last_watchdog >= self._watchdog_interval:
            self._notifier.notify("WATCHDOG=1")
            self._last_watchdog = now

    def stopping(self, message: str | None = None) -> None:
        """Send ``STOPPING=1`` (optionally with a status) if ``ready()`` was called."""
        if not self._ready_sent:
            return
        self._send("STOPPING=1", message)

    def _send(self, primary: str, message: str | None = None) -> None:
        """Send ``primary``, followed by a ``STATUS=`` line when ``message`` is given."""
        if not self._notifier:
            return
        payload = primary
        if message:
            payload = f"{payload}\nSTATUS={message}"
        self._notifier.notify(payload)
|
|
|
|
|
|
def iter_json_objects(
    port: serial.Serial,
    systemd: Optional[SystemdIntegration] = None,
    *,
    max_buffer: int = 65536,
) -> Generator[dict, None, None]:
    """Yield JSON objects that appear on the serial stream.

    Bytes are read from ``port`` indefinitely, decoded as UTF-8 (invalid bytes
    are dropped) and scanned for text starting at ``{`` that parses as a JSON
    object. Anything before the first ``{`` is discarded as noise.

    Args:
        port: Object exposing ``in_waiting`` and ``read(size)``, as
            ``serial.Serial`` does.
        systemd: Optional integration whose ``watchdog_ping()`` is called
            after every read, including reads that return no data.
        max_buffer: Maximum number of undecodable characters kept while
            waiting for an object to complete. Beyond this the leading ``{``
            is treated as garbage and the scanner resynchronises on the next
            ``{``.

    Yields:
        Each decoded JSON object as a ``dict``.
    """
    import codecs  # local import: only this function needs it

    decoder = json.JSONDecoder()
    # An incremental decoder holds back a multi-byte character that is split
    # across two reads instead of dropping both halves.
    utf8 = codecs.getincrementaldecoder("utf-8")(errors="ignore")
    buffer = ""

    while True:
        chunk = port.read(port.in_waiting or 1)
        if systemd:
            systemd.watchdog_ping()
        if not chunk:
            continue

        buffer += utf8.decode(chunk)

        while buffer:
            start = buffer.find("{")
            if start == -1:
                # No object can start in this data; drop the noise.
                buffer = ""
                break
            if start:
                buffer = buffer[start:]

            try:
                payload, end = decoder.raw_decode(buffer)
            except json.JSONDecodeError:
                if len(buffer) <= max_buffer:
                    # Most likely an incomplete object; wait for more bytes.
                    break
                # The candidate never completed: skip its "{" so a corrupt
                # frame cannot block every later message or grow the buffer
                # without bound.
                buffer = buffer[1:]
                continue

            buffer = buffer[end:]
            if isinstance(payload, dict):
                yield payload
|
|
|
|
|
|
def vpn_status(interface: str = "tun0") -> int:
    """Report whether the network interface looks usable.

    Returns 0 when the interface is not listed by ``socket.if_nameindex()``
    or the listing fails. For a listed interface, sysfs ``operstate`` decides
    when it reads "up" (1) or "down" (0); any other state falls back to the
    ``carrier`` file (1 only when it reads "1"). When neither file can be
    read, the listed interface is reported as 1.
    """
    try:
        listed = any(name == interface for _, name in socket.if_nameindex())
    except OSError:
        logging.debug("Unable to determine VPN interface state.", exc_info=True)
        return 0
    if not listed:
        return 0

    # First choice: the kernel's operational state.
    operstate = None
    try:
        with open(f"/sys/class/net/{interface}/operstate", "r", encoding="utf-8") as handle:
            operstate = handle.read().strip().lower()
    except OSError:
        pass
    if operstate == "up":
        return 1
    if operstate == "down":
        return 0

    # Some tun devices report "unknown"; consult the carrier flag next.
    try:
        with open(f"/sys/class/net/{interface}/carrier", "r", encoding="utf-8") as handle:
            has_carrier = handle.read().strip() == "1"
    except OSError:
        # Nothing readable in sysfs: being listed is the only evidence left.
        return 1
    return 1 if has_carrier else 0
|
|
|
|
|
|
def app_status(service: str = "watcher_application.service") -> int:
    """Return 1 when ``systemctl is-active`` exits with 0 for ``service``, else 0.

    A failure to launch ``systemctl`` is logged at debug level and reported
    as 0.
    """
    command = ["systemctl", "is-active", "--quiet", service]
    try:
        completed = subprocess.run(
            command,
            check=False,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except OSError:  # FileNotFoundError is a subclass of OSError
        logging.debug("Unable to query systemd service state for %s.", service, exc_info=True)
        return 0
    return int(completed.returncode == 0)
|
|
|
|
|
|
def relay_heartbeat(port_name: str, baudrate: int, systemd: Optional[SystemdIntegration] = None) -> None:
    """Answer heartbeat and status requests arriving on a serial port.

    Opens ``port_name`` at ``baudrate`` with a one-second read timeout and
    processes every JSON object read from it. A message with ``"hb": 1`` or
    ``"cmd": "status"`` is answered with one newline-terminated compact JSON
    line of the form ``{"hb": 2, "VPN": <0|1>, "APP": <0|1>}``; other messages
    are only logged.

    Args:
        port_name: Serial device to open.
        baudrate: Line speed passed to the serial port.
        systemd: Optional integration that receives status and readiness
            updates and is handed to the reader for watchdog pings.

    Raises:
        serial.SerialException: Logged as an error, then re-raised.
    """
    try:
        if systemd:
            systemd.status(f"Opening serial port {port_name} @ {baudrate} baud")
        with serial.Serial(port=port_name, baudrate=baudrate, timeout=1) as ser:
            logging.info("Listening on %s @ %s baud", port_name, baudrate)
            if systemd:
                systemd.ready(f"Listening on {port_name} @ {baudrate} baud")

            for message in iter_json_objects(ser, systemd):
                logging.info("Received payload %s", message)

                # Anything that is neither a heartbeat nor a status request
                # gets no reply.
                if message.get("hb") != 1 and message.get("cmd") != "status":
                    continue

                reply = {"hb": 2, "VPN": vpn_status(), "APP": app_status()}
                encoded = json.dumps(reply, separators=(",", ":")).encode("utf-8")
                ack = encoded + b"\n"
                ser.write(ack)
                ser.flush()
                logging.info("Sent reply %s", ack.strip())
    except serial.SerialException as exc:
        logging.error("Serial communication error on %s: %s", port_name, exc)
        raise
|
|
|
|
|
|
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the command-line arguments in ``argv``.

    ``--port`` is required; ``--baudrate`` is an integer defaulting to 9600;
    ``--log-level`` is a string defaulting to "INFO".
    """
    parser = argparse.ArgumentParser(description="Serial heartbeat responder.")
    # One (flag, keyword options) entry per supported command-line option.
    option_table = (
        ("--port", {"required": True, "help": "Serial port path (e.g., /dev/ttyUSB0)."}),
        ("--baudrate", {"type": int, "default": 9600, "help": "Serial baudrate."}),
        ("--log-level", {"default": "INFO", "help": "Logging level (INFO, DEBUG ...)."}),
    )
    for flag, options in option_table:
        parser.add_argument(flag, **options)
    return parser.parse_args(argv)
|
|
|
|
|
|
def main(argv: list[str]) -> int:
    """Run the heartbeat responder and return a process exit code.

    Returns 1 after a serial communication failure, 130 when interrupted
    from the keyboard, and 0 if the relay returns normally. Any other
    exception is reported to systemd and re-raised.
    """
    args = parse_args(argv)
    # Unrecognised level names fall back to INFO.
    level = getattr(logging, args.log_level.upper(), logging.INFO)
    logging.basicConfig(level=level, format="%(message)s")

    systemd = SystemdIntegration()
    systemd.status("Initializing serial heartbeat responder")

    try:
        relay_heartbeat(args.port, args.baudrate, systemd=systemd)
    except serial.SerialException:
        systemd.stopping("Serial communication failure.")
        return 1
    except KeyboardInterrupt:
        logging.info("Interrupted by user, shutting down.")
        systemd.stopping("Interrupted by user, shutting down.")
        return 130
    except Exception:
        systemd.stopping("Unexpected failure, see logs.")
        raise
    else:
        systemd.stopping("Exiting cleanly.")
        return 0
|
|
|
|
|
|
# Script entry point: run the responder and use its return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))
|