From b6a503a3e99ff8406809888d594af561cd83942c Mon Sep 17 00:00:00 2001 From: Logan Cusano Date: Sun, 28 Dec 2025 14:07:17 -0500 Subject: [PATCH] rename main to node_main for IDE clarity --- Dockerfile | 2 +- app/{main.py => node_main.py} | 121 ++++++++++++++++++++-------------- 2 files changed, 74 insertions(+), 49 deletions(-) rename app/{main.py => node_main.py} (64%) diff --git a/Dockerfile b/Dockerfile index eee0842..c933c07 100644 --- a/Dockerfile +++ b/Dockerfile @@ -49,4 +49,4 @@ RUN sed -i 's/\r$//' /usr/local/bin/docker-entrypoint.sh && \ ENTRYPOINT ["/usr/local/bin/docker-entrypoint.sh"] # 3. Use CMD to pass the uvicorn command as arguments to the ENTRYPOINT script -CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001", "--reload"] \ No newline at end of file +CMD ["uvicorn", "node_main:app", "--host", "0.0.0.0", "--port", "8001", "--reload"] \ No newline at end of file diff --git a/app/main.py b/app/node_main.py similarity index 64% rename from app/main.py rename to app/node_main.py index da42374..892df6d 100644 --- a/app/main.py +++ b/app/node_main.py @@ -1,6 +1,7 @@ import asyncio import json import os +import signal from datetime import datetime from fastapi import FastAPI from routers.op25_controller import create_op25_router @@ -12,14 +13,17 @@ import requests # Initialize logging LOGGER = create_logger(__name__) +# FastAPI App app = FastAPI(title="Radio Edge Supervisor") - -# Add the router app.include_router(create_op25_router(), prefix="/op25") +# Configuration NODE_ID = os.getenv("NODE_ID", "standalone-node") MQTT_BROKER = os.getenv("MQTT_BROKER", None) +# Global flag to track MQTT connection state +MQTT_CONNECTED = False + def handle_c2_command(topic, payload): """ Parses and routes commands received from the C2 server by calling the @@ -106,79 +110,100 @@ def handle_c2_command(topic, payload): except Exception as e: LOGGER.error(f"Error processing C2 command: {e}") -async def mqtt_phone_home(): +async def mqtt_lifecycle_manager(): """ - Maintains a persistent C2 connection using a single MQTT client. - Handles check-ins and command subscriptions via callbacks. + Manages the application-level logic: Check-in, Heartbeats, and Shutdown. + Decoupled from the Paho MQTT network loop. """ + global MQTT_CONNECTED + if not MQTT_BROKER: LOGGER.info("No MQTT_BROKER defined. Running in standalone mode.") return - # Create a single client instance client = mqtt.Client(client_id=NODE_ID) + # --- Callbacks --- def on_connect(client, userdata, flags, rc): + global MQTT_CONNECTED if rc == 0: - LOGGER.info(f"Successfully connected to MQTT Broker: {MQTT_BROKER}") - - # 1. Subscribe to command topics for this specific node - command_topic = f"nodes/{NODE_ID}/commands" - client.subscribe(command_topic) - LOGGER.info(f"Subscribed to {command_topic}") - - # 2. Perform Initial Check-In with OP25 status - try: - status_response = requests.get("http://localhost:8001/op25/status") - op25_status = status_response.json() if status_response.ok else {} - except requests.RequestException: - op25_status = {"is_running": False, "active_system": None} - - checkin_data = { - "node_id": NODE_ID, - "status": "online", - "timestamp": datetime.now().isoformat(), - "version": "1.0.0", - "is_listening": op25_status.get("is_running", False), - "active_system": op25_status.get("active_system"), - "available_systems": scan_local_library() - } - client.publish(f"nodes/{NODE_ID}/checkin", json.dumps(checkin_data), retain=True) + LOGGER.info(f"Connected to MQTT Broker: {MQTT_BROKER}") + client.subscribe(f"nodes/{NODE_ID}/commands") + MQTT_CONNECTED = True else: - LOGGER.error(f"MQTT Connection failed with return code {rc}") - - def on_message(client, userdata, msg): - # Handle messages arriving on subscribed topics - handle_c2_command(msg.topic, msg.payload.decode()) + LOGGER.error(f"MQTT Connection failed: {rc}") def on_disconnect(client, userdata, rc): + global MQTT_CONNECTED + MQTT_CONNECTED = False if rc != 0: - LOGGER.warning("Unexpected MQTT disconnection. Paho will attempt to reconnect...") + LOGGER.warning("Unexpected MQTT disconnection.") - # Set up callbacks and LWT + def on_message(client, userdata, msg): + handle_c2_command(msg.topic, msg.payload.decode()) + + # --- Setup --- client.on_connect = on_connect client.on_message = on_message client.on_disconnect = on_disconnect - - lwt_payload = json.dumps({"node_id": NODE_ID, "status": "offline"}) + + # LWT: Fires ONLY on ungraceful crash/timeout + lwt_payload = json.dumps({"status": "offline", "reason": "unexpected_disconnect"}) client.will_set(f"nodes/{NODE_ID}/status", lwt_payload, qos=1, retain=True) try: - # Connect and start the background loop thread - # loop_start() handles reconnections automatically without spaming new clients client.connect(MQTT_BROKER, 1883, 60) - client.loop_start() + client.loop_start() # Run network loop in background thread - # Keep the async task alive indefinitely + # --- Main Heartbeat Loop --- while True: - await asyncio.sleep(3600) + if MQTT_CONNECTED: + # 1. Gather Data (This was previously in on_connect) + try: + status_response = requests.get("http://localhost:8001/op25/status", timeout=2) + op25_status = status_response.json() if status_response.ok else {} + except Exception: + op25_status = {"is_running": False} + + payload = { + "node_id": NODE_ID, + "status": "online", + "timestamp": datetime.now().isoformat(), + "is_listening": op25_status.get("is_running", False), + "active_system": op25_status.get("active_system"), + # Only scan library if needed, otherwise it's heavy I/O + "available_systems": scan_local_library() + } + + # 2. Publish Heartbeat (Acts as both check-in and keep-alive) + client.publish(f"nodes/{NODE_ID}/checkin", json.dumps(payload), retain=True) + LOGGER.debug("Sent Heartbeat/Check-in") - except Exception as e: - LOGGER.error(f"Fatal error in MQTT supervisor: {e}") + # Pulse every 30 seconds + await asyncio.sleep(30) + + except asyncio.CancelledError: + LOGGER.info("Stopping MQTT Loop...") finally: + # Graceful Shutdown: Explicitly tell C2 we are leaving + if MQTT_CONNECTED: + shutdown_payload = json.dumps({"status": "offline", "reason": "clean_shutdown"}) + client.publish(f"nodes/{NODE_ID}/status", shutdown_payload, qos=1, retain=True) + client.loop_stop() + client.disconnect() @app.on_event("startup") async def startup_event(): - # Start the C2 connection in the background. - asyncio.create_task(mqtt_phone_home()) \ No newline at end of file + # Store the task so we can cancel it if needed (optional) + app.state.mqtt_task = asyncio.create_task(mqtt_lifecycle_manager()) + +@app.on_event("shutdown") +async def shutdown_event(): + # Cancel the loop to trigger the finally block + if hasattr(app.state, "mqtt_task"): + app.state.mqtt_task.cancel() + try: + await app.state.mqtt_task + except asyncio.CancelledError: + pass