import asyncio import json import os import signal from datetime import datetime from fastapi import FastAPI from routers.op25_controller import create_op25_router from internal.logger import create_logger from internal.op25_config_utls import scan_local_library import paho.mqtt.client as mqtt import requests from internal.discord_radio import DiscordRadioBot # Initialize logging LOGGER = create_logger(__name__) # FastAPI App app = FastAPI(title="Radio Edge Supervisor") app.include_router(create_op25_router(), prefix="/op25") # Configuration NODE_ID = os.getenv("NODE_ID", "standalone-node") MQTT_BROKER = os.getenv("MQTT_BROKER", None) HTTP_SERVER_PROTOCOL = os.getenv("HTTP_SERVER_PROTOCOL", "http") HTTP_SERVER_ADDRESS = os.getenv("HTTP_SERVER_ADDRESS", "127.0.0.1") HTTP_SERVER_PORT = os.getenv("HTTP_SERVER_PORT", 8000) NODE_LAT = os.getenv("NODE_LAT") NODE_LONG = os.getenv("NODE_LONG") # Global flag to track MQTT connection state MQTT_CONNECTED = False # Initialize the Discord Bot discord_bot = DiscordRadioBot(listen_port=23457, forward_ports=[23456]) def handle_c2_command(topic, payload): """ Parses and routes commands received from the C2 server by calling the local supervisor's API. """ try: data = json.loads(payload) command_type = data.get("command") LOGGER.info(f"Received C2 Command: {command_type} on {topic}") # Base URL for the local supervisor API base_url = "http://localhost:8001/op25" if command_type == "start": response = requests.post(f"{base_url}/start") response.raise_for_status() LOGGER.info("Successfully executed 'start' command via API.") elif command_type == "stop": response = requests.post(f"{base_url}/stop") response.raise_for_status() LOGGER.info("Successfully executed 'stop' command via API.") elif command_type == "restart": LOGGER.info("Executing 'restart' command...") stop_response = requests.post(f"{base_url}/stop") stop_response.raise_for_status() time.sleep(2) # Give it a moment for services to die start_response = requests.post(f"{base_url}/start") start_response.raise_for_status() LOGGER.info("Successfully executed 'restart' command via API.") elif command_type in ["update", "set_active_config"]: config_payload = data.get("config") if not config_payload: LOGGER.error(f"Command '{command_type}' missing 'config' payload.") return elif command_type == "update": LOGGER.info("Updating local configuration...") # Placeholder: update_local_config(data.get("config")) restart = data.get("restart", True) response = requests.post(f"{base_url}/set_active_config?restart={restart}", json=config_payload) response.raise_for_status() LOGGER.info(f"Successfully executed '{command_type}' command via API.") elif command_type == "load_from_library": system_name = data.get("system_name") if not system_name: LOGGER.error("Command 'load_from_library' missing 'system_name' payload.") return response = requests.post(f"{base_url}/load_from_library?system_name={system_name}") response.raise_for_status() LOGGER.info(f"Successfully executed 'load_from_library' for {system_name} via API.") elif command_type == "tune": freq_mhz = data.get("system") if not freq_mhz: LOGGER.error("Command 'tune' missing 'frequency' payload.") return try: # OP25 terminal expects frequency in Hz freq_hz = int(float(freq_mhz) * 1_000_000) # The port is hardcoded as it's the default for the OP25 terminal op25_terminal_url = f"http://localhost:8081/tuning?chan=0&freq={freq_hz}" response = requests.get(op25_terminal_url, timeout=5) response.raise_for_status() LOGGER.info(f"Successfully sent tune command to OP25 terminal for {freq_mhz} MHz.") except ValueError: LOGGER.error(f"Invalid frequency format for tune command: {freq_mhz}") except requests.exceptions.RequestException as e: LOGGER.error(f"Failed to connect to OP25 terminal for tuning: {e}") elif command_type == "discord_join": token = data.get("token") channel_id = data.get("channel_id") if token and channel_id: asyncio.create_task(discord_bot.start_session(token, channel_id)) LOGGER.info("Initiating Discord Session...") elif command_type == "discord_leave": asyncio.create_task(discord_bot.stop_session()) LOGGER.info("Ending Discord Session...") else: LOGGER.warning(f"Unknown command type received: {command_type}") except json.JSONDecodeError: LOGGER.error(f"Failed to decode command payload: {payload}") except requests.exceptions.RequestException as e: LOGGER.error(f"Failed to call local API for command '{data.get('command')}': {e}") except Exception as e: LOGGER.error(f"Error processing C2 command: {e}") def get_current_stream_url(): """ Dynamically resolves the audio stream URL from the active OP25 configuration. Falls back to env var or default if config is missing/invalid. """ default_url = os.getenv("STREAM_URL", "http://127.0.0.1:8000/stream_0") config_path = "/configs/active.cfg.json" if not os.path.exists(config_path): return default_url try: with open(config_path, "r") as f: config = json.load(f) streams = config.get("metadata", {}).get("streams", []) if not streams: return default_url stream = streams[0] address = stream.get("icecastServerAddress", "127.0.0.1:8000") mount = stream.get("icecastMountpoint", "stream_0") if not mount.startswith("/"): mount = f"/{mount}" return f"http://{address}{mount}" except Exception as e: LOGGER.warning(f"Failed to resolve stream URL from config: {e}") return default_url async def mqtt_lifecycle_manager(): """ Manages the application-level logic: Check-in, Heartbeats, and Shutdown. Decoupled from the Paho MQTT network loop. """ global MQTT_CONNECTED if not MQTT_BROKER: LOGGER.info("No MQTT_BROKER defined. Running in standalone mode.") return client = mqtt.Client(client_id=NODE_ID) # --- Callbacks --- def on_connect(client, userdata, flags, rc): global MQTT_CONNECTED if rc == 0: LOGGER.info(f"Connected to MQTT Broker: {MQTT_BROKER}") client.subscribe(f"nodes/{NODE_ID}/commands") client.subscribe("nodes/discovery/request") MQTT_CONNECTED = True else: LOGGER.error(f"MQTT Connection failed: {rc}") def on_disconnect(client, userdata, rc): global MQTT_CONNECTED MQTT_CONNECTED = False if rc != 0: LOGGER.warning("Unexpected MQTT disconnection.") def publish_heartbeat(): """Helper to gather status and publish check-in.""" try: status_response = requests.get("http://localhost:8001/op25/status", timeout=2) op25_status = status_response.json() if status_response.ok else {} except Exception: op25_status = {"is_running": False} payload = { "node_id": NODE_ID, "status": "online", "timestamp": datetime.utcnow().isoformat(), "is_listening": op25_status.get("is_running", False), "active_system": op25_status.get("active_system"), # Only scan library if needed, otherwise it's heavy I/O "available_systems": scan_local_library(), "location": { "lat": str(NODE_LAT) if NODE_LAT else None, "long": str(NODE_LONG) if NODE_LONG else None } } client.publish(f"nodes/{NODE_ID}/checkin", json.dumps(payload), retain=True) LOGGER.debug("Sent Heartbeat/Check-in") def on_message(client, userdata, msg): if msg.topic == "nodes/discovery/request": LOGGER.info("Received Discovery Request. Sending Heartbeat...") publish_heartbeat() else: handle_c2_command(msg.topic, msg.payload.decode()) # --- Setup --- client.on_connect = on_connect client.on_message = on_message client.on_disconnect = on_disconnect # LWT: Fires ONLY on ungraceful crash/timeout lwt_payload = json.dumps({"status": "offline", "reason": "unexpected_disconnect"}) client.will_set(f"nodes/{NODE_ID}/status", lwt_payload, qos=1, retain=True) async def metadata_watcher(): """ Polls OP25 HTTP terminal for metadata and publishes events to MQTT. Corrected to use the POST-based command API found in the HAR capture. """ last_tgid = 0 last_metadata = {} potential_end_time = None DEBOUNCE_SECONDS = 2.5 OP25_DATA_URL = "http://127.0.0.1:8081/" # This is the specific payload the OP25 web interface uses [cite: 45562, 45563] COMMAND_PAYLOAD = [{"command": "update", "arg1": 0, "arg2": 0}] # Audio Recording State recorder_proc = None current_call_id = None async def stop_recording(): nonlocal recorder_proc if recorder_proc: if recorder_proc.returncode is None: recorder_proc.terminate() try: await asyncio.wait_for(recorder_proc.wait(), timeout=2.0) except asyncio.TimeoutError: recorder_proc.kill() recorder_proc = None def upload_audio(call_id): if not MQTT_BROKER: return None local_path = f"/calls/{call_id}.mp3" if not os.path.exists(local_path): return None try: with open(local_path, "rb") as f: files = {"file": (f"{call_id}.mp3", f, "audio/mpeg")} response = requests.post(f"{HTTP_SERVER_PROTOCOL}://{HTTP_SERVER_ADDRESS}:{HTTP_SERVER_PORT}/upload", files=files, data={"node_id": NODE_ID, "call_id": call_id}, timeout=30) response.raise_for_status() return response.json().get("url") except Exception as e: LOGGER.error(f"Upload failed: {e}") return None finally: if os.path.exists(local_path): os.remove(local_path) while True: if not MQTT_CONNECTED: await asyncio.sleep(1) continue try: # Run blocking POST request in executor loop = asyncio.get_running_loop() response = await loop.run_in_executor( None, lambda: requests.post(OP25_DATA_URL, json=COMMAND_PAYLOAD, timeout=0.5) ) if response.status_code == 200: data = response.json() # LOGGER.debug(f"Response from OP25 API: {data}") current_tgid = 0 current_meta = {} # The response is an array of update objects for item in data: if item.get("json_type") == "channel_update": # The terminal provides channel info keyed by channel index (e.g., "0") # We look for the first channel that has an active TGID for key in item: if key.isdigit(): ch = item[key] t = ch.get("tgid") # OP25 returns null or 0 when no talkgroup is active if t and int(t) > 0: current_tgid = int(t) current_meta = { "tgid": str(t), "rid": str(ch.get("srcaddr", "")).strip(), "alpha_tag": str(ch.get("tag", "")).strip(), "frequency": str(ch.get("freq", 0)), "sysname": str(ch.get("system", "")).strip() } break if current_tgid: break now = datetime.utcnow() # Logic for handling call start/end events if current_tgid != 0: potential_end_time = None if current_tgid != last_tgid: if last_tgid != 0: # --- END PREVIOUS CALL --- await stop_recording() # Stop Discord Transmission if discord_bot.is_ready(): discord_bot.stop_transmission() audio_url = None if current_call_id: audio_url = await loop.run_in_executor(None, upload_audio, current_call_id) LOGGER.debug(f"Switching TGID: {last_tgid} -> {current_tgid}") payload = { "node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_end", "metadata": last_metadata, "audio_url": audio_url, "call_id": current_call_id } client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0) # --- START NEW CALL --- LOGGER.debug(f"Call Start: TGID {current_tgid} ({current_meta.get('alpha_tag')})") # Trigger Discord Transmission if discord_bot.is_ready(): discord_bot.start_transmission() discord_bot.update_system_presence(current_meta.get('sysname', 'Scanning')) # Generate ID start_ts = int(now.timestamp()) sysname = current_meta.get('sysname', 'unknown') tgid = current_meta.get('tgid', '0') current_call_id = f"{NODE_ID}_{sysname}_{tgid}_{start_ts}" # Start Recording (FFmpeg) try: stream_url = get_current_stream_url() recorder_proc = await asyncio.create_subprocess_exec( "ffmpeg", "-i", stream_url, "-y", "-t", "300", f"/calls/{current_call_id}.mp3", stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL ) except Exception as e: LOGGER.error(f"Failed to start recorder: {e}") payload = { "node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_start", "metadata": current_meta, "call_id": current_call_id } client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0) last_tgid = current_tgid last_metadata = current_meta elif last_tgid != 0: if potential_end_time is None: LOGGER.debug(f"Signal lost for TGID {last_tgid}. Starting debounce.") potential_end_time = now elif (now - potential_end_time).total_seconds() > DEBOUNCE_SECONDS: # --- END CALL (Debounce Expired) --- await stop_recording() # Stop Discord Transmission if discord_bot.is_ready(): discord_bot.stop_transmission() audio_url = None if current_call_id: audio_url = await loop.run_in_executor(None, upload_audio, current_call_id) LOGGER.debug(f"Call End (Debounce expired): TGID {last_tgid}") payload = { "node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_end", "metadata": last_metadata, "audio_url": audio_url, "call_id": current_call_id } client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0) last_tgid = 0 last_metadata = {} potential_end_time = None current_call_id = None else: LOGGER.debug(f"OP25 API returned status: {response.status_code}") except Exception as e: LOGGER.warning(f"Metadata watcher error: {e}") await asyncio.sleep(0.25) try: client.connect(MQTT_BROKER, 1883, 60) client.loop_start() # Run network loop in background thread # Start the metadata watcher task watcher_task = asyncio.create_task(metadata_watcher()) # --- Main Heartbeat Loop --- while True: if MQTT_CONNECTED: publish_heartbeat() # Pulse every 30 seconds # Only wait 30 sec if the HB sent. This way we don't stall a check-in await asyncio.sleep(30) else: await asyncio.sleep(5) except asyncio.CancelledError: LOGGER.info("Stopping MQTT Loop...") finally: # Cancel watcher if 'watcher_task' in locals(): watcher_task.cancel() try: await watcher_task except asyncio.CancelledError: pass # Graceful Shutdown: Explicitly tell C2 we are leaving if MQTT_CONNECTED: shutdown_payload = json.dumps({"status": "offline", "reason": "clean_shutdown"}) client.publish(f"nodes/{NODE_ID}/status", shutdown_payload, qos=1, retain=True) client.loop_stop() client.disconnect() @app.on_event("startup") async def startup_event(): # Store the task so we can cancel it if needed (optional) app.state.mqtt_task = asyncio.create_task(mqtt_lifecycle_manager()) @app.on_event("shutdown") async def shutdown_event(): # Cancel the loop to trigger the finally block if hasattr(app.state, "mqtt_task"): app.state.mqtt_task.cancel() try: await app.state.mqtt_task except asyncio.CancelledError: pass