From 0d9feb26587275c064afe364b5845e959dbb562f Mon Sep 17 00:00:00 2001 From: Logan Cusano Date: Mon, 29 Dec 2025 00:34:28 -0500 Subject: [PATCH] first attempt --- app/node_main.py | 95 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/app/node_main.py b/app/node_main.py index d9cc663..765eff6 100644 --- a/app/node_main.py +++ b/app/node_main.py @@ -183,10 +183,95 @@ async def mqtt_lifecycle_manager(): lwt_payload = json.dumps({"status": "offline", "reason": "unexpected_disconnect"}) client.will_set(f"nodes/{NODE_ID}/status", lwt_payload, qos=1, retain=True) + async def metadata_watcher(): + """ + Polls OP25 HTTP terminal for metadata and publishes events to MQTT. + """ + last_tgid = 0 + last_metadata = {} + potential_end_time = None + DEBOUNCE_SECONDS = 2.5 + OP25_DATA_URL = "http://127.0.0.1:8081/data.json" + + while True: + if not MQTT_CONNECTED: + await asyncio.sleep(1) + continue + + try: + # Run blocking request in executor to avoid blocking the asyncio loop + loop = asyncio.get_running_loop() + response = await loop.run_in_executor(None, lambda: requests.get(OP25_DATA_URL, timeout=0.5)) + + if response.status_code == 200: + data = response.json() + + current_tgid = 0 + current_meta = {} + + # Handle multi_rx list or single dict structure + if isinstance(data, list): + for ch in data: + t = ch.get("tgid", 0) + if t and int(t) > 0: + current_tgid = int(t) + current_meta = { + "tgid": str(t), + "alpha_tag": str(ch.get("tag", "")).strip(), + "frequency": str(ch.get("freq", 0)), + "sysname": str(ch.get("system", "")).strip() + } + break + elif isinstance(data, dict): + t = data.get("tgid", 0) + if t and int(t) > 0: + current_tgid = int(t) + current_meta = { + "tgid": str(t), + "alpha_tag": str(data.get("tag", "")).strip(), + "frequency": str(data.get("freq", 0)), + "sysname": str(data.get("system", "")).strip() + } + + now = datetime.now() + + if current_tgid != 0: + potential_end_time = None # Reset debounce + + if current_tgid != last_tgid: + if last_tgid != 0: + # End previous call immediately if switching channels + payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_end", "metadata": last_metadata} + client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0) + + # Start new call + payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_start", "metadata": current_meta} + client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0) + last_tgid = current_tgid + last_metadata = current_meta + + elif last_tgid != 0: + if potential_end_time is None: + potential_end_time = now + elif (now - potential_end_time).total_seconds() > DEBOUNCE_SECONDS: + payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_end", "metadata": last_metadata} + client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0) + last_tgid = 0 + last_metadata = {} + potential_end_time = None + + except Exception: + pass # OP25 might be restarting or busy + + await asyncio.sleep(0.25) + try: client.connect(MQTT_BROKER, 1883, 60) client.loop_start() # Run network loop in background thread + # Start the metadata watcher task + watcher_task = asyncio.create_task(metadata_watcher()) + # --- Main Heartbeat Loop --- while True: if MQTT_CONNECTED: @@ -194,10 +279,20 @@ async def mqtt_lifecycle_manager(): # Pulse every 30 seconds # Only wait 30 sec if the HB sent. This way we don't stall a check-in await asyncio.sleep(30) + else: + await asyncio.sleep(5) except asyncio.CancelledError: LOGGER.info("Stopping MQTT Loop...") finally: + # Cancel watcher + if 'watcher_task' in locals(): + watcher_task.cancel() + try: + await watcher_task + except asyncio.CancelledError: + pass + # Graceful Shutdown: Explicitly tell C2 we are leaving if MQTT_CONNECTED: shutdown_payload = json.dumps({"status": "offline", "reason": "clean_shutdown"})