Add VPN and app status to heartbeat replies and log inbound messages

This commit is contained in:
2025-12-18 17:00:26 +02:00
parent f462fec86f
commit d2fa64ecbc

View File

@@ -7,6 +7,8 @@ import argparse
import json import json
import logging import logging
import os import os
import socket
import subprocess
import sys import sys
import time import time
from typing import Generator, Optional from typing import Generator, Optional
@@ -121,8 +123,52 @@ def iter_json_objects(port: serial.Serial, systemd: Optional[SystemdIntegration]
yield payload yield payload
def vpn_status(interface: str = "tun0") -> int:
"""Return 1 if the interface exists (and is up when operstate is available), else 0."""
try:
if interface in {name for _, name in socket.if_nameindex()}:
operstate_path = f"/sys/class/net/{interface}/operstate"
carrier_path = f"/sys/class/net/{interface}/carrier"
if os.path.exists(operstate_path):
try:
with open(operstate_path, "r", encoding="utf-8") as fh:
state = fh.read().strip().lower()
if state == "up":
return 1
if state == "down":
return 0
# Some tun devices report "unknown"; fall back to carrier/exists.
except OSError:
pass
if os.path.exists(carrier_path):
try:
with open(carrier_path, "r", encoding="utf-8") as fh:
return 1 if fh.read().strip() == "1" else 0
except OSError:
pass
return 1
except OSError:
logging.debug("Unable to determine VPN interface state.", exc_info=True)
return 0
def app_status(service: str = "watcher_application.service") -> int:
"""Return 1 if the systemd service is active, else 0."""
try:
result = subprocess.run(
["systemctl", "is-active", "--quiet", service],
check=False,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return 1 if result.returncode == 0 else 0
except (FileNotFoundError, OSError):
logging.debug("Unable to query systemd service state for %s.", service, exc_info=True)
return 0
def relay_heartbeat(port_name: str, baudrate: int, systemd: Optional[SystemdIntegration] = None) -> None: def relay_heartbeat(port_name: str, baudrate: int, systemd: Optional[SystemdIntegration] = None) -> None:
"""Read heartbeats from ``port_name`` and reply with {"hb": 2}.""" """Read heartbeats from ``port_name`` and reply with {"hb": 2, "VPN": <0|1>, "APP": <0|1>}."""
try: try:
if systemd: if systemd:
systemd.status(f"Opening serial port {port_name} @ {baudrate} baud") systemd.status(f"Opening serial port {port_name} @ {baudrate} baud")
@@ -132,10 +178,12 @@ def relay_heartbeat(port_name: str, baudrate: int, systemd: Optional[SystemdInte
systemd.ready(f"Listening on {port_name} @ {baudrate} baud") systemd.ready(f"Listening on {port_name} @ {baudrate} baud")
for message in iter_json_objects(ser, systemd): for message in iter_json_objects(ser, systemd):
hb_value = message.get("hb") hb_value = message.get("hb")
logging.debug("Received payload %s", message) cmd_value = message.get("cmd")
logging.info("Received payload %s", message)
if hb_value == 1: if hb_value == 1 or cmd_value == "status":
ack = json.dumps({"hb": 2}, separators=(",", ":")).encode("utf-8") + b"\n" ack_payload = {"hb": 2, "VPN": vpn_status(), "APP": app_status()}
ack = json.dumps(ack_payload, separators=(",", ":")).encode("utf-8") + b"\n"
ser.write(ack) ser.write(ack)
ser.flush() ser.flush()
logging.info("Sent reply %s", ack.strip()) logging.info("Sent reply %s", ack.strip())