#!/usr/bin/env python3 """ PyQt6 desktop tool for configuring the RX/TX firmware over USB CDC. Protocol: line-delimited JSON. Expected commands (device-side to implement): {"cmd": "set_params", "params": {freq_mhz, bw_khz, sf, cr, tx_power_dbm, payload, period_ms, tx_enabled, band}} {"cmd": "get_status"} {"cmd": "reboot_bootloader"} The GUI sends updates automatically when fields change. """ from __future__ import annotations import json import sys import threading from dataclasses import dataclass from typing import Dict, Optional from PyQt6.QtCore import QTimer, pyqtSignal, QObject from PyQt6.QtWidgets import ( QApplication, QCheckBox, QComboBox, QGridLayout, QGroupBox, QHBoxLayout, QLabel, QLineEdit, QMainWindow, QPushButton, QSpinBox, QTextEdit, QVBoxLayout, QWidget, ) import serial from serial.tools import list_ports BANDS = [ ("430", 430), ("868", 868), ("915", 915), ("L" , 1550), # MHz midpoint ("S" , 2000), ("2.4G", 2442), ] @dataclass class DeviceParams: freq_mhz: int = 433 band: str = "430" bw_khz: int = 125 sf: int = 7 cr: int = 5 tx_power_dbm: int = 14 period_ms: int = 1000 tx_enabled: bool = True payload: str = "PING 1" def to_dict(self) -> Dict[str, object]: return { "freq_mhz": self.freq_mhz, "band": self.band, "bw_khz": self.bw_khz, "sf": self.sf, "cr": self.cr, "tx_power_dbm": self.tx_power_dbm, "period_ms": self.period_ms, "tx_enabled": self.tx_enabled, "payload": self.payload, } class DeviceConnection(QObject): status_changed = pyqtSignal(str) data_received = pyqtSignal(str) connected_changed = pyqtSignal(bool) def __init__(self) -> None: super().__init__() self._ser: Optional[serial.Serial] = None self._rx_thread: Optional[threading.Thread] = None self._stop = threading.Event() @staticmethod def available_ports(): return list_ports.comports() def connect(self, port: str, baud: int = 115200) -> None: self.disconnect() try: self._ser = serial.Serial(port=port, baudrate=baud, timeout=0.1) except serial.SerialException as exc: self.status_changed.emit(f"Connect failed: {exc}") self._ser = None return self._stop.clear() self._rx_thread = threading.Thread(target=self._rx_loop, daemon=True) self._rx_thread.start() self.status_changed.emit(f"Connected to {port} @ {baud}") self.connected_changed.emit(True) def disconnect(self) -> None: self._stop.set() if self._rx_thread and self._rx_thread.is_alive(): self._rx_thread.join(timeout=0.5) self._rx_thread = None if self._ser: try: self._ser.close() except Exception: pass was_connected = self._ser is not None self._ser = None if was_connected: self.connected_changed.emit(False) self.status_changed.emit("Disconnected") def is_connected(self) -> bool: return self._ser is not None and self._ser.is_open def send_json(self, obj: Dict[str, object]) -> None: if not self.is_connected(): self.status_changed.emit("Not connected") return try: line = json.dumps(obj) + "\n" self._ser.write(line.encode("utf-8")) except Exception as exc: self.status_changed.emit(f"Send failed: {exc}") def _rx_loop(self) -> None: assert self._ser is not None while not self._stop.is_set(): try: raw = self._ser.readline() except Exception as exc: self.status_changed.emit(f"Read error: {exc}") break if not raw: continue try: text = raw.decode("utf-8", errors="replace").strip() except Exception: continue if text: self.data_received.emit(text) self.connected_changed.emit(False) class MainWindow(QMainWindow): def __init__(self) -> None: super().__init__() self.setWindowTitle("LoRa RX/TX Config") self.conn = DeviceConnection() self.params = DeviceParams() self._tx_widgets = [] self._apply_timer = QTimer(self) self._apply_timer.setSingleShot(True) self._apply_timer.timeout.connect(self.send_params) self._build_ui() self._wire() def _build_ui(self) -> None: container = QWidget() main_layout = QVBoxLayout(container) # Connection controls conn_row = QHBoxLayout() self.port_combo = QComboBox() self.refresh_ports() self.btn_refresh = QPushButton("Refresh") self.btn_connect = QPushButton("Connect") conn_row.addWidget(QLabel("Port:")) conn_row.addWidget(self.port_combo, stretch=1) conn_row.addWidget(self.btn_refresh) conn_row.addWidget(self.btn_connect) main_layout.addLayout(conn_row) # Parameters params_group = QGroupBox("Radio parameters") grid = QGridLayout(params_group) self.spin_freq = QSpinBox() self.spin_freq.setRange(150, 2500) self.spin_freq.setSuffix(" MHz") self.spin_freq.setValue(self.params.freq_mhz) self.combo_band = QComboBox() for name, _ in BANDS: self.combo_band.addItem(name) self.combo_band.setCurrentText(self.params.band) self.combo_bw = QComboBox() for bw in (125, 250, 500): self.combo_bw.addItem(f"{bw} kHz", bw) self.combo_bw.setCurrentIndex(0) self.combo_sf = QComboBox() for sf in range(5, 13): self.combo_sf.addItem(f"SF{sf}", sf) self.combo_sf.setCurrentText("SF7") self.combo_cr = QComboBox() for cr in range(5, 9): self.combo_cr.addItem(f"4/{cr}", cr) self.combo_cr.setCurrentIndex(0) self.spin_power = QSpinBox() self.spin_power.setRange(-17, 22) self.spin_power.setSuffix(" dBm") self.spin_power.setValue(self.params.tx_power_dbm) self.spin_period = QSpinBox() self.spin_period.setRange(100, 60000) self.spin_period.setSingleStep(100) self.spin_period.setSuffix(" ms") self.spin_period.setValue(self.params.period_ms) self.chk_tx_enabled = QCheckBox("TX enabled") self.chk_tx_enabled.setChecked(self.params.tx_enabled) self.edit_payload = QLineEdit(self.params.payload) self.edit_payload.setMaxLength(31) grid.addWidget(QLabel("Band"), 0, 0) grid.addWidget(self.combo_band, 0, 1) grid.addWidget(QLabel("Frequency"), 1, 0) grid.addWidget(self.spin_freq, 1, 1) grid.addWidget(QLabel("Bandwidth"), 2, 0) grid.addWidget(self.combo_bw, 2, 1) grid.addWidget(QLabel("Spreading factor"), 3, 0) grid.addWidget(self.combo_sf, 3, 1) grid.addWidget(QLabel("Coding rate"), 4, 0) grid.addWidget(self.combo_cr, 4, 1) self.lbl_tx_power = QLabel("TX power") grid.addWidget(self.lbl_tx_power, 5, 0) grid.addWidget(self.spin_power, 5, 1) self.lbl_payload = QLabel("Payload") grid.addWidget(self.lbl_payload, 6, 0) grid.addWidget(self.edit_payload, 6, 1) self.lbl_period = QLabel("Period") grid.addWidget(self.lbl_period, 7, 0) grid.addWidget(self.spin_period, 7, 1) grid.addWidget(self.chk_tx_enabled, 8, 1) main_layout.addWidget(params_group) # Actions actions = QHBoxLayout() self.btn_boot = QPushButton("Bootloader") actions.addWidget(self.btn_boot) main_layout.addLayout(actions) # Status widgets for live metrics metrics_row = QHBoxLayout() self.lbl_role = QLabel("role: ?") self.lbl_snr = QLabel("SNR: -") self.lbl_rssi = QLabel("RSSI: -") self.lbl_status = QLabel("IRQ: -") metrics_row.addWidget(self.lbl_role) metrics_row.addWidget(self.lbl_snr) metrics_row.addWidget(self.lbl_rssi) metrics_row.addWidget(self.lbl_status) metrics_row.addStretch() main_layout.addLayout(metrics_row) self.lbl_rx_payload = QLabel("Payload: -") self.lbl_rx_payload.setWordWrap(True) main_layout.addWidget(self.lbl_rx_payload) self.log_view = QTextEdit() self.log_view.setReadOnly(True) self.log_view.setPlaceholderText("Incoming lines...") main_layout.addWidget(self.log_view) self.status_label = QLabel("Idle") main_layout.addWidget(self.status_label) self.setCentralWidget(container) self._tx_widgets = [ self.lbl_tx_power, self.spin_power, self.lbl_payload, self.edit_payload, self.lbl_period, self.spin_period, self.chk_tx_enabled, ] self._set_tx_controls_visible(True) def _wire(self) -> None: self.btn_refresh.clicked.connect(self.refresh_ports) self.btn_connect.clicked.connect(self.toggle_connect) self.btn_boot.clicked.connect(self.request_bootloader) for widget in ( self.combo_band, self.combo_bw, self.combo_sf, self.combo_cr, ): widget.currentIndexChanged.connect(self.schedule_apply) for widget in ( self.spin_freq, self.spin_power, self.spin_period, ): widget.valueChanged.connect(self.schedule_apply) self.chk_tx_enabled.stateChanged.connect(self.schedule_apply) self.edit_payload.textChanged.connect(self.schedule_apply) self.conn.status_changed.connect(self.append_status) self.conn.data_received.connect(self.append_rx) self.conn.connected_changed.connect(self.on_connected_changed) self.poll_timer = QTimer(self) self.poll_timer.timeout.connect(self.request_status) def refresh_ports(self) -> None: current = self.port_combo.currentText() self.port_combo.clear() for p in self.conn.available_ports(): self.port_combo.addItem(p.device) if current: idx = self.port_combo.findText(current) if idx >= 0: self.port_combo.setCurrentIndex(idx) def toggle_connect(self) -> None: if self.conn.is_connected(): self.conn.disconnect() return port = self.port_combo.currentText() if not port: self.append_status("Select a port first") return self.conn.connect(port) def on_connected_changed(self, connected: bool) -> None: self.btn_connect.setText("Disconnect" if connected else "Connect") if connected: self.send_params() self.poll_timer.start(1000) else: self.poll_timer.stop() self._set_tx_controls_visible(True) self.lbl_rx_payload.setText("Payload: -") def schedule_apply(self) -> None: # debounce rapid changes (spinner drags) self._apply_timer.start(150) def gather_params(self) -> DeviceParams: p = DeviceParams() p.band = self.combo_band.currentText() p.freq_mhz = self.spin_freq.value() p.bw_khz = int(self.combo_bw.currentData()) p.sf = int(self.combo_sf.currentData()) p.cr = int(self.combo_cr.currentData()) p.tx_power_dbm = self.spin_power.value() p.period_ms = self.spin_period.value() p.tx_enabled = self.chk_tx_enabled.isChecked() p.payload = self.edit_payload.text() return p def send_params(self) -> None: self.params = self.gather_params() self.conn.send_json({"cmd": "set_params", "params": self.params.to_dict()}) def request_status(self) -> None: self.conn.send_json({"cmd": "get_status"}) def request_bootloader(self) -> None: self.conn.send_json({"cmd": "reboot_bootloader"}) def append_status(self, text: str) -> None: self.status_label.setText(text) def append_rx(self, text: str) -> None: # Try to parse JSON status responses to populate labels try: obj = json.loads(text) except Exception: self.log_view.append(f"[rx] {text}") return if obj.get("resp") == "status": role = obj.get("role", "?") snr = obj.get("snr_db", None) rssi = obj.get("rssi_dbm", None) irq = obj.get("last_status", None) payload = obj.get("payload", None) self.lbl_role.setText(f"role: {role}") self.lbl_snr.setText(f"SNR: {snr} dB" if snr is not None else "SNR: -") self.lbl_rssi.setText(f"RSSI: {rssi} dBm" if rssi is not None else "RSSI: -") self.lbl_status.setText(f"IRQ: 0x{irq:02X}" if irq is not None else "IRQ: -") if payload is not None: self.lbl_rx_payload.setText(f"Payload: {payload}") else: self.lbl_rx_payload.setText("Payload: -") role_text = str(role).lower() is_rx = role_text.startswith("rx") or role_text == "receiver" self._set_tx_controls_visible(not is_rx) else: self.status_label.setText(text) def _set_tx_controls_visible(self, visible: bool) -> None: for widget in self._tx_widgets: widget.setVisible(visible) def main() -> int: app = QApplication(sys.argv) win = MainWindow() win.setFixedSize(520, 520) win.show() return app.exec() if __name__ == "__main__": raise SystemExit(main())