diff --git a/serial_heartbeat.py b/serial_heartbeat.py index 8cc14eb..911948c 100644 --- a/serial_heartbeat.py +++ b/serial_heartbeat.py @@ -7,6 +7,8 @@ import argparse import json import logging import os +import socket +import subprocess import sys import time from typing import Generator, Optional @@ -121,8 +123,52 @@ def iter_json_objects(port: serial.Serial, systemd: Optional[SystemdIntegration] yield payload +def vpn_status(interface: str = "tun0") -> int: + """Return 1 if the interface exists (and is up when operstate is available), else 0.""" + try: + if interface in {name for _, name in socket.if_nameindex()}: + operstate_path = f"/sys/class/net/{interface}/operstate" + carrier_path = f"/sys/class/net/{interface}/carrier" + if os.path.exists(operstate_path): + try: + with open(operstate_path, "r", encoding="utf-8") as fh: + state = fh.read().strip().lower() + if state == "up": + return 1 + if state == "down": + return 0 + # Some tun devices report "unknown"; fall back to carrier/exists. + except OSError: + pass + if os.path.exists(carrier_path): + try: + with open(carrier_path, "r", encoding="utf-8") as fh: + return 1 if fh.read().strip() == "1" else 0 + except OSError: + pass + return 1 + except OSError: + logging.debug("Unable to determine VPN interface state.", exc_info=True) + return 0 + + +def app_status(service: str = "watcher_application.service") -> int: + """Return 1 if the systemd service is active, else 0.""" + try: + result = subprocess.run( + ["systemctl", "is-active", "--quiet", service], + check=False, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + return 1 if result.returncode == 0 else 0 + except (FileNotFoundError, OSError): + logging.debug("Unable to query systemd service state for %s.", service, exc_info=True) + return 0 + + def relay_heartbeat(port_name: str, baudrate: int, systemd: Optional[SystemdIntegration] = None) -> None: - """Read heartbeats from ``port_name`` and reply with {"hb": 2}.""" + """Read heartbeats from ``port_name`` and reply with {"hb": 2, "VPN": <0|1>, "APP": <0|1>}.""" try: if systemd: systemd.status(f"Opening serial port {port_name} @ {baudrate} baud") @@ -132,10 +178,12 @@ def relay_heartbeat(port_name: str, baudrate: int, systemd: Optional[SystemdInte systemd.ready(f"Listening on {port_name} @ {baudrate} baud") for message in iter_json_objects(ser, systemd): hb_value = message.get("hb") - logging.debug("Received payload %s", message) + cmd_value = message.get("cmd") + logging.info("Received payload %s", message) - if hb_value == 1: - ack = json.dumps({"hb": 2}, separators=(",", ":")).encode("utf-8") + b"\n" + if hb_value == 1 or cmd_value == "status": + ack_payload = {"hb": 2, "VPN": vpn_status(), "APP": app_status()} + ack = json.dumps(ack_payload, separators=(",", ":")).encode("utf-8") + b"\n" ser.write(ack) ser.flush() logging.info("Sent reply %s", ack.strip())