diff --git a/app/node_main.py b/app/node_main.py index 892df6d..f891d00 100644 --- a/app/node_main.py +++ b/app/node_main.py @@ -129,6 +129,7 @@ async def mqtt_lifecycle_manager(): if rc == 0: LOGGER.info(f"Connected to MQTT Broker: {MQTT_BROKER}") client.subscribe(f"nodes/{NODE_ID}/commands") + client.subscribe("nodes/discovery/request") MQTT_CONNECTED = True else: LOGGER.error(f"MQTT Connection failed: {rc}") @@ -139,8 +140,33 @@ async def mqtt_lifecycle_manager(): if rc != 0: LOGGER.warning("Unexpected MQTT disconnection.") + def publish_heartbeat(): + """Helper to gather status and publish check-in.""" + try: + status_response = requests.get("http://localhost:8001/op25/status", timeout=2) + op25_status = status_response.json() if status_response.ok else {} + except Exception: + op25_status = {"is_running": False} + + payload = { + "node_id": NODE_ID, + "status": "online", + "timestamp": datetime.now().isoformat(), + "is_listening": op25_status.get("is_running", False), + "active_system": op25_status.get("active_system"), + # Only scan library if needed, otherwise it's heavy I/O + "available_systems": scan_local_library() + } + + client.publish(f"nodes/{NODE_ID}/checkin", json.dumps(payload), retain=True) + LOGGER.debug("Sent Heartbeat/Check-in") + def on_message(client, userdata, msg): - handle_c2_command(msg.topic, msg.payload.decode()) + if msg.topic == "nodes/discovery/request": + LOGGER.info("Received Discovery Request. Sending Heartbeat...") + publish_heartbeat() + else: + handle_c2_command(msg.topic, msg.payload.decode()) # --- Setup --- client.on_connect = on_connect @@ -158,29 +184,10 @@ async def mqtt_lifecycle_manager(): # --- Main Heartbeat Loop --- while True: if MQTT_CONNECTED: - # 1. Gather Data (This was previously in on_connect) - try: - status_response = requests.get("http://localhost:8001/op25/status", timeout=2) - op25_status = status_response.json() if status_response.ok else {} - except Exception: - op25_status = {"is_running": False} - - payload = { - "node_id": NODE_ID, - "status": "online", - "timestamp": datetime.now().isoformat(), - "is_listening": op25_status.get("is_running", False), - "active_system": op25_status.get("active_system"), - # Only scan library if needed, otherwise it's heavy I/O - "available_systems": scan_local_library() - } - - # 2. Publish Heartbeat (Acts as both check-in and keep-alive) - client.publish(f"nodes/{NODE_ID}/checkin", json.dumps(payload), retain=True) - LOGGER.debug("Sent Heartbeat/Check-in") - - # Pulse every 30 seconds - await asyncio.sleep(30) + publish_heartbeat() + # Pulse every 30 seconds + # Only wait 30 sec if the HB sent. This way we don't stall a check-in + await asyncio.sleep(30) except asyncio.CancelledError: LOGGER.info("Stopping MQTT Loop...")