commit 26e90f4584cf6eee98297ffe95921ff6c9d6f2f8 Author: Logan Cusano Date: Sun Dec 28 02:37:50 2025 -0500 init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..558e6b1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.env +*.log +*.db +*.conf +config/* \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..eee0842 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,52 @@ +## OP25 Core Container +FROM python:slim-trixie + +# Set environment variables +ENV DEBIAN_FRONTEND=noninteractive + +# Install system dependencies +RUN apt-get update && \ + apt-get upgrade -y && \ + apt-get install git pulseaudio pulseaudio-utils liquidsoap -y + +# Clone the boatbod op25 repository +RUN git clone -b gr310 https://github.com/boatbod/op25 /op25 + +# Set the working directory +WORKDIR /op25 + +# Run the install script to set up op25 +RUN sed -i 's/sudo //g' install.sh +RUN ./install.sh -f + +# Install Python dependencies +COPY requirements.txt /tmp/requirements.txt +RUN pip3 install --no-cache-dir -r /tmp/requirements.txt + +# Create the run_multi-rx_service.sh script +COPY run_multi-rx_service.sh /op25/op25/gr-op25_repeater/apps/run_multi-rx_service.sh +RUN sed -i 's/\r$//' /op25/op25/gr-op25_repeater/apps/run_multi-rx_service.sh && \ + chmod +x /op25/op25/gr-op25_repeater/apps/run_multi-rx_service.sh + +# Expose ports for HTTP control as needed, for example: +EXPOSE 8001 8081 + +# Create and set up the configuration directory +VOLUME ["/configs"] + +# Set the working directory in the container +WORKDIR /app + +# Copy the rest of the directory contents into the container at /app +COPY ./app /app + +# 1. Copy the wrapper script and make it executable +COPY docker-entrypoint.sh /usr/local/bin/ +RUN sed -i 's/\r$//' /usr/local/bin/docker-entrypoint.sh && \ + chmod +x /usr/local/bin/docker-entrypoint.sh + +# 2. Update ENTRYPOINT to use the wrapper script +ENTRYPOINT ["/usr/local/bin/docker-entrypoint.sh"] + +# 3. Use CMD to pass the uvicorn command as arguments to the ENTRYPOINT script +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001", "--reload"] \ No newline at end of file diff --git a/app/internal/liquidsoap_config_utils.py b/app/internal/liquidsoap_config_utils.py new file mode 100644 index 0000000..c7ae27e --- /dev/null +++ b/app/internal/liquidsoap_config_utils.py @@ -0,0 +1,34 @@ +import os +from internal.op25_liq_template import liquidsoap_config_template +from models.models import IcecastConfig + +def generate_liquid_script(config: IcecastConfig): + """ + Generates the "*.liq" file that's run by OP25 on startup. + + Placeholders in the template must be formatted as ${VARIABLE_NAME}. + + Args: + config (dict): A dictionary of key-value pairs for substitution. + Keys should match the variable names in the template (e.g., 'icecast_host'). + """ + try: + content = liquidsoap_config_template + # Replace variables + for key, value in config.model_dump().items(): + placeholder = f"${{{key}}}" + # Ensure the value is converted to string for replacement + content = content.replace(placeholder, str(value)) + print(f" - Replaced placeholder {placeholder}") + + # Write the processed content to the output path + output_path = "/configs/op25.liq" + with open(output_path, 'a+') as f: + f.write(content) + + print(f"\nSuccessfully wrote processed configuration to: {output_path}") + + except FileNotFoundError: + print(f"Error: Template file not found at {template_path}") + except Exception as e: + print(f"An unexpected error occurred: {e}") \ No newline at end of file diff --git a/app/internal/logger.py b/app/internal/logger.py new file mode 100644 index 0000000..dd44862 --- /dev/null +++ b/app/internal/logger.py @@ -0,0 +1,55 @@ +import logging +from logging.handlers import RotatingFileHandler + +def create_logger(name, level=logging.DEBUG, max_bytes=10485760, backup_count=2): + """ + Creates a logger with a console and rotating file handlers for both debug and info log levels. + + Args: + name (str): The name for the logger. + level (int): The logging level for the logger. Defaults to logging.DEBUG. + max_bytes (int): Maximum size of the log file in bytes before it gets rotated. Defaults to 10 MB. + backup_count (int): Number of backup files to keep. Defaults to 2. + + Returns: + logging.Logger: Configured logger. + """ + # Set the log file paths + debug_log_file = "./client.debug.log" + info_log_file = "./client.log" + + # Create a logger + logger = logging.getLogger(name) + logger.setLevel(level) + + # Check if the logger already has handlers to avoid duplicate logs + if not logger.hasHandlers(): + # Create console handler + console_handler = logging.StreamHandler() + console_handler.setLevel(level) + + # Create rotating file handler for debug level + debug_file_handler = RotatingFileHandler(debug_log_file, maxBytes=max_bytes, backupCount=backup_count) + debug_file_handler.setLevel(logging.DEBUG) + + # Create rotating file handler for info level + info_file_handler = RotatingFileHandler(info_log_file, maxBytes=max_bytes, backupCount=backup_count) + info_file_handler.setLevel(logging.INFO) + + # Create formatter and add it to the handlers + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + console_handler.setFormatter(formatter) + debug_file_handler.setFormatter(formatter) + info_file_handler.setFormatter(formatter) + + # Add the handlers to the logger + logger.addHandler(console_handler) + logger.addHandler(debug_file_handler) + logger.addHandler(info_file_handler) + + return logger + +# Example usage: +# logger = create_logger('my_logger') +# logger.debug('This is a debug message') +# logger.info('This is an info message') diff --git a/app/internal/op25_config_utls.py b/app/internal/op25_config_utls.py new file mode 100644 index 0000000..30d0609 --- /dev/null +++ b/app/internal/op25_config_utls.py @@ -0,0 +1,122 @@ +import csv +import json +import os +import shutil +from models.models import TalkgroupTag +from typing import List, Dict +from internal.logger import create_logger + +LOGGER = create_logger(__name__) + +CONFIG_DIR = "/configs" + +def scan_local_library() -> List[Dict]: + """ + Scans the /configs directory for JSON files to build the 'nearby_systems' list. + """ + library = [] + if not os.path.exists(CONFIG_DIR): + return library + + for filename in os.listdir(CONFIG_DIR): + # We don't want to include the active config or the sidecar files in the library scan + if filename.endswith(".json") and filename != "active.cfg.json": + try: + path = os.path.join(CONFIG_DIR, filename) + with open(path, 'r') as f: + data = json.load(f) + # Use trunking sysname or filename as the identifier + sys_name = data.get("trunking", {}).get("sysname", filename.replace(".json", "")) + library.append({ + "name": sys_name, + "system_name": filename, + "mode": "P25" if "trunking" in data else "NBFM" + }) + except Exception as e: + LOGGER.error(f"Failed to parse library file {filename}: {e}") + + return library + +def activate_config_from_library(system_name: str) -> bool: + """ + Copies a config from the library to the active slot. + """ + if not system_name.endswith(".json"): + system_name += ".json" + + src = os.path.join(CONFIG_DIR, system_name) + dst = os.path.join(CONFIG_DIR, "active.cfg.json") + + if not os.path.exists(src): + LOGGER.error(f"Source config {system_name} not found in library.") + return False + + try: + shutil.copy2(src, dst) + LOGGER.info(f"Activated config: {system_name}") + return True + except Exception as e: + LOGGER.error(f"Failed to copy config: {e}") + return False + +def save_config_to_library(system_name: str, config: Dict) -> bool: + """ + Saves a configuration dictionary to the local library. + """ + if not system_name.endswith(".json"): + system_name += ".json" + + path = os.path.join(CONFIG_DIR, system_name) + + try: + with open(path, 'w') as f: + json.dump(config, f, indent=2) + LOGGER.info(f"Saved config to library: {system_name}") + return True + except Exception as e: + LOGGER.error(f"Failed to save config to library: {e}") + return False + +def get_current_active_config() -> Dict: + """Reads the current active.cfg.json if it exists.""" + path = os.path.join(CONFIG_DIR, "active.cfg.json") + if os.path.exists(path): + try: + with open(path, 'r') as f: + return json.load(f) + except: + return {} + return {} + +def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag]) -> None: + with open(os.path.join(CONFIG_DIR, "active.cfg.tags.tsv"), 'w', newline='', encoding='utf-8') as file: + writer = csv.writer(file, delimiter='\t', lineterminator='\n') + for tag in talkgroup_tags: + writer.writerow([tag.tagDec, tag.talkgroup]) + +def save_whitelist(talkgroup_tags: List[int]) -> None: + with open(os.path.join(CONFIG_DIR, "active.cfg.whitelist.tsv"), 'w', newline='', encoding='utf-8') as file: + writer = csv.writer(file, delimiter='\t', lineterminator='\n') + for tag in talkgroup_tags: + writer.writerow([tag]) + +def del_none_in_dict(d): + for key, value in list(d.items()): + if value is None: + del d[key] + elif isinstance(value, dict): + del_none_in_dict(value) + elif isinstance(value, list): + for iterative_value in value: + if isinstance(iterative_value, dict): + del_none_in_dict(iterative_value) + return d + +def get_current_system_from_config() -> str: + data = get_current_active_config() + if not data: + return None + try: + return data.get("trunking", {}).get("sysname", "Unknown System") + except: + return "Unknown System" \ No newline at end of file diff --git a/app/internal/op25_liq_template.py b/app/internal/op25_liq_template.py new file mode 100644 index 0000000..da27d63 --- /dev/null +++ b/app/internal/op25_liq_template.py @@ -0,0 +1,48 @@ +liquidsoap_config_template = """#!/usr/bin/liquidsoap + +# Example liquidsoap streaming from op25 to icecast +# (c) 2019-2021 gnorbury@bondcar.com, wllmbecks@gmail.com +# + +set("log.stdout", true) +set("log.file", false) +set("log.level", 1) + +# Make the native sample rate compatible with op25 +set("frame.audio.samplerate", 8000) +set("init.allow_root", true) + +# ========================================================== +ICE_HOST = "${icecast_host}" +ICE_PORT = ${icecast_port} +ICE_MOUNT = "${icecast_mountpoint}" +ICE_PASSWORD = "${icecast_password}" +ICE_DESCRIPTION = "${icecast_description}" +ICE_GENRE = "${icecast_genre}" +# ========================================================== + +input = mksafe(input.external(buffer=0.25, channels=2, samplerate=8000, restart_on_error=false, "./audio.py -x 2.5 -s")) +# Consider increasing the buffer value on slow systems such as RPi3. e.g. buffer=0.25 + +# Compression +input = compress(input, attack = 2.0, gain = 0.0, knee = 13.0, ratio = 2.0, release = 12.3, threshold = -18.0) + +# Normalization +input = normalize(input, gain_max = 6.0, gain_min = -6.0, target = -16.0, threshold = -65.0) + +# ========================================================== +# OUTPUT: Referencing the new variables +# ========================================================== +output.icecast( + %mp3(bitrate=16, samplerate=22050, stereo=false), + description=ICE_DESCRIPTION, + genre=ICE_GENRE, + url="", + fallible=false, + host=ICE_HOST, + port=ICE_PORT, + mount=ICE_MOUNT, + password=ICE_PASSWORD, + mean(input) +) +""" \ No newline at end of file diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..da42374 --- /dev/null +++ b/app/main.py @@ -0,0 +1,184 @@ +import asyncio +import json +import os +from datetime import datetime +from fastapi import FastAPI +from routers.op25_controller import create_op25_router +from internal.logger import create_logger +from internal.op25_config_utls import scan_local_library +import paho.mqtt.client as mqtt +import requests + +# Initialize logging +LOGGER = create_logger(__name__) + +app = FastAPI(title="Radio Edge Supervisor") + +# Add the router +app.include_router(create_op25_router(), prefix="/op25") + +NODE_ID = os.getenv("NODE_ID", "standalone-node") +MQTT_BROKER = os.getenv("MQTT_BROKER", None) + +def handle_c2_command(topic, payload): + """ + Parses and routes commands received from the C2 server by calling the + local supervisor's API. + """ + try: + data = json.loads(payload) + command_type = data.get("command") + + LOGGER.info(f"Received C2 Command: {command_type} on {topic}") + + # Base URL for the local supervisor API + base_url = "http://localhost:8001/op25" + + if command_type == "start": + response = requests.post(f"{base_url}/start") + response.raise_for_status() + LOGGER.info("Successfully executed 'start' command via API.") + + elif command_type == "stop": + response = requests.post(f"{base_url}/stop") + response.raise_for_status() + LOGGER.info("Successfully executed 'stop' command via API.") + + elif command_type == "restart": + LOGGER.info("Executing 'restart' command...") + stop_response = requests.post(f"{base_url}/stop") + stop_response.raise_for_status() + time.sleep(2) # Give it a moment for services to die + start_response = requests.post(f"{base_url}/start") + start_response.raise_for_status() + LOGGER.info("Successfully executed 'restart' command via API.") + + elif command_type in ["update", "set_active_config"]: + config_payload = data.get("config") + if not config_payload: + LOGGER.error(f"Command '{command_type}' missing 'config' payload.") + return + + elif command_type == "update": + LOGGER.info("Updating local configuration...") + # Placeholder: update_local_config(data.get("config")) + restart = data.get("restart", True) + response = requests.post(f"{base_url}/set_active_config?restart={restart}", json=config_payload) + response.raise_for_status() + LOGGER.info(f"Successfully executed '{command_type}' command via API.") + + elif command_type == "load_from_library": + system_name = data.get("system_name") + if not system_name: + LOGGER.error("Command 'load_from_library' missing 'system_name' payload.") + return + + response = requests.post(f"{base_url}/load_from_library?system_name={system_name}") + response.raise_for_status() + LOGGER.info(f"Successfully executed 'load_from_library' for {system_name} via API.") + + elif command_type == "tune": + freq_mhz = data.get("system") + if not freq_mhz: + LOGGER.error("Command 'tune' missing 'frequency' payload.") + return + + try: + # OP25 terminal expects frequency in Hz + freq_hz = int(float(freq_mhz) * 1_000_000) + # The port is hardcoded as it's the default for the OP25 terminal + op25_terminal_url = f"http://localhost:8081/tuning?chan=0&freq={freq_hz}" + response = requests.get(op25_terminal_url, timeout=5) + response.raise_for_status() + LOGGER.info(f"Successfully sent tune command to OP25 terminal for {freq_mhz} MHz.") + except ValueError: + LOGGER.error(f"Invalid frequency format for tune command: {freq_mhz}") + except requests.exceptions.RequestException as e: + LOGGER.error(f"Failed to connect to OP25 terminal for tuning: {e}") + + else: + LOGGER.warning(f"Unknown command type received: {command_type}") + + except json.JSONDecodeError: + LOGGER.error(f"Failed to decode command payload: {payload}") + except requests.exceptions.RequestException as e: + LOGGER.error(f"Failed to call local API for command '{data.get('command')}': {e}") + except Exception as e: + LOGGER.error(f"Error processing C2 command: {e}") + +async def mqtt_phone_home(): + """ + Maintains a persistent C2 connection using a single MQTT client. + Handles check-ins and command subscriptions via callbacks. + """ + if not MQTT_BROKER: + LOGGER.info("No MQTT_BROKER defined. Running in standalone mode.") + return + + # Create a single client instance + client = mqtt.Client(client_id=NODE_ID) + + def on_connect(client, userdata, flags, rc): + if rc == 0: + LOGGER.info(f"Successfully connected to MQTT Broker: {MQTT_BROKER}") + + # 1. Subscribe to command topics for this specific node + command_topic = f"nodes/{NODE_ID}/commands" + client.subscribe(command_topic) + LOGGER.info(f"Subscribed to {command_topic}") + + # 2. Perform Initial Check-In with OP25 status + try: + status_response = requests.get("http://localhost:8001/op25/status") + op25_status = status_response.json() if status_response.ok else {} + except requests.RequestException: + op25_status = {"is_running": False, "active_system": None} + + checkin_data = { + "node_id": NODE_ID, + "status": "online", + "timestamp": datetime.now().isoformat(), + "version": "1.0.0", + "is_listening": op25_status.get("is_running", False), + "active_system": op25_status.get("active_system"), + "available_systems": scan_local_library() + } + client.publish(f"nodes/{NODE_ID}/checkin", json.dumps(checkin_data), retain=True) + else: + LOGGER.error(f"MQTT Connection failed with return code {rc}") + + def on_message(client, userdata, msg): + # Handle messages arriving on subscribed topics + handle_c2_command(msg.topic, msg.payload.decode()) + + def on_disconnect(client, userdata, rc): + if rc != 0: + LOGGER.warning("Unexpected MQTT disconnection. Paho will attempt to reconnect...") + + # Set up callbacks and LWT + client.on_connect = on_connect + client.on_message = on_message + client.on_disconnect = on_disconnect + + lwt_payload = json.dumps({"node_id": NODE_ID, "status": "offline"}) + client.will_set(f"nodes/{NODE_ID}/status", lwt_payload, qos=1, retain=True) + + try: + # Connect and start the background loop thread + # loop_start() handles reconnections automatically without spaming new clients + client.connect(MQTT_BROKER, 1883, 60) + client.loop_start() + + # Keep the async task alive indefinitely + while True: + await asyncio.sleep(3600) + + except Exception as e: + LOGGER.error(f"Fatal error in MQTT supervisor: {e}") + finally: + client.loop_stop() + +@app.on_event("startup") +async def startup_event(): + # Start the C2 connection in the background. + asyncio.create_task(mqtt_phone_home()) \ No newline at end of file diff --git a/app/models/models.py b/app/models/models.py new file mode 100644 index 0000000..1f02958 --- /dev/null +++ b/app/models/models.py @@ -0,0 +1,111 @@ +from pydantic import BaseModel +from typing import List, Optional, Union +from enum import Enum + +class DecodeMode(str, Enum): + P25 = "P25" + DMR = "DMR" + ANALOG = "NBFM" + +class TalkgroupTag(BaseModel): + talkgroup: str + tagDec: int + +class ConfigGenerator(BaseModel): + type: DecodeMode + systemName: str + channels: List[Union[int, str]] + tags: Optional[List[TalkgroupTag]] + whitelist: Optional[List[int]] + icecastConfig: Optional[IcecastConfig] + +class DemodType(str, Enum): + CQPSK = "cqpsk" + FSK4 = "fsk4" + +class FilterType(str, Enum): + RC = "rc" + WIDEPULSE = "widepulse" + +class ChannelConfig(BaseModel): + name: str + trunking_sysname: Optional[str] + enable_analog: str + meta_stream_name: str + demod_type: DemodType + filter_type: FilterType + device: Optional[str] = "sdr" + cqpsk_tracking: Optional[bool] = None + frequency: Optional[float] = None + nbfmSquelch: Optional[float] = None + destination: Optional[str] = "udp://127.0.0.1:23456" + tracking_threshold: Optional[int] = 120 + tracking_feedback: Optional[float] = 0.75 + excess_bw: Optional[float] = 0.2 + if_rate: Optional[int] = 24000 + plot: Optional[str] = "" + symbol_rate: Optional[int] = 4800 + blacklist: Optional[str] = "" + whitelist: Optional[str] = "" + +class DeviceConfig(BaseModel): + args: Optional[str] = "rtl" + gains: Optional[str] = "lna:39" + gain_mode: Optional[bool] = False + name: Optional[str] = "sdr" + offset: Optional[int] = 0 + ppm: Optional[float] = 0.0 + rate: Optional[int] = 1920000 + usable_bw_pct: Optional[float] = 0.85 + tunable: Optional[bool] = True + +class TrunkingChannelConfig(BaseModel): + sysname: str + control_channel_list: str + tagsFile: Optional[str] = None + whitelist: Optional[str] = None + nac: Optional[str] = "" + wacn: Optional[str] = "" + tdma_cc: Optional[bool] = False + crypt_behavior: Optional[int] = 2 + +class TrunkingConfig(BaseModel): + module: str + chans: List[TrunkingChannelConfig] + +class MetadataStreamConfig(BaseModel): + stream_name: str = "stream_0" + meta_format_idle: str = "[idle]" + meta_format_tgid: str = "[%TGID%]" + meta_format_tag: str = "[%TGID%] %TAG%" + icecastServerAddress: str = "ic2.vpn.cusano.net" + icecastMountpoint: str = "NODE_ID" + icecastMountExt: str = ".xspf" + icecastPass: str = "PASSWORD" + delay: float = 0.0 + +class MetadataConfig(BaseModel): + module: str = "icemeta.py" + streams: List[MetadataStreamConfig] + +class TerminalConfig(BaseModel): + module: Optional[str] = "terminal.py" + terminal_type: Optional[str] = "http:0.0.0.0:8081" + terminal_timeout: Optional[float] = 5.0 + curses_plot_interval: Optional[float] = 0.2 + http_plot_interval: Optional[float] = 1.0 + http_plot_directory: Optional[str] = "../www/images" + tuning_step_large: Optional[int] = 1200 + tuning_step_small: Optional[int] = 100 + + + +### ====================================================== +# Icecast models +class IcecastConfig(BaseModel): + icecast_host: str + icecast_port: int + icecast_mountpoint: str + icecast_password: str + icecast_description: Optional[str] = "OP25" + icecast_genre: Optional[str] = "Public Safety" \ No newline at end of file diff --git a/app/routers/op25_controller.py b/app/routers/op25_controller.py new file mode 100644 index 0000000..5accdbf --- /dev/null +++ b/app/routers/op25_controller.py @@ -0,0 +1,180 @@ +from fastapi import HTTPException, APIRouter +import subprocess +import os +import signal +import json +import asyncio +from internal.logger import create_logger +from models.models import ( + ConfigGenerator, DecodeMode, ChannelConfig, DeviceConfig, + TrunkingConfig, TrunkingChannelConfig, TerminalConfig, + MetadataConfig, MetadataStreamConfig +) +from internal.op25_config_utls import ( + save_talkgroup_tags, save_whitelist, del_none_in_dict, + get_current_system_from_config, activate_config_from_library, + save_config_to_library, scan_local_library +) +from internal.liquidsoap_config_utils import generate_liquid_script + +LOGGER = create_logger(__name__) + +# Global process tracker +op25_process = None +OP25_PATH = "/op25/op25/gr-op25_repeater/apps/" +OP25_SCRIPT = "run_multi-rx_service.sh" + +async def stop_op25_logic(): + """ + Kills the OP25 process group to ensure sub-processes like Liquidsoap + are also terminated. + """ + global op25_process + if op25_process and op25_process.poll() is None: + try: + # Kill the entire process group + os.killpg(os.getpgid(op25_process.pid), signal.SIGTERM) + op25_process = None + LOGGER.info("OP25 Process group stopped successfully") + return True + except Exception as e: + LOGGER.error(f"Error stopping OP25 process group: {e}") + return False + return False + +async def start_op25_logic(): + """ + Starts the OP25 shell script as a new process group. + """ + global op25_process + if op25_process is None or op25_process.poll() is not None: + try: + op25_process = subprocess.Popen( + ["/bin/bash", os.path.join(OP25_PATH, OP25_SCRIPT)], + preexec_fn=os.setsid, # Create a new process group + cwd=OP25_PATH + ) + LOGGER.info(f"OP25 started with PID: {op25_process.pid}") + return True + except Exception as e: + LOGGER.error(f"Failed to start OP25: {e}") + return False + return False + +def create_op25_router(): + router = APIRouter() + + @router.post("/start") + async def start_op25(): + if await start_op25_logic(): + return {"status": "OP25 started"} + raise HTTPException(status_code=500, detail="Failed to start OP25 (Check logs for hardware/config errors)") + + @router.post("/stop") + async def stop_op25(): + if await stop_op25_logic(): + return {"status": "OP25 stopped"} + return {"status": "OP25 was not running"} + + @router.get("/status") + async def get_status(): + is_running = op25_process is not None and op25_process.poll() is None + return { + "node_id": os.getenv("NODE_ID", "standalone-node"), + "is_running": is_running, + "pid": op25_process.pid if is_running else None, + "active_system": get_current_system_from_config() if is_running else None + } + + @router.post("/set_active_config") + async def set_active_config(generator: ConfigGenerator, restart: bool = True, save_to_library_name: str = None): + """ + Takes a complex config model, generates the JSON, saves it to + active.cfg.json, and optionally restarts the radio. + """ + try: + if generator.type == DecodeMode.P25: + # 1. Handle sidecar files (Tags/Whitelists) + if generator.config.talkgroupTags: + save_talkgroup_tags(generator.config.talkgroupTags) + if generator.config.whitelist: + save_whitelist(generator.config.whitelist) + + # 2. Build the main OP25 dictionary structure + config_dict = { + "channels": [c.dict() for c in generator.config.channels], + "devices": [d.dict() for d in generator.config.devices], + "trunking": generator.config.trunking.dict(), + "metadata": generator.config.metadata.dict(), + "terminal": generator.config.terminal.dict() + } + + elif generator.type == DecodeMode.ANALOG: + # Simple Analog NBFM Setup for quick testing + channels = [ChannelConfig( + channelName=generator.config.systemName, + enableAnalog="on", + frequency=generator.config.frequency, + demodType="fsk4", + filterType="widepulse" + )] + config_dict = { + "channels": [c.dict() for c in channels], + "devices": [{"gain": "LNA:32"}] # Default gain for analog test + } + else: + raise HTTPException(status_code=400, detail="Invalid decode mode") + + # 3. Clean 'None' values to prevent OP25 parsing errors and save + final_json = del_none_in_dict(config_dict) + + if save_to_library_name: + save_config_to_library(save_to_library_name, final_json) + + with open('/configs/active.cfg.json', 'w') as f: + json.dump(final_json, f, indent=2) + + LOGGER.info("Saved new configuration to active.cfg.json") + + # 4. Handle Lifecycle + if restart: + LOGGER.info("Restarting OP25 to apply new config...") + await stop_op25_logic() + await asyncio.sleep(1.5) # Allow sockets to clear + await start_op25_logic() + + return {"message": "Active configuration updated", "radio_restarted": restart} + + except Exception as e: + LOGGER.error(f"Config export failed: {e}") + raise HTTPException(status_code=500, detail=f"Configuration error: {str(e)}") + + @router.post("/load_from_library") + async def load_from_library(system_name: str): + """ + Swaps the active config with a pre-existing file in the /configs library. + """ + if activate_config_from_library(system_name): + await stop_op25_logic() + await asyncio.sleep(1.5) + await start_op25_logic() + return {"status": f"Loaded and started library config: {system_name}"} + raise HTTPException(status_code=404, detail=f"Config '{system_name}' not found in library volume") + + @router.post("/save_to_library") + async def save_to_library(system_name: str, config: dict): + """ + Directly saves a JSON configuration to the library. + """ + if save_config_to_library(system_name, config): + return {"status": f"Config saved as {system_name}"} + raise HTTPException(status_code=500, detail="Failed to save configuration") + + @router.get("/library") + async def get_library(): + """ + Returns a list of all saved configurations in the library. + """ + return scan_local_library() + + return router \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..a619db2 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,26 @@ +services: + edge-supervisor: + build: + context: . + dockerfile: Dockerfile + container_name: radio-edge-node + restart: unless-stopped + ports: + - 8001:8001 + devices: + - "/dev/bus/usb:/dev/bus/usb" + volumes: + - ./config:/app/config + - ./op25_logs:/tmp/op25 + env_file: + - .env + environment: + - NODE_ID=${NODE_ID} + - MQTT_BROKER=${MQTT_BROKER} + - ICECAST_SERVER=${ICECAST_SERVER} + networks: + - radio-shared-net + +networks: + radio-shared-net: + external: true \ No newline at end of file diff --git a/docker-entrypoint.sh b/docker-entrypoint.sh new file mode 100644 index 0000000..1d32b07 --- /dev/null +++ b/docker-entrypoint.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +# --- Start PulseAudio Daemon --- +# The -D flag starts it as a daemon. +# The --exit-idle-time=-1 prevents it from automatically shutting down. +echo "Starting PulseAudio daemon..." +pulseaudio -D --exit-idle-time=-1 --system + +# Wait a moment for PulseAudio to initialize +sleep 1 + +# --- Execute the main command (uvicorn) --- +echo "Starting FastAPI application..." +# The main application arguments are passed directly to this script +exec "$@" \ No newline at end of file diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..2c4901e --- /dev/null +++ b/readme.md @@ -0,0 +1,33 @@ +radio-edge-node + +This repository contains the containerized supervisor and OP25 instance for the SDR nodes. It manages the local RF workload and synchronizes state with the central C2 via MQTT. + +1. Directory Structure + +radio-edge-node/ +├── app/ +│ ├── main.py # Entry point & MQTT Startup +│ ├── routers/ # FastAPI Routes (op25_controller.py) +│ ├── internal/ # Business Logic (config_utils, logger, etc.) +│ └── models/ # Pydantic Schemas +├── configs/ # Persistent Volume for active.cfg.json +├── Dockerfile +├── docker-entrypoint.sh +└── docker-compose.yml + + +2. Supervisor Logic + +The supervisor performs three main tasks: + +MQTT Phone-Home: On startup, it sends the Check-In packet to the C2. + +Process Management: It wraps the OP25 multi_rx process. If the process dies, the supervisor can report the failure via MQTT. + +Metadata Tailing: (Planned) Tailing the OP25 stderr/stdout to extract real-time talkgroup grants for the C2 metadata bus. + +3. Deployment + +Set your .env variables (NODE_ID, MQTT_BROKER, etc.) + +docker-compose up --build -d \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e49e782 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +fastapi +uvicorn[standard] +paho-mqtt +pydantic +python-multipart +requests \ No newline at end of file diff --git a/run_multi-rx_service.sh b/run_multi-rx_service.sh new file mode 100644 index 0000000..a2202b7 --- /dev/null +++ b/run_multi-rx_service.sh @@ -0,0 +1,22 @@ +#!/bin/bash + +# Configuration file path +CONFIG_FILE="/configs/active.cfg.json" + +# --- Start the main OP25 receiver (multi_rx.py) in the background --- +# The '&' sends the process to the background. +echo "Starting multi_rx.py..." +./multi_rx.py -v 1 -c $CONFIG_FILE & +MULTI_RX_PID=$! # Store the PID of the background process + +# --- Start the liquid-dsp plot utility (op25.liq) in the background --- +echo "Starting op25.liq..." +liquidsoap /configs/op25.liq & +LIQ_PID=$! # Store the PID of the op25.liq process + +# Wait for both background jobs to finish. +# Since multi_rx.py is the core service, this script will effectively wait +# until multi_rx.py is externally stopped (via the API). +# The trap command ensures that SIGTERM is passed to the background jobs. +trap "kill $MULTI_RX_PID $LIQ_PID" SIGTERM SIGINT +wait $MULTI_RX_PID \ No newline at end of file