import asyncio import json import os import signal from datetime import datetime from fastapi import FastAPI from routers.op25_controller import create_op25_router from internal.logger import create_logger from internal.op25_config_utls import scan_local_library import paho.mqtt.client as mqtt import requests # Initialize logging LOGGER = create_logger(__name__) # FastAPI App app = FastAPI(title="Radio Edge Supervisor") app.include_router(create_op25_router(), prefix="/op25") # Configuration NODE_ID = os.getenv("NODE_ID", "standalone-node") MQTT_BROKER = os.getenv("MQTT_BROKER", None) # Global flag to track MQTT connection state MQTT_CONNECTED = False def handle_c2_command(topic, payload): """ Parses and routes commands received from the C2 server by calling the local supervisor's API. """ try: data = json.loads(payload) command_type = data.get("command") LOGGER.info(f"Received C2 Command: {command_type} on {topic}") # Base URL for the local supervisor API base_url = "http://localhost:8001/op25" if command_type == "start": response = requests.post(f"{base_url}/start") response.raise_for_status() LOGGER.info("Successfully executed 'start' command via API.") elif command_type == "stop": response = requests.post(f"{base_url}/stop") response.raise_for_status() LOGGER.info("Successfully executed 'stop' command via API.") elif command_type == "restart": LOGGER.info("Executing 'restart' command...") stop_response = requests.post(f"{base_url}/stop") stop_response.raise_for_status() time.sleep(2) # Give it a moment for services to die start_response = requests.post(f"{base_url}/start") start_response.raise_for_status() LOGGER.info("Successfully executed 'restart' command via API.") elif command_type in ["update", "set_active_config"]: config_payload = data.get("config") if not config_payload: LOGGER.error(f"Command '{command_type}' missing 'config' payload.") return elif command_type == "update": LOGGER.info("Updating local configuration...") # Placeholder: update_local_config(data.get("config")) restart = data.get("restart", True) response = requests.post(f"{base_url}/set_active_config?restart={restart}", json=config_payload) response.raise_for_status() LOGGER.info(f"Successfully executed '{command_type}' command via API.") elif command_type == "load_from_library": system_name = data.get("system_name") if not system_name: LOGGER.error("Command 'load_from_library' missing 'system_name' payload.") return response = requests.post(f"{base_url}/load_from_library?system_name={system_name}") response.raise_for_status() LOGGER.info(f"Successfully executed 'load_from_library' for {system_name} via API.") elif command_type == "tune": freq_mhz = data.get("system") if not freq_mhz: LOGGER.error("Command 'tune' missing 'frequency' payload.") return try: # OP25 terminal expects frequency in Hz freq_hz = int(float(freq_mhz) * 1_000_000) # The port is hardcoded as it's the default for the OP25 terminal op25_terminal_url = f"http://localhost:8081/tuning?chan=0&freq={freq_hz}" response = requests.get(op25_terminal_url, timeout=5) response.raise_for_status() LOGGER.info(f"Successfully sent tune command to OP25 terminal for {freq_mhz} MHz.") except ValueError: LOGGER.error(f"Invalid frequency format for tune command: {freq_mhz}") except requests.exceptions.RequestException as e: LOGGER.error(f"Failed to connect to OP25 terminal for tuning: {e}") else: LOGGER.warning(f"Unknown command type received: {command_type}") except json.JSONDecodeError: LOGGER.error(f"Failed to decode command payload: {payload}") except requests.exceptions.RequestException as e: LOGGER.error(f"Failed to call local API for command '{data.get('command')}': {e}") except Exception as e: LOGGER.error(f"Error processing C2 command: {e}") async def mqtt_lifecycle_manager(): """ Manages the application-level logic: Check-in, Heartbeats, and Shutdown. Decoupled from the Paho MQTT network loop. """ global MQTT_CONNECTED if not MQTT_BROKER: LOGGER.info("No MQTT_BROKER defined. Running in standalone mode.") return client = mqtt.Client(client_id=NODE_ID) # --- Callbacks --- def on_connect(client, userdata, flags, rc): global MQTT_CONNECTED if rc == 0: LOGGER.info(f"Connected to MQTT Broker: {MQTT_BROKER}") client.subscribe(f"nodes/{NODE_ID}/commands") MQTT_CONNECTED = True else: LOGGER.error(f"MQTT Connection failed: {rc}") def on_disconnect(client, userdata, rc): global MQTT_CONNECTED MQTT_CONNECTED = False if rc != 0: LOGGER.warning("Unexpected MQTT disconnection.") def on_message(client, userdata, msg): handle_c2_command(msg.topic, msg.payload.decode()) # --- Setup --- client.on_connect = on_connect client.on_message = on_message client.on_disconnect = on_disconnect # LWT: Fires ONLY on ungraceful crash/timeout lwt_payload = json.dumps({"status": "offline", "reason": "unexpected_disconnect"}) client.will_set(f"nodes/{NODE_ID}/status", lwt_payload, qos=1, retain=True) try: client.connect(MQTT_BROKER, 1883, 60) client.loop_start() # Run network loop in background thread # --- Main Heartbeat Loop --- while True: if MQTT_CONNECTED: # 1. Gather Data (This was previously in on_connect) try: status_response = requests.get("http://localhost:8001/op25/status", timeout=2) op25_status = status_response.json() if status_response.ok else {} except Exception: op25_status = {"is_running": False} payload = { "node_id": NODE_ID, "status": "online", "timestamp": datetime.now().isoformat(), "is_listening": op25_status.get("is_running", False), "active_system": op25_status.get("active_system"), # Only scan library if needed, otherwise it's heavy I/O "available_systems": scan_local_library() } # 2. Publish Heartbeat (Acts as both check-in and keep-alive) client.publish(f"nodes/{NODE_ID}/checkin", json.dumps(payload), retain=True) LOGGER.debug("Sent Heartbeat/Check-in") # Pulse every 30 seconds await asyncio.sleep(30) except asyncio.CancelledError: LOGGER.info("Stopping MQTT Loop...") finally: # Graceful Shutdown: Explicitly tell C2 we are leaving if MQTT_CONNECTED: shutdown_payload = json.dumps({"status": "offline", "reason": "clean_shutdown"}) client.publish(f"nodes/{NODE_ID}/status", shutdown_payload, qos=1, retain=True) client.loop_stop() client.disconnect() @app.on_event("startup") async def startup_event(): # Store the task so we can cancel it if needed (optional) app.state.mqtt_task = asyncio.create_task(mqtt_lifecycle_manager()) @app.on_event("shutdown") async def shutdown_event(): # Cancel the loop to trigger the finally block if hasattr(app.state, "mqtt_task"): app.state.mqtt_task.cancel() try: await app.state.mqtt_task except asyncio.CancelledError: pass