From 1a9c92b6db8630835dedd56ac72212a57a66e531 Mon Sep 17 00:00:00 2001 From: Logan Date: Sun, 5 Apr 2026 19:01:51 -0400 Subject: [PATCH] =?UTF-8?q?Initial=20commit=20=E2=80=94=20DRB=20client=20(?= =?UTF-8?q?edge=20node)=20stack?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Includes edge-node (FastAPI/MQTT/Discord voice), op25-container (SDR decoder), and icecast (audio streaming). --- .env.example | 26 +++ .gitignore | 23 ++ Makefile | 18 ++ README.md | 71 +++++++ docker-compose.yml | 31 +++ drb-edge-node/Dockerfile | 17 ++ drb-edge-node/app/__init__.py | 0 drb-edge-node/app/config.py | 39 ++++ drb-edge-node/app/internal/__init__.py | 0 drb-edge-node/app/internal/call_recorder.py | 117 +++++++++++ drb-edge-node/app/internal/config_manager.py | 43 ++++ drb-edge-node/app/internal/credentials.py | 39 ++++ drb-edge-node/app/internal/discord_radio.py | 119 +++++++++++ drb-edge-node/app/internal/logger.py | 10 + .../app/internal/metadata_watcher.py | 127 ++++++++++++ drb-edge-node/app/internal/mqtt_manager.py | 159 ++++++++++++++ drb-edge-node/app/internal/op25_client.py | 63 ++++++ drb-edge-node/app/main.py | 141 +++++++++++++ drb-edge-node/app/models.py | 51 +++++ drb-edge-node/app/routers/__init__.py | 0 drb-edge-node/app/routers/api.py | 82 ++++++++ drb-edge-node/app/routers/ui.py | 12 ++ drb-edge-node/app/templates/index.html | 196 ++++++++++++++++++ drb-edge-node/pytest.ini | 3 + drb-edge-node/requirements.txt | 9 + drb-edge-node/tests/__init__.py | 0 drb-edge-node/tests/conftest.py | 5 + drb-edge-node/tests/test_config_manager.py | 131 ++++++++++++ drb-edge-node/tests/test_metadata_watcher.py | 190 +++++++++++++++++ icecast/Dockerfile | 18 ++ icecast/entrypoint.sh | 11 + icecast/icecast.xml.template | 49 +++++ .../.gitea/workflows/build-nightly.yml | 57 +++++ .../.gitea/workflows/build-stable.yml | 60 ++++++ op25-container/.gitea/workflows/lint.yml | 30 +++ op25-container/.gitignore | 6 + op25-container/Dockerfile | 51 
+++++ .../app/internal/liquidsoap_config_utils.py | 34 +++ op25-container/app/internal/logger.py | 55 +++++ .../app/internal/op25_config_utls.py | 70 +++++++ .../app/internal/op25_liq_template.py | 44 ++++ op25-container/app/main.py | 11 + op25-container/app/models.py | 111 ++++++++++ op25-container/app/routers/op25_controller.py | 128 ++++++++++++ op25-container/docker-entrypoint.sh | 15 ++ op25-container/requirements.txt | 2 + op25-container/run_multi-rx_service.sh | 22 ++ 47 files changed, 2496 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 README.md create mode 100644 docker-compose.yml create mode 100644 drb-edge-node/Dockerfile create mode 100644 drb-edge-node/app/__init__.py create mode 100644 drb-edge-node/app/config.py create mode 100644 drb-edge-node/app/internal/__init__.py create mode 100644 drb-edge-node/app/internal/call_recorder.py create mode 100644 drb-edge-node/app/internal/config_manager.py create mode 100644 drb-edge-node/app/internal/credentials.py create mode 100644 drb-edge-node/app/internal/discord_radio.py create mode 100644 drb-edge-node/app/internal/logger.py create mode 100644 drb-edge-node/app/internal/metadata_watcher.py create mode 100644 drb-edge-node/app/internal/mqtt_manager.py create mode 100644 drb-edge-node/app/internal/op25_client.py create mode 100644 drb-edge-node/app/main.py create mode 100644 drb-edge-node/app/models.py create mode 100644 drb-edge-node/app/routers/__init__.py create mode 100644 drb-edge-node/app/routers/api.py create mode 100644 drb-edge-node/app/routers/ui.py create mode 100644 drb-edge-node/app/templates/index.html create mode 100644 drb-edge-node/pytest.ini create mode 100644 drb-edge-node/requirements.txt create mode 100644 drb-edge-node/tests/__init__.py create mode 100644 drb-edge-node/tests/conftest.py create mode 100644 drb-edge-node/tests/test_config_manager.py create mode 100644 drb-edge-node/tests/test_metadata_watcher.py 
create mode 100644 icecast/Dockerfile create mode 100644 icecast/entrypoint.sh create mode 100644 icecast/icecast.xml.template create mode 100644 op25-container/.gitea/workflows/build-nightly.yml create mode 100644 op25-container/.gitea/workflows/build-stable.yml create mode 100644 op25-container/.gitea/workflows/lint.yml create mode 100644 op25-container/.gitignore create mode 100644 op25-container/Dockerfile create mode 100644 op25-container/app/internal/liquidsoap_config_utils.py create mode 100644 op25-container/app/internal/logger.py create mode 100644 op25-container/app/internal/op25_config_utls.py create mode 100644 op25-container/app/internal/op25_liq_template.py create mode 100644 op25-container/app/main.py create mode 100644 op25-container/app/models.py create mode 100644 op25-container/app/routers/op25_controller.py create mode 100644 op25-container/docker-entrypoint.sh create mode 100644 op25-container/requirements.txt create mode 100644 op25-container/run_multi-rx_service.sh diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..18087e3 --- /dev/null +++ b/.env.example @@ -0,0 +1,26 @@ +# Node Identity +NODE_ID=node-001 +NODE_NAME="My Radio Node" +NODE_LAT=0.0 +NODE_LON=0.0 + +# MQTT — point to your C2 server +MQTT_BROKER=localhost +MQTT_PORT=1883 +MQTT_USER= +MQTT_PASS= + +# C2 server for audio upload (leave blank to disable upload) +C2_URL=http://localhost:8888 +# API key is provisioned automatically via MQTT after admin approves the node + +# Icecast (local container — usually no need to change) +ICECAST_SOURCE_PASSWORD=hackme +ICECAST_ADMIN_PASSWORD=admin +ICECAST_HOST=localhost +ICECAST_PORT=8000 +ICECAST_MOUNT=/radio + +# OP25 container (usually no need to change) +OP25_API_URL=http://localhost:8001 +OP25_TERMINAL_URL=http://localhost:8081 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6ba38c8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,23 @@ +# Environment / secrets +.env +drb-edge-node/.env + +# 
Python +__pycache__/ +*.py[cod] +*.pyo +.venv/ +venv/ +.pytest_cache/ + +# Runtime-generated configs and recordings +configs/ +recordings/ + +# Logs +*.log +logs/ + +# OS +.DS_Store +Thumbs.db diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..07469de --- /dev/null +++ b/Makefile @@ -0,0 +1,18 @@ +.PHONY: setup test up down logs + +setup: + @[ -f .env ] && echo ".env already exists, skipping." || (cp .env.example .env && echo "Created .env — fill in your values before running 'make up'.") + +# Run pytest inside the running edge-node container. +# Requires: docker compose up (or at least the edge-node image built). +test: + docker compose run --no-deps --rm edge-node pytest -v + +up: + docker compose up -d + +down: + docker compose down + +logs: + docker compose logs -f edge-node diff --git a/README.md b/README.md new file mode 100644 index 0000000..6269e16 --- /dev/null +++ b/README.md @@ -0,0 +1,71 @@ +# DRB Client (Edge Node) + +The client-side stack for the Discord Radio Bot system. Runs on the SDR hardware machine. Decodes radio with OP25, streams audio via Icecast, and connects to the C2 server over MQTT. + +## Services + +| Service | Description | +|---|---| +| `op25` | OP25 SDR decoder — tunes to radio systems and streams decoded audio | +| `icecast` | Audio streaming server — receives audio from OP25 and serves it over HTTP | +| `edge-node` | FastAPI node agent — bridges OP25 metadata, MQTT C2 commands, and Discord voice | + +All services run with `network_mode: host` so they share the host network and can reach each other on localhost. + +## Prerequisites + +- Docker + Docker Compose +- An RTL-SDR or compatible SDR dongle connected to the host +- Network access to the DRB server (C2 server) + +## Setup + +```bash +# 1. Copy env file +cp .env.example .env + +# 2. Fill in .env +# At minimum: NODE_ID, MQTT_BROKER (IP of the C2 server) + +# 3. 
Build and start +docker compose build +docker compose up -d +``` + +## Environment Variables (`.env`) + +| Variable | Description | Default | +|---|---|---| +| `NODE_ID` | Unique identifier for this node | required | +| `NODE_NAME` | Human-readable name shown in the UI | `My Radio Node` | +| `NODE_LAT` / `NODE_LON` | GPS coordinates for the map view | `0.0` | +| `MQTT_BROKER` | IP or hostname of the C2 server | `localhost` | +| `MQTT_PORT` | MQTT broker port | `1883` | +| `C2_URL` | C2 server HTTP API URL (for audio uploads) | — | +| `ICECAST_HOST` | Icecast hostname (leave as localhost) | `localhost` | +| `ICECAST_PORT` | Icecast port | `8000` | +| `ICECAST_MOUNT` | Icecast mount point | `/radio` | + +## Node Provisioning Flow + +1. Start the client stack — the edge node connects to MQTT and sends a **checkin** +2. The C2 server registers the node as **pending approval** +3. An admin approves the node in the web UI +4. The admin assigns a **radio system** to the node via the UI +5. The C2 server pushes the system config over MQTT +6. The edge node writes the config and **starts OP25** +7. OP25 begins decoding and streaming audio to Icecast + +After a restart, OP25 resumes automatically if the node was already configured. + +## Discord Voice + +When a user runs `/join` in Discord (or clicks "Join Discord" in the UI), the C2 server sends a `discord_join` command to this node over MQTT. The edge node spins up a Discord bot using a token from the server's token pool and connects it to the requested voice channel, streaming live audio from Icecast. 
+ +## Logs + +```bash +docker compose logs -f edge-node +docker compose logs -f op25 +docker compose logs -f icecast +``` diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..6fa78ff --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,31 @@ +services: + icecast: + build: ./icecast + restart: unless-stopped + network_mode: host + environment: + ICECAST_SOURCE_PASSWORD: ${ICECAST_SOURCE_PASSWORD:-hackme} + ICECAST_ADMIN_PASSWORD: ${ICECAST_ADMIN_PASSWORD:-admin} + + op25: + build: ./op25-container + restart: unless-stopped + privileged: true + network_mode: host + volumes: + - ./configs:/configs + - /dev:/dev + depends_on: + - icecast + + edge-node: + build: ./drb-edge-node + restart: unless-stopped + network_mode: host + env_file: .env + volumes: + - ./configs:/configs + - ./recordings:/recordings + depends_on: + - icecast + - op25 diff --git a/drb-edge-node/Dockerfile b/drb-edge-node/Dockerfile new file mode 100644 index 0000000..2b95eab --- /dev/null +++ b/drb-edge-node/Dockerfile @@ -0,0 +1,17 @@ +FROM python:3.11-slim + +RUN apt-get update && apt-get install -y \ + ffmpeg \ + libopus0 \ + libopus-dev \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +COPY requirements.txt . 
+RUN pip install --no-cache-dir -r requirements.txt + +COPY app/ ./app/ +COPY tests/ ./tests/ + +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"] diff --git a/drb-edge-node/app/__init__.py b/drb-edge-node/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/drb-edge-node/app/config.py b/drb-edge-node/app/config.py new file mode 100644 index 0000000..8d28973 --- /dev/null +++ b/drb-edge-node/app/config.py @@ -0,0 +1,39 @@ +from pydantic_settings import BaseSettings +from typing import Optional + + +class Settings(BaseSettings): + # Node identity + node_id: str + node_name: str = "Unnamed Node" + node_lat: float = 0.0 + node_lon: float = 0.0 + + # MQTT + mqtt_broker: str + mqtt_port: int = 1883 + mqtt_user: Optional[str] = None + mqtt_pass: Optional[str] = None + + # C2 server (audio upload destination); None disables upload + c2_url: Optional[str] = None + + # Local Icecast + icecast_host: str = "localhost" + icecast_port: int = 8000 + icecast_mount: str = "/radio" + icecast_source_password: str = "hackme" + + # OP25 container + op25_api_url: str = "http://localhost:8001" + op25_terminal_url: str = "http://localhost:8081" + + # Paths (volume mounts) + config_path: str = "/configs" + recordings_path: str = "/recordings" + + class Config: + env_file = ".env" + + +settings = Settings() diff --git a/drb-edge-node/app/internal/__init__.py b/drb-edge-node/app/internal/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/drb-edge-node/app/internal/call_recorder.py b/drb-edge-node/app/internal/call_recorder.py new file mode 100644 index 0000000..9c55bb6 --- /dev/null +++ b/drb-edge-node/app/internal/call_recorder.py @@ -0,0 +1,117 @@ +import asyncio +from pathlib import Path +from datetime import datetime, timezone +from typing import Optional +import httpx +from app.config import settings +from app.internal import credentials +from app.internal.logger import logger + +MAX_RECORDING_SECONDS = 600 # 10 min safety cap; 
FFmpeg terminates long-running calls + + +class CallRecorder: + def __init__(self): + self._process: Optional[asyncio.subprocess.Process] = None + self._current_call_id: Optional[str] = None + self._current_file: Optional[Path] = None + self._icecast_url = ( + f"http://{settings.icecast_host}:{settings.icecast_port}{settings.icecast_mount}" + ) + self._recordings_dir = Path(settings.recordings_path) + + async def start_recording(self, call_id: str) -> bool: + if self._process: + logger.warning("Recording already running — ignoring start.") + return False + + self._recordings_dir.mkdir(parents=True, exist_ok=True) + ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") + self._current_file = self._recordings_dir / f"{ts}_{call_id}.mp3" + self._current_call_id = call_id + + cmd = [ + "ffmpeg", "-y", + "-i", self._icecast_url, + "-acodec", "copy", + "-t", str(MAX_RECORDING_SECONDS), + str(self._current_file), + ] + + try: + self._process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL, + ) + logger.info(f"Recording started: {self._current_file.name}") + return True + except Exception as e: + logger.error(f"FFmpeg start failed: {e}") + self._process = None + self._current_file = None + self._current_call_id = None + return False + + async def stop_recording(self) -> Optional[Path]: + if not self._process: + return None + + proc = self._process + output_file = self._current_file + self._process = None + self._current_file = None + self._current_call_id = None + + try: + proc.terminate() + await asyncio.wait_for(proc.wait(), timeout=5) + except asyncio.TimeoutError: + proc.kill() + except ProcessLookupError: + pass + + if output_file and output_file.exists() and output_file.stat().st_size > 0: + logger.info(f"Recording saved: {output_file.name} ({output_file.stat().st_size} bytes)") + return output_file + + logger.warning("Recording file empty or missing — discarding.") + return None + + async 
def upload_recording(self, file_path: Path, call_id: str) -> Optional[str]: + if not settings.c2_url: + logger.info("No C2_URL configured — skipping upload.") + return None + + upload_url = f"{settings.c2_url}/upload" + api_key = credentials.get_api_key() + headers = {"Authorization": f"Bearer {api_key}"} if api_key else {} + + try: + async with httpx.AsyncClient(timeout=120) as client: + with open(file_path, "rb") as f: + r = await client.post( + upload_url, + files={"file": (file_path.name, f, "audio/mpeg")}, + data={"call_id": call_id, "node_id": settings.node_id}, + headers=headers, + ) + r.raise_for_status() + audio_url = r.json().get("url") + logger.info(f"Upload complete: {audio_url}") + return audio_url + except Exception as e: + logger.error(f"Upload failed: {e}") + return None + finally: + try: + file_path.unlink() + except Exception: + pass + + @property + def is_recording(self) -> bool: + return self._process is not None + + +call_recorder = CallRecorder() diff --git a/drb-edge-node/app/internal/config_manager.py b/drb-edge-node/app/internal/config_manager.py new file mode 100644 index 0000000..83cdeab --- /dev/null +++ b/drb-edge-node/app/internal/config_manager.py @@ -0,0 +1,43 @@ +import json +from pathlib import Path +from typing import Optional +from app.config import settings +from app.models import NodeConfig, SystemConfig +from app.internal.logger import logger + +_CONFIG_FILE = Path(settings.config_path) / "node_config.json" + + +def load_node_config() -> NodeConfig: + if _CONFIG_FILE.exists(): + try: + data = json.loads(_CONFIG_FILE.read_text()) + return NodeConfig(**data) + except Exception as e: + logger.warning(f"Could not load node config, using defaults: {e}") + + return NodeConfig( + node_id=settings.node_id, + node_name=settings.node_name, + lat=settings.node_lat, + lon=settings.node_lon, + configured=False, + ) + + +def save_node_config(config: NodeConfig) -> None: + _CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True) + 
_CONFIG_FILE.write_text(config.model_dump_json(indent=2)) + logger.info("Node config saved.") + + +def apply_system_config(system_config: SystemConfig) -> bool: + """Write the OP25-compatible config blob to /configs/active.cfg.json.""" + try: + active_cfg = Path(settings.config_path) / "active.cfg.json" + active_cfg.write_text(json.dumps(system_config.config, indent=2)) + logger.info(f"System config applied: {system_config.name}") + return True + except Exception as e: + logger.error(f"Failed to write system config: {e}") + return False diff --git a/drb-edge-node/app/internal/credentials.py b/drb-edge-node/app/internal/credentials.py new file mode 100644 index 0000000..e93b374 --- /dev/null +++ b/drb-edge-node/app/internal/credentials.py @@ -0,0 +1,39 @@ +""" +Manages the persisted node API key. + +The key is provisioned by the C2 server after an admin approves the node. +It arrives via MQTT and is saved to /configs/credentials.json so it survives +container restarts. +""" +import json +from pathlib import Path +from app.config import settings +from app.internal.logger import logger + +_CREDS_FILE = Path(settings.config_path) / "credentials.json" +_api_key: str | None = None + + +def load() -> None: + """Load persisted credentials from disk on startup.""" + global _api_key + if _CREDS_FILE.exists(): + try: + data = json.loads(_CREDS_FILE.read_text()) + _api_key = data.get("api_key") + if _api_key: + logger.info("Node credentials loaded from disk.") + except Exception as e: + logger.warning(f"Could not read credentials file: {e}") + + +def get_api_key() -> str | None: + return _api_key + + +def save_api_key(key: str) -> None: + global _api_key + _api_key = key + _CREDS_FILE.parent.mkdir(parents=True, exist_ok=True) + _CREDS_FILE.write_text(json.dumps({"api_key": key})) + logger.info("Node API key saved to disk.") diff --git a/drb-edge-node/app/internal/discord_radio.py b/drb-edge-node/app/internal/discord_radio.py new file mode 100644 index 0000000..7522a26 --- 
/dev/null +++ b/drb-edge-node/app/internal/discord_radio.py @@ -0,0 +1,119 @@ +import asyncio +from typing import Optional +import discord +from discord.ext import commands +from app.config import settings +from app.internal.logger import logger + +BOT_READY_TIMEOUT = 15 # seconds to wait for Discord bot to become ready + + +class RadioBot: + def __init__(self): + self._bot: Optional[commands.Bot] = None + self._voice_client: Optional[discord.VoiceClient] = None + self._task: Optional[asyncio.Task] = None + self._ready_event: Optional[asyncio.Event] = None + self._current_token: Optional[str] = None + self._icecast_url = ( + f"http://{settings.icecast_host}:{settings.icecast_port}{settings.icecast_mount}" + ) + + async def join(self, guild_id: int, channel_id: int, token: str) -> bool: + # (Re)start the bot if the token changed or the bot isn't running + if self._current_token != token or not self._is_bot_running(): + if not await self._start_bot(token): + return False + + guild = self._bot.get_guild(guild_id) + if not guild: + logger.error(f"Guild {guild_id} not found — bot may not be a member.") + return False + + channel = guild.get_channel(channel_id) + if not isinstance(channel, discord.VoiceChannel): + logger.error(f"Channel {channel_id} is not a voice channel.") + return False + + try: + if self._voice_client and self._voice_client.is_connected(): + await self._voice_client.disconnect(force=True) + self._voice_client = await channel.connect() + self._play_stream() + logger.info(f"Streaming to #{channel.name} in {guild.name}") + return True + except Exception as e: + logger.error(f"Failed to join voice channel: {e}") + return False + + async def leave(self) -> bool: + if self._voice_client and self._voice_client.is_connected(): + try: + await self._voice_client.disconnect(force=True) + self._voice_client = None + logger.info("Disconnected from voice channel.") + return True + except Exception as e: + logger.error(f"Failed to disconnect: {e}") + return False + 
+ async def stop(self): + await self.leave() + if self._task: + self._task.cancel() + if self._bot: + await self._bot.close() + self._bot = None + self._task = None + self._current_token = None + self._ready_event = None + + def _play_stream(self): + if not self._voice_client: + return + source = discord.FFmpegPCMAudio( + self._icecast_url, + before_options="-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5", + ) + self._voice_client.play( + discord.PCMVolumeTransformer(source, volume=1.0), + after=lambda e: logger.error(f"Stream ended unexpectedly: {e}") if e else None, + ) + + async def _start_bot(self, token: str) -> bool: + await self.stop() # clean up any previous instance + + intents = discord.Intents.default() + intents.voice_states = True + self._bot = commands.Bot(command_prefix="!", intents=intents) + self._ready_event = asyncio.Event() + self._current_token = token + + @self._bot.event + async def on_ready(): + logger.info(f"Discord bot ready: {self._bot.user} ({self._bot.user.id})") + self._ready_event.set() + + self._task = asyncio.create_task(self._bot.start(token)) + + try: + await asyncio.wait_for(self._ready_event.wait(), timeout=BOT_READY_TIMEOUT) + return True + except asyncio.TimeoutError: + logger.error("Timed out waiting for Discord bot to become ready.") + await self.stop() + return False + + def _is_bot_running(self) -> bool: + return ( + self._bot is not None + and self._task is not None + and not self._task.done() + ) + + @property + def is_connected(self) -> bool: + return self._voice_client is not None and self._voice_client.is_connected() + + +radio_bot = RadioBot() diff --git a/drb-edge-node/app/internal/logger.py b/drb-edge-node/app/internal/logger.py new file mode 100644 index 0000000..e14e2c1 --- /dev/null +++ b/drb-edge-node/app/internal/logger.py @@ -0,0 +1,10 @@ +import logging +import sys + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + 
handlers=[logging.StreamHandler(sys.stdout)], +) + +logger = logging.getLogger("drb-edge-node") diff --git a/drb-edge-node/app/internal/metadata_watcher.py b/drb-edge-node/app/internal/metadata_watcher.py new file mode 100644 index 0000000..0e1237c --- /dev/null +++ b/drb-edge-node/app/internal/metadata_watcher.py @@ -0,0 +1,127 @@ +import asyncio +import uuid +from datetime import datetime, timezone +from typing import Optional, Callable, Awaitable +from app.internal.op25_client import op25_client +from app.internal.logger import logger + +CallbackFn = Callable[[dict], Awaitable[None]] + +HANG_THRESHOLD = 3 # polls before declaring a call ended (1 poll/sec → 3s hang time) +POLL_INTERVAL = 1.0 # seconds + + +class MetadataWatcher: + def __init__(self): + self._running = False + self._current_tgid: Optional[int] = None + self._hang_counter: int = 0 + self._active_call_id: Optional[str] = None + self._call_started_at: Optional[datetime] = None + + # Set these before calling start() + self.on_call_start: Optional[CallbackFn] = None + self.on_call_end: Optional[CallbackFn] = None + + async def start(self): + self._running = True + asyncio.create_task(self._poll_loop()) + logger.info("Metadata watcher started.") + + async def stop(self): + self._running = False + if self._active_call_id: + await self._end_call() + + async def _poll_loop(self): + while self._running: + try: + await self._tick() + except Exception as e: + logger.warning(f"Metadata poll error: {e}") + await asyncio.sleep(POLL_INTERVAL) + + async def _tick(self): + status = await op25_client.get_terminal_status() + + if not status: + # OP25 not responding — hang-out any active call + if self._active_call_id: + self._hang_counter += 1 + if self._hang_counter >= HANG_THRESHOLD: + await self._end_call() + return + + # OP25 terminal returns either a list of channels or a single dict + channels = status if isinstance(status, list) else [status] + active_tgid: Optional[int] = None + active_meta: dict = {} + + for 
ch in channels: + tgid = ch.get("tgid") or ch.get("tg_id") + if tgid and str(tgid) not in ("0", "", "None"): + active_tgid = int(tgid) + active_meta = ch + break + + if active_tgid: + self._hang_counter = 0 + if self._current_tgid != active_tgid: + # Talkgroup changed — close previous call and open a new one + if self._active_call_id: + await self._end_call() + self._current_tgid = active_tgid + await self._start_call(active_tgid, active_meta) + else: + # No active talkgroup + if self._active_call_id: + self._hang_counter += 1 + if self._hang_counter >= HANG_THRESHOLD: + await self._end_call() + + async def _start_call(self, tgid: int, meta: dict): + self._active_call_id = str(uuid.uuid4()) + self._call_started_at = datetime.now(timezone.utc) + payload = { + "call_id": self._active_call_id, + "tgid": tgid, + "tgid_name": meta.get("tag") or meta.get("tgid_tag") or "", + "freq": meta.get("freq"), + "srcaddr": meta.get("srcaddr"), + "started_at": self._call_started_at.isoformat(), + } + logger.info(f"Call start: tgid={tgid} id={self._active_call_id}") + if self.on_call_start: + await self.on_call_start(payload) + + async def _end_call(self): + if not self._active_call_id: + return + payload = { + "call_id": self._active_call_id, + "tgid": self._current_tgid, + "started_at": self._call_started_at.isoformat() if self._call_started_at else None, + "ended_at": datetime.now(timezone.utc).isoformat(), + } + logger.info(f"Call end: id={self._active_call_id}") + self._active_call_id = None + self._current_tgid = None + self._hang_counter = 0 + self._call_started_at = None + if self.on_call_end: + await self.on_call_end(payload) + + @property + def active_call_id(self) -> Optional[str]: + return self._active_call_id + + @property + def current_tgid(self) -> Optional[int]: + return self._current_tgid + + @property + def is_active(self) -> bool: + return self._active_call_id is not None + + +metadata_watcher = MetadataWatcher() diff --git 
a/drb-edge-node/app/internal/mqtt_manager.py b/drb-edge-node/app/internal/mqtt_manager.py new file mode 100644 index 0000000..2d4b03f --- /dev/null +++ b/drb-edge-node/app/internal/mqtt_manager.py @@ -0,0 +1,159 @@ +import asyncio +import json +from datetime import datetime, timezone +from typing import Optional, Callable, Awaitable, Dict, Any +import paho.mqtt.client as mqtt +from app.config import settings +from app.internal.logger import logger + +CommandCallback = Callable[[Dict[str, Any]], Awaitable[None]] +ConfigCallback = Callable[[Dict[str, Any]], Awaitable[None]] +ApiKeyCallback = Callable[[Dict[str, Any]], Awaitable[None]] + + +class MQTTManager: + def __init__(self): + self._client: Optional[mqtt.Client] = None + self._loop: Optional[asyncio.AbstractEventLoop] = None + self._connected = False + self._connect_task: Optional[asyncio.Task] = None + + self.on_command: Optional[CommandCallback] = None + self.on_config_push: Optional[ConfigCallback] = None + self.on_api_key: Optional[ApiKeyCallback] = None + + nid = settings.node_id + self._t_checkin = f"nodes/{nid}/checkin" + self._t_status = f"nodes/{nid}/status" + self._t_metadata = f"nodes/{nid}/metadata" + self._t_commands = f"nodes/{nid}/commands" + self._t_config = f"nodes/{nid}/config" + self._t_api_key = f"nodes/{nid}/api_key" + self._t_discovery = "nodes/discovery/request" + + def _build_client(self) -> mqtt.Client: + client = mqtt.Client( + callback_api_version=mqtt.CallbackAPIVersion.VERSION2, + client_id=settings.node_id, + ) + if settings.mqtt_user: + client.username_pw_set(settings.mqtt_user, settings.mqtt_pass) + + lwt = json.dumps({ + "node_id": settings.node_id, + "status": "offline", + "timestamp": datetime.now(timezone.utc).isoformat(), + }) + client.will_set(self._t_status, lwt, qos=1, retain=True) + + client.reconnect_delay_set(min_delay=2, max_delay=60) + client.on_connect = self._on_connect + client.on_disconnect = self._on_disconnect + client.on_message = self._on_message + return 
client + + def _on_connect(self, client, userdata, flags, reason_code, properties): + if reason_code == 0: + self._connected = True + client.subscribe(self._t_commands, qos=1) + client.subscribe(self._t_config, qos=1) + client.subscribe(self._t_api_key, qos=2) + client.subscribe(self._t_discovery, qos=0) + logger.info("MQTT connected.") + asyncio.run_coroutine_threadsafe(self._publish_checkin(), self._loop) + else: + logger.error(f"MQTT connect refused: {reason_code}") + + def _on_disconnect(self, client, userdata, disconnect_flags, reason_code, properties): + self._connected = False + logger.warning(f"MQTT disconnected: {reason_code}") + + def _on_message(self, client, userdata, msg): + try: + payload = json.loads(msg.payload.decode()) + except Exception: + payload = msg.payload.decode() + + if msg.topic == self._t_commands and self.on_command: + asyncio.run_coroutine_threadsafe(self.on_command(payload), self._loop) + elif msg.topic == self._t_config and self.on_config_push: + asyncio.run_coroutine_threadsafe(self.on_config_push(payload), self._loop) + elif msg.topic == self._t_api_key and self.on_api_key: + asyncio.run_coroutine_threadsafe(self.on_api_key(payload), self._loop) + elif msg.topic == self._t_discovery: + asyncio.run_coroutine_threadsafe(self._publish_checkin(), self._loop) + + async def connect(self): + self._loop = asyncio.get_event_loop() + self._client = self._build_client() + self._connect_task = asyncio.create_task(self._connect_with_retry()) + + async def _connect_with_retry(self): + """Attempt the initial TCP connect, retrying with backoff until it succeeds.""" + delay = 5 + logger.info(f"MQTT connecting to {settings.mqtt_broker}:{settings.mqtt_port}") + while True: + try: + self._client.connect(settings.mqtt_broker, settings.mqtt_port, keepalive=60) + self._client.loop_start() + # paho loop_start + reconnect_delay_set handles all subsequent reconnects + return + except Exception as e: + logger.warning(f"MQTT connect failed ({e}) — retrying in 
{delay}s") + await asyncio.sleep(delay) + delay = min(delay * 2, 60) + + async def disconnect(self): + if self._connect_task: + self._connect_task.cancel() + if self._client: + await self.publish_status("offline") + self._client.loop_stop() + self._client.disconnect() + + async def publish_status(self, status: str, extra: dict = None): + payload = { + "node_id": settings.node_id, + "status": status, + "timestamp": datetime.now(timezone.utc).isoformat(), + **(extra or {}), + } + self._publish(self._t_status, payload, qos=1, retain=True) + + async def publish_metadata(self, event_type: str, data: dict): + payload = { + "event": event_type, + "node_id": settings.node_id, + "timestamp": datetime.now(timezone.utc).isoformat(), + **data, + } + self._publish(self._t_metadata, payload, qos=1) + + async def _publish_checkin(self): + payload = { + "node_id": settings.node_id, + "name": settings.node_name, + "lat": settings.node_lat, + "lon": settings.node_lon, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + self._publish(self._t_checkin, payload, qos=1) + + def _publish(self, topic: str, payload: dict, qos: int = 0, retain: bool = False): + if self._client and self._connected: + self._client.publish(topic, json.dumps(payload), qos=qos, retain=retain) + else: + logger.debug(f"MQTT not connected, dropping publish to {topic}") + + async def heartbeat_loop(self): + while True: + if self._connected: + await self._publish_checkin() + await asyncio.sleep(30) + + @property + def is_connected(self) -> bool: + return self._connected + + +mqtt_manager = MQTTManager() diff --git a/drb-edge-node/app/internal/op25_client.py b/drb-edge-node/app/internal/op25_client.py new file mode 100644 index 0000000..28d6566 --- /dev/null +++ b/drb-edge-node/app/internal/op25_client.py @@ -0,0 +1,63 @@ +import httpx +from typing import Optional, Dict, Any +from app.config import settings +from app.internal.logger import logger + + +class OP25Client: + def __init__(self): + self.api_url = 
settings.op25_api_url + self.terminal_url = settings.op25_terminal_url + + async def start(self) -> bool: + try: + async with httpx.AsyncClient(timeout=10) as client: + r = await client.post(f"{self.api_url}/op25/start") + r.raise_for_status() + return True + except Exception as e: + logger.error(f"OP25 start failed: {e}") + return False + + async def stop(self) -> bool: + try: + async with httpx.AsyncClient(timeout=10) as client: + r = await client.post(f"{self.api_url}/op25/stop") + r.raise_for_status() + return True + except Exception as e: + logger.error(f"OP25 stop failed: {e}") + return False + + async def status(self) -> Optional[Dict[str, Any]]: + try: + async with httpx.AsyncClient(timeout=5) as client: + r = await client.get(f"{self.api_url}/op25/status") + r.raise_for_status() + return r.json() + except Exception as e: + logger.error(f"OP25 status failed: {e}") + return None + + async def generate_config(self, config: Dict[str, Any]) -> bool: + try: + async with httpx.AsyncClient(timeout=10) as client: + r = await client.post(f"{self.api_url}/op25/generate-config", json=config) + r.raise_for_status() + return True + except Exception as e: + logger.error(f"OP25 generate-config failed: {e}") + return False + + async def get_terminal_status(self) -> Optional[Any]: + """Poll the OP25 HTTP terminal for current call metadata.""" + try: + async with httpx.AsyncClient(timeout=3) as client: + r = await client.get(f"{self.terminal_url}/0/status.json") + r.raise_for_status() + return r.json() + except Exception: + return None + + +op25_client = OP25Client() diff --git a/drb-edge-node/app/main.py b/drb-edge-node/app/main.py new file mode 100644 index 0000000..11263d6 --- /dev/null +++ b/drb-edge-node/app/main.py @@ -0,0 +1,141 @@ +import asyncio +from contextlib import asynccontextmanager +from fastapi import FastAPI +from app.config import settings +from app.models import SystemConfig +from app.internal.logger import logger +from app.internal.mqtt_manager import 
mqtt_manager +from app.internal import credentials +from app.internal.metadata_watcher import metadata_watcher +from app.internal.call_recorder import call_recorder +from app.internal.discord_radio import radio_bot +from app.internal.config_manager import ( + load_node_config, + save_node_config, + apply_system_config, +) +from app.routers import api, ui + + +# --------------------------------------------------------------------------- +# Event handlers wired up at startup +# --------------------------------------------------------------------------- + +async def on_call_start(data: dict): + await mqtt_manager.publish_status("recording") + await mqtt_manager.publish_metadata("call_start", data) + await call_recorder.start_recording(data["call_id"]) + + +async def on_call_end(data: dict): + file_path = await call_recorder.stop_recording() + if file_path: + audio_url = await call_recorder.upload_recording(file_path, data["call_id"]) + if audio_url: + data["audio_url"] = audio_url + await mqtt_manager.publish_metadata("call_end", data) + await mqtt_manager.publish_status("online") + + +async def on_command(payload: dict): + action = payload.get("action") + logger.info(f"Command received: {action}") + + if action == "discord_join": + token = payload.get("token") + if not token: + logger.error("discord_join command missing token — ignoring.") + return + await radio_bot.join( + guild_id=int(payload["guild_id"]), + channel_id=int(payload["channel_id"]), + token=token, + ) + elif action == "discord_leave": + await radio_bot.leave() + elif action == "op25_restart": + from app.internal.op25_client import op25_client + await op25_client.stop() + await asyncio.sleep(2) + await op25_client.start() + else: + logger.warning(f"Unknown command: {action}") + + +async def on_api_key(payload: dict): + key = payload.get("api_key") + if key: + credentials.save_api_key(key) + logger.info("Node API key received and saved.") + + +async def on_config_push(payload: dict): + """C2 pushes a 
system config — apply it and restart OP25 with the new settings.""" + try: + config = SystemConfig(**payload) + except Exception as e: + logger.error(f"Invalid config push payload: {e}") + return + + node_cfg = load_node_config() + node_cfg.assigned_system_id = config.system_id + node_cfg.system_config = config + node_cfg.configured = True + save_node_config(node_cfg) + apply_system_config(config) + + # Restart OP25 so it picks up the new config + from app.internal.op25_client import op25_client + await op25_client.stop() + await asyncio.sleep(2) + await op25_client.start() + + logger.info(f"Config push applied: {config.name}") + + +# --------------------------------------------------------------------------- +# App lifecycle +# --------------------------------------------------------------------------- + +@asynccontextmanager +async def lifespan(app: FastAPI): + logger.info(f"Edge node starting — ID: {settings.node_id}") + + # Load persisted credentials (API key provisioned by C2 after approval) + credentials.load() + + # Wire callbacks + metadata_watcher.on_call_start = on_call_start + metadata_watcher.on_call_end = on_call_end + mqtt_manager.on_command = on_command + mqtt_manager.on_config_push = on_config_push + mqtt_manager.on_api_key = on_api_key + + # Start services (radio_bot starts on-demand when a discord_join command arrives) + await mqtt_manager.connect() + await metadata_watcher.start() + + # Report initial status and resume OP25 if node was already configured before this restart + node_cfg = load_node_config() + initial_status = "online" if node_cfg.configured else "unconfigured" + await mqtt_manager.publish_status(initial_status) + + if node_cfg.configured: + from app.internal.op25_client import op25_client + logger.info("Node is configured — starting OP25.") + await op25_client.start() + + heartbeat_task = asyncio.create_task(mqtt_manager.heartbeat_loop()) + + yield # --- app running --- + + logger.info("Edge node shutting down.") + 
heartbeat_task.cancel() + await metadata_watcher.stop() + await radio_bot.stop() + await mqtt_manager.disconnect() + + +app = FastAPI(title=f"DRB Edge Node — {settings.node_id}", lifespan=lifespan) +app.include_router(api.router) +app.include_router(ui.router) diff --git a/drb-edge-node/app/models.py b/drb-edge-node/app/models.py new file mode 100644 index 0000000..17496be --- /dev/null +++ b/drb-edge-node/app/models.py @@ -0,0 +1,51 @@ +from pydantic import BaseModel +from typing import Optional, Dict, Any +from enum import Enum +from datetime import datetime + + +class NodeStatus(str, Enum): + ONLINE = "online" + OFFLINE = "offline" + RECORDING = "recording" + UNCONFIGURED = "unconfigured" + + +class CallStatus(str, Enum): + ACTIVE = "active" + ENDED = "ended" + + +class SystemConfig(BaseModel): + system_id: str + name: str + type: str # P25, DMR, NBFM + config: Dict[str, Any] # OP25-compatible config blob passed through to op25-container + + +class NodeConfig(BaseModel): + node_id: str + node_name: str + lat: float + lon: float + assigned_system_id: Optional[str] = None + system_config: Optional[SystemConfig] = None + configured: bool = False + + +class CallEvent(BaseModel): + call_id: str + node_id: str + system_id: Optional[str] = None + talkgroup_id: Optional[int] = None + talkgroup_name: Optional[str] = None + started_at: datetime + ended_at: Optional[datetime] = None + audio_url: Optional[str] = None + status: CallStatus = CallStatus.ACTIVE + + +class DiscordCommand(BaseModel): + action: str # "join" or "leave" + guild_id: Optional[str] = None + channel_id: Optional[str] = None diff --git a/drb-edge-node/app/routers/__init__.py b/drb-edge-node/app/routers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/drb-edge-node/app/routers/api.py b/drb-edge-node/app/routers/api.py new file mode 100644 index 0000000..7f21fa8 --- /dev/null +++ b/drb-edge-node/app/routers/api.py @@ -0,0 +1,82 @@ +from fastapi import APIRouter, HTTPException +from 
app.config import settings +from app.models import SystemConfig +from app.internal.op25_client import op25_client +from app.internal.config_manager import load_node_config, save_node_config, apply_system_config +from app.internal.call_recorder import call_recorder +from app.internal.discord_radio import radio_bot +from app.internal.metadata_watcher import metadata_watcher + +router = APIRouter(prefix="/api", tags=["api"]) + + +@router.get("/status") +async def get_status(): + node_cfg = load_node_config() + op25_status = await op25_client.status() + return { + "node_id": settings.node_id, + "node_name": settings.node_name, + "lat": settings.node_lat, + "lon": settings.node_lon, + "configured": node_cfg.configured, + "assigned_system_id": node_cfg.assigned_system_id, + "is_recording": call_recorder.is_recording, + "active_tgid": metadata_watcher.current_tgid, + "active_call_id": metadata_watcher.active_call_id, + "discord_connected": radio_bot.is_connected, + "icecast_url": ( + f"http://{settings.icecast_host}:{settings.icecast_port}{settings.icecast_mount}" + ), + "op25": op25_status, + } + + +@router.post("/op25/start") +async def start_op25(): + ok = await op25_client.start() + if not ok: + raise HTTPException(500, "Failed to start OP25") + return {"ok": True} + + +@router.post("/op25/stop") +async def stop_op25(): + ok = await op25_client.stop() + if not ok: + raise HTTPException(500, "Failed to stop OP25") + return {"ok": True} + + +@router.get("/config") +async def get_config(): + return load_node_config() + + +@router.post("/config/system") +async def set_system_config(config: SystemConfig): + """ + Apply a system config locally — called by the web UI or pushed by C2. + Writes the OP25 config and persists the node config. 
+ """ + node_cfg = load_node_config() + node_cfg.assigned_system_id = config.system_id + node_cfg.system_config = config + node_cfg.configured = True + save_node_config(node_cfg) + apply_system_config(config) + return {"ok": True} + + +@router.post("/discord/join") +async def discord_join(guild_id: int, channel_id: int): + ok = await radio_bot.join(guild_id, channel_id) + if not ok: + raise HTTPException(500, "Failed to join voice channel") + return {"ok": True} + + +@router.post("/discord/leave") +async def discord_leave(): + await radio_bot.leave() + return {"ok": True} diff --git a/drb-edge-node/app/routers/ui.py b/drb-edge-node/app/routers/ui.py new file mode 100644 index 0000000..d64448b --- /dev/null +++ b/drb-edge-node/app/routers/ui.py @@ -0,0 +1,12 @@ +from pathlib import Path +from fastapi import APIRouter +from fastapi.responses import HTMLResponse + +router = APIRouter(tags=["ui"]) + +_TEMPLATE = Path(__file__).parent.parent / "templates" / "index.html" + + +@router.get("/", response_class=HTMLResponse) +async def index(): + return _TEMPLATE.read_text() diff --git a/drb-edge-node/app/templates/index.html b/drb-edge-node/app/templates/index.html new file mode 100644 index 0000000..331c817 --- /dev/null +++ b/drb-edge-node/app/templates/index.html @@ -0,0 +1,196 @@ + + + + + + DRB Edge Node + + + +

DRB Edge Node

+

Loading…

+ +
+
+
Status
+
+ State + +
+
+ MQTT + +
+
+ Discord + +
+
+ Recording + +
+
+ +
+
Current Call
+
+ Talkgroup + +
+
+ Call ID + +
+
+ System + +
+
+ OP25 + +
+
+ +
+
Node Info
+
+ Lat / Lon + +
+
+ Configured + +
+ Listen via Icecast +
+
+ + + +

Last updated:

+ + + + diff --git a/drb-edge-node/pytest.ini b/drb-edge-node/pytest.ini new file mode 100644 index 0000000..78c5011 --- /dev/null +++ b/drb-edge-node/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +asyncio_mode = auto +testpaths = tests diff --git a/drb-edge-node/requirements.txt b/drb-edge-node/requirements.txt new file mode 100644 index 0000000..a82a06d --- /dev/null +++ b/drb-edge-node/requirements.txt @@ -0,0 +1,9 @@ +fastapi +uvicorn[standard] +pydantic-settings +paho-mqtt>=2.0.0 +httpx +discord.py[voice] +PyNaCl +pytest +pytest-asyncio diff --git a/drb-edge-node/tests/__init__.py b/drb-edge-node/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/drb-edge-node/tests/conftest.py b/drb-edge-node/tests/conftest.py new file mode 100644 index 0000000..ef54896 --- /dev/null +++ b/drb-edge-node/tests/conftest.py @@ -0,0 +1,5 @@ +import os + +# Satisfy pydantic-settings required fields before any app module is imported. +os.environ.setdefault("NODE_ID", "test-node-01") +os.environ.setdefault("MQTT_BROKER", "localhost") diff --git a/drb-edge-node/tests/test_config_manager.py b/drb-edge-node/tests/test_config_manager.py new file mode 100644 index 0000000..80f9cbf --- /dev/null +++ b/drb-edge-node/tests/test_config_manager.py @@ -0,0 +1,131 @@ +""" +Unit tests for config_manager — pure file I/O, no external services. 
+""" +import json +import pytest +from pathlib import Path +from unittest.mock import patch +from app.models import NodeConfig, SystemConfig + + +# --------------------------------------------------------------------------- +# save / load round-trip +# --------------------------------------------------------------------------- + +def test_save_and_load_roundtrip(tmp_path): + config = NodeConfig( + node_id="test-node-01", + node_name="Test Node", + lat=40.7128, + lon=-74.0060, + configured=True, + assigned_system_id="sys-001", + ) + + config_file = tmp_path / "node_config.json" + with patch("app.internal.config_manager._CONFIG_FILE", config_file): + import app.internal.config_manager as cm + cm.save_node_config(config) + loaded = cm.load_node_config() + + assert loaded.node_id == config.node_id + assert loaded.node_name == config.node_name + assert loaded.lat == pytest.approx(config.lat) + assert loaded.lon == pytest.approx(config.lon) + assert loaded.configured is True + assert loaded.assigned_system_id == "sys-001" + + +def test_save_writes_valid_json(tmp_path): + config = NodeConfig( + node_id="node-x", + node_name="X", + lat=1.0, + lon=2.0, + ) + config_file = tmp_path / "node_config.json" + with patch("app.internal.config_manager._CONFIG_FILE", config_file): + import app.internal.config_manager as cm + cm.save_node_config(config) + + data = json.loads(config_file.read_text()) + assert data["node_id"] == "node-x" + assert data["configured"] is False + + +# --------------------------------------------------------------------------- +# load fallback behaviour +# --------------------------------------------------------------------------- + +def test_load_missing_file_returns_defaults(tmp_path): + config_file = tmp_path / "nonexistent.json" + with patch("app.internal.config_manager._CONFIG_FILE", config_file): + import app.internal.config_manager as cm + result = cm.load_node_config() + + assert result.configured is False + + +def 
test_load_invalid_json_returns_defaults(tmp_path): + config_file = tmp_path / "node_config.json" + config_file.write_text("not valid json {{{") + + with patch("app.internal.config_manager._CONFIG_FILE", config_file): + import app.internal.config_manager as cm + result = cm.load_node_config() + + assert result.configured is False + + +def test_load_partial_json_returns_defaults(tmp_path): + """A truncated file (e.g. mid-write crash) should fall back to defaults.""" + config_file = tmp_path / "node_config.json" + config_file.write_text('{"node_id": "x"') # truncated + + with patch("app.internal.config_manager._CONFIG_FILE", config_file): + import app.internal.config_manager as cm + result = cm.load_node_config() + + assert result.configured is False + + +# --------------------------------------------------------------------------- +# apply_system_config +# --------------------------------------------------------------------------- + +def test_apply_system_config_writes_json(tmp_path): + system = SystemConfig( + system_id="sys-001", + name="Test System", + type="P25", + config={"freq": 851.0125, "nac": 0x293}, + ) + + with patch("app.internal.config_manager.settings") as mock_settings: + mock_settings.config_path = str(tmp_path) + import app.internal.config_manager as cm + result = cm.apply_system_config(system) + + assert result is True + written = json.loads((tmp_path / "active.cfg.json").read_text()) + assert written["freq"] == pytest.approx(851.0125) + assert written["nac"] == 0x293 + + +def test_apply_system_config_returns_false_on_error(tmp_path): + system = SystemConfig( + system_id="sys-001", + name="Test", + type="DMR", + config={}, + ) + + with patch("app.internal.config_manager.settings") as mock_settings: + # Point at a path that can't be written (a file, not a dir) + blocker = tmp_path / "blocker" + blocker.touch() + mock_settings.config_path = str(blocker) + import app.internal.config_manager as cm + result = cm.apply_system_config(system) + + assert 
result is False diff --git a/drb-edge-node/tests/test_metadata_watcher.py b/drb-edge-node/tests/test_metadata_watcher.py new file mode 100644 index 0000000..faff8d4 --- /dev/null +++ b/drb-edge-node/tests/test_metadata_watcher.py @@ -0,0 +1,190 @@ +""" +Unit tests for MetadataWatcher state machine. +All OP25 HTTP calls are mocked — no running services required. +""" +import pytest +from unittest.mock import AsyncMock, patch +from app.internal.metadata_watcher import MetadataWatcher, HANG_THRESHOLD + + +@pytest.fixture +def watcher(): + w = MetadataWatcher() + w.on_call_start = AsyncMock() + w.on_call_end = AsyncMock() + return w + + +# --------------------------------------------------------------------------- +# Call start +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_call_starts_when_tgid_appears(watcher): + status = [{"tgid": 1234, "tag": "Police Dispatch"}] + with patch("app.internal.metadata_watcher.op25_client.get_terminal_status", new=AsyncMock(return_value=status)): + await watcher._tick() + + assert watcher.is_active + assert watcher.current_tgid == 1234 + watcher.on_call_start.assert_called_once() + payload = watcher.on_call_start.call_args[0][0] + assert payload["tgid"] == 1234 + assert payload["tgid_name"] == "Police Dispatch" + assert "call_id" in payload + assert "started_at" in payload + + +@pytest.mark.asyncio +async def test_tgid_zero_does_not_start_call(watcher): + status = [{"tgid": 0}] + with patch("app.internal.metadata_watcher.op25_client.get_terminal_status", new=AsyncMock(return_value=status)): + await watcher._tick() + + assert not watcher.is_active + watcher.on_call_start.assert_not_called() + + +@pytest.mark.asyncio +async def test_tgid_none_string_does_not_start_call(watcher): + status = [{"tgid": "None"}] + with patch("app.internal.metadata_watcher.op25_client.get_terminal_status", new=AsyncMock(return_value=status)): + await watcher._tick() + + assert not 
watcher.is_active + + +@pytest.mark.asyncio +async def test_op25_offline_does_not_start_call(watcher): + with patch("app.internal.metadata_watcher.op25_client.get_terminal_status", new=AsyncMock(return_value=None)): + await watcher._tick() + + assert not watcher.is_active + watcher.on_call_start.assert_not_called() + + +# --------------------------------------------------------------------------- +# Hang / call end +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_hang_below_threshold_keeps_call_alive(watcher): + with patch("app.internal.metadata_watcher.op25_client.get_terminal_status", new=AsyncMock(return_value=[{"tgid": 1234}])): + await watcher._tick() + + assert watcher.is_active + + with patch("app.internal.metadata_watcher.op25_client.get_terminal_status", new=AsyncMock(return_value=[{"tgid": 0}])): + for _ in range(HANG_THRESHOLD - 1): + await watcher._tick() + + assert watcher.is_active + watcher.on_call_end.assert_not_called() + + +@pytest.mark.asyncio +async def test_hang_at_threshold_ends_call(watcher): + with patch("app.internal.metadata_watcher.op25_client.get_terminal_status", new=AsyncMock(return_value=[{"tgid": 1234}])): + await watcher._tick() + + with patch("app.internal.metadata_watcher.op25_client.get_terminal_status", new=AsyncMock(return_value=[{"tgid": 0}])): + for _ in range(HANG_THRESHOLD): + await watcher._tick() + + assert not watcher.is_active + watcher.on_call_end.assert_called_once() + payload = watcher.on_call_end.call_args[0][0] + assert "call_id" in payload + assert "ended_at" in payload + + +@pytest.mark.asyncio +async def test_op25_offline_triggers_hang_and_ends_call(watcher): + with patch("app.internal.metadata_watcher.op25_client.get_terminal_status", new=AsyncMock(return_value=[{"tgid": 1234}])): + await watcher._tick() + + with patch("app.internal.metadata_watcher.op25_client.get_terminal_status", new=AsyncMock(return_value=None)): + for _ in 
range(HANG_THRESHOLD): + await watcher._tick() + + assert not watcher.is_active + watcher.on_call_end.assert_called_once() + + +@pytest.mark.asyncio +async def test_hang_counter_resets_when_tgid_returns(watcher): + with patch("app.internal.metadata_watcher.op25_client.get_terminal_status", new=AsyncMock(return_value=[{"tgid": 1234}])): + await watcher._tick() + + # Partial hang — not enough to end + with patch("app.internal.metadata_watcher.op25_client.get_terminal_status", new=AsyncMock(return_value=[{"tgid": 0}])): + for _ in range(HANG_THRESHOLD - 1): + await watcher._tick() + + assert watcher.is_active + + # tgid returns — counter resets + with patch("app.internal.metadata_watcher.op25_client.get_terminal_status", new=AsyncMock(return_value=[{"tgid": 1234}])): + await watcher._tick() + + assert watcher._hang_counter == 0 + assert watcher.is_active + + +# --------------------------------------------------------------------------- +# Talkgroup changes +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_talkgroup_change_closes_old_and_opens_new(watcher): + with patch("app.internal.metadata_watcher.op25_client.get_terminal_status", new=AsyncMock(return_value=[{"tgid": 1111}])): + await watcher._tick() + + first_call_id = watcher.active_call_id + + with patch("app.internal.metadata_watcher.op25_client.get_terminal_status", new=AsyncMock(return_value=[{"tgid": 2222}])): + await watcher._tick() + + assert watcher.is_active + assert watcher.current_tgid == 2222 + assert watcher.active_call_id != first_call_id + watcher.on_call_end.assert_called_once() + assert watcher.on_call_start.call_count == 2 + + +# --------------------------------------------------------------------------- +# Status format variations +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_single_dict_status_instead_of_list(watcher): + """OP25 terminal may return a bare 
dict instead of a list.""" + status = {"tgid": 9999, "tag": "Fire"} + with patch("app.internal.metadata_watcher.op25_client.get_terminal_status", new=AsyncMock(return_value=status)): + await watcher._tick() + + assert watcher.current_tgid == 9999 + + +@pytest.mark.asyncio +async def test_tg_id_key_alias(watcher): + """Some OP25 builds use 'tg_id' instead of 'tgid'.""" + status = [{"tg_id": 5555, "tag": "EMS"}] + with patch("app.internal.metadata_watcher.op25_client.get_terminal_status", new=AsyncMock(return_value=status)): + await watcher._tick() + + assert watcher.current_tgid == 5555 + + +@pytest.mark.asyncio +async def test_multichannel_uses_first_active(watcher): + """When multiple channels are returned, first active tgid wins.""" + status = [ + {"tgid": 0}, + {"tgid": 7777, "tag": "Roads"}, + {"tgid": 8888, "tag": "Other"}, + ] + with patch("app.internal.metadata_watcher.op25_client.get_terminal_status", new=AsyncMock(return_value=status)): + await watcher._tick() + + assert watcher.current_tgid == 7777 diff --git a/icecast/Dockerfile b/icecast/Dockerfile new file mode 100644 index 0000000..cbbc7af --- /dev/null +++ b/icecast/Dockerfile @@ -0,0 +1,18 @@ +FROM debian:bookworm-slim + +ENV DEBIAN_FRONTEND=noninteractive + +RUN apt-get update && apt-get install -y \ + icecast2 \ + gettext-base \ + gosu \ + && rm -rf /var/lib/apt/lists/* \ + && id icecast 2>/dev/null || adduser --system --group --no-create-home icecast + +COPY icecast.xml.template /etc/icecast2/icecast.xml.template +COPY entrypoint.sh /entrypoint.sh +RUN chmod +x /entrypoint.sh + +EXPOSE 8000 + +ENTRYPOINT ["/entrypoint.sh"] diff --git a/icecast/entrypoint.sh b/icecast/entrypoint.sh new file mode 100644 index 0000000..3334ce7 --- /dev/null +++ b/icecast/entrypoint.sh @@ -0,0 +1,11 @@ +#!/bin/sh +set -e + +ICECAST_SOURCE_PASSWORD="${ICECAST_SOURCE_PASSWORD:-hackme}" +ICECAST_ADMIN_PASSWORD="${ICECAST_ADMIN_PASSWORD:-admin}" + +export ICECAST_SOURCE_PASSWORD ICECAST_ADMIN_PASSWORD + +envsubst < 
/etc/icecast2/icecast.xml.template > /tmp/icecast.xml + +exec gosu icecast icecast2 -c /tmp/icecast.xml diff --git a/icecast/icecast.xml.template b/icecast/icecast.xml.template new file mode 100644 index 0000000..1c4dfd5 --- /dev/null +++ b/icecast/icecast.xml.template @@ -0,0 +1,49 @@ + + server + admin@localhost + + + 100 + 10 + 5 + 524288 + 30 + 15 + 10 + 1 + 65535 + + + + ${ICECAST_SOURCE_PASSWORD} + ${ICECAST_SOURCE_PASSWORD} + admin + ${ICECAST_ADMIN_PASSWORD} + + + localhost + + 8000 + + + +
+ + + + /usr/share/icecast2 + /tmp + /usr/share/icecast2/web + /usr/share/icecast2/admin + + + + icecast-access.log + icecast-error.log + 3 + + + + 0 + + diff --git a/op25-container/.gitea/workflows/build-nightly.yml b/op25-container/.gitea/workflows/build-nightly.yml new file mode 100644 index 0000000..1c38461 --- /dev/null +++ b/op25-container/.gitea/workflows/build-nightly.yml @@ -0,0 +1,57 @@ +name: release-tag + +on: + push: + branches: + - dev + +jobs: + release-image: + runs-on: ubuntu-latest + env: + DOCKER_LATEST: stable + CONTAINER_NAME: drb-client-discord-bot + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + + - name: Set up Docker BuildX + uses: docker/setup-buildx-action@v3 + with: # replace it with your local IP + config-inline: | + [registry."git.vpn.cusano.net"] + http = false + insecure = false + + - name: Login to DockerHub + uses: docker/login-action@v3 + with: + registry: git.vpn.cusano.net # replace it with your local IP + username: ${{ secrets.GIT_REPO_USERNAME }} + password: ${{ secrets.GIT_REPO_PASSWORD }} + + - name: Get Meta + id: meta + run: | + echo REPO_NAME=$(echo ${GITHUB_REPOSITORY} | awk -F"/" '{print $2}') >> $GITHUB_OUTPUT + echo REPO_VERSION=$(git describe --tags --always | sed 's/^v//') >> $GITHUB_OUTPUT + + - name: Validate build configuration + uses: docker/build-push-action@v6 + with: + call: check + + - name: Build and push + uses: docker/build-push-action@v6 + with: + context: . 
+ file: ./Dockerfile + platforms: | + linux/arm64 + push: true + tags: | # replace it with your local IP and tags + git.vpn.cusano.net/${{ vars.DOCKER_ORG }}/${{ steps.meta.outputs.REPO_NAME }}/${{ env.CONTAINER_NAME }}:${{ steps.meta.outputs.REPO_VERSION }} + git.vpn.cusano.net/${{ vars.DOCKER_ORG }}/${{ steps.meta.outputs.REPO_NAME }}/${{ env.CONTAINER_NAME }}:${{ env.DOCKER_LATEST }} \ No newline at end of file diff --git a/op25-container/.gitea/workflows/build-stable.yml b/op25-container/.gitea/workflows/build-stable.yml new file mode 100644 index 0000000..2190f34 --- /dev/null +++ b/op25-container/.gitea/workflows/build-stable.yml @@ -0,0 +1,60 @@ +name: release-tag + +on: + push: + branches: + - master + +jobs: + release-image: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + env: + DOCKER_LATEST: stable + CONTAINER_NAME: op25-client + steps: + - name: Checkout + uses: actions/checkout@v5 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + + - name: Set up Docker BuildX + uses: docker/setup-buildx-action@v3 + with: + config-inline: | + [registry."git.vpn.cusano.net"] + http = false + insecure = false + + - name: Login to Gitea Container Registry + uses: docker/login-action@v3 + with: + registry: git.vpn.cusano.net + username: ${{ gitea.actor }} # Uses the user or bot that triggered the workflow + password: ${{ secrets.GITHUB_COM_TOKEN }} # The built-in, temporary token + + - name: Get Meta + id: meta + run: | + echo REPO_NAME=$(echo ${GITHUB_REPOSITORY} | awk -F"/" '{print $2}') >> $GITHUB_OUTPUT + echo REPO_VERSION=$(git describe --tags --always | sed 's/^v//') >> $GITHUB_OUTPUT + + - name: Validate build configuration + uses: docker/build-push-action@v6 + with: + call: check + + - name: Build and push + uses: docker/build-push-action@v6 + with: + context: . 
+ file: ./Dockerfile + platforms: | + linux/arm64 + push: true + tags: | + git.vpn.cusano.net/${{ vars.DOCKER_ORG }}/${{ steps.meta.outputs.REPO_NAME }}/${{ env.CONTAINER_NAME }}:${{ steps.meta.outputs.REPO_VERSION }} + git.vpn.cusano.net/${{ vars.DOCKER_ORG }}/${{ steps.meta.outputs.REPO_NAME }}/${{ env.CONTAINER_NAME }}:${{ env.DOCKER_LATEST }} diff --git a/op25-container/.gitea/workflows/lint.yml b/op25-container/.gitea/workflows/lint.yml new file mode 100644 index 0000000..ae283a9 --- /dev/null +++ b/op25-container/.gitea/workflows/lint.yml @@ -0,0 +1,30 @@ +name: Lint + +on: + push: + branches: + - master + pull_request: + branches: + - "*" + +jobs: + lint: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.13' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install flake8 + + - name: Run Lint + run: | + flake8 --max-line-length=88 --ignore=E203,E302,E501 . 
\ No newline at end of file diff --git a/op25-container/.gitignore b/op25-container/.gitignore new file mode 100644 index 0000000..f77315b --- /dev/null +++ b/op25-container/.gitignore @@ -0,0 +1,6 @@ +__pycache__* +bot-poc.py +configs* +.env +*.log* +.venv \ No newline at end of file diff --git a/op25-container/Dockerfile b/op25-container/Dockerfile new file mode 100644 index 0000000..bdebcea --- /dev/null +++ b/op25-container/Dockerfile @@ -0,0 +1,51 @@ +## OP25 Core Container +FROM python:slim-trixie + +# Set environment variables +ENV DEBIAN_FRONTEND=noninteractive + +# Install system dependencies +RUN apt-get update && \ + apt-get upgrade -y && \ + apt-get install git pulseaudio pulseaudio-utils liquidsoap -y + +# Clone the boatbod op25 repository +RUN git clone -b gr310 https://github.com/boatbod/op25 /op25 + +# Set the working directory +WORKDIR /op25 + +# Run the install script to set up op25 +RUN sed -i 's/sudo //g' install.sh +RUN ./install.sh -f + +# Install Python dependencies +COPY requirements.txt /tmp/requirements.txt +RUN pip3 install --no-cache-dir -r /tmp/requirements.txt + +# Create the run_multi-rx_service.sh script +COPY run_multi-rx_service.sh /op25/op25/gr-op25_repeater/apps/run_multi-rx_service.sh +RUN chmod +x /op25/op25/gr-op25_repeater/apps/run_multi-rx_service.sh + +# Expose ports for HTTP control as needed, for example: +EXPOSE 8001 8081 + +# Create and set up the configuration directory +VOLUME ["/configs"] + +# Set the working directory in the container +WORKDIR /app + +# Copy the rest of the directory contents into the container at /app +COPY ./app /app + +# 1. Copy the wrapper script and make it executable +COPY docker-entrypoint.sh /usr/local/bin/ +RUN sed -i 's/\r$//' /usr/local/bin/docker-entrypoint.sh && \ + chmod +x /usr/local/bin/docker-entrypoint.sh + +# 2. Update ENTRYPOINT to use the wrapper script +ENTRYPOINT ["/usr/local/bin/docker-entrypoint.sh"] + +# 3. 
Use CMD to pass the uvicorn command as arguments to the ENTRYPOINT script +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001", "--reload"] \ No newline at end of file diff --git a/op25-container/app/internal/liquidsoap_config_utils.py b/op25-container/app/internal/liquidsoap_config_utils.py new file mode 100644 index 0000000..589d031 --- /dev/null +++ b/op25-container/app/internal/liquidsoap_config_utils.py @@ -0,0 +1,34 @@ +import os +from internal.op25_liq_template import liquidsoap_config_template +from models import IcecastConfig + +def generate_liquid_script(config: IcecastConfig): + """ + Generates the "*.liq" file that's run by OP25 on startup. + + Placeholders in the template must be formatted as ${VARIABLE_NAME}. + + Args: + config (dict): A dictionary of key-value pairs for substitution. + Keys should match the variable names in the template (e.g., 'icecast_host'). + """ + try: + content = liquidsoap_config_template + # Replace variables + for key, value in config.model_dump().items(): + placeholder = f"${{{key}}}" + # Ensure the value is converted to string for replacement + content = content.replace(placeholder, str(value)) + print(f" - Replaced placeholder {placeholder}") + + # Write the processed content to the output path + output_path = "/configs/op25.liq" + with open(output_path, 'w') as f: + f.write(content) + + print(f"\nSuccessfully wrote processed configuration to: {output_path}") + + except FileNotFoundError: + print(f"Error: Template file not found at {template_path}") + except Exception as e: + print(f"An unexpected error occurred: {e}") \ No newline at end of file diff --git a/op25-container/app/internal/logger.py b/op25-container/app/internal/logger.py new file mode 100644 index 0000000..4b9ef0c --- /dev/null +++ b/op25-container/app/internal/logger.py @@ -0,0 +1,55 @@ +import logging +from logging.handlers import RotatingFileHandler + +def create_logger(name, level=logging.DEBUG, max_bytes=10485760, backup_count=2): + """ + 
Creates a logger with a console and rotating file handlers for both debug and info log levels. + + Args: + name (str): The name for the logger. + level (int): The logging level for the logger. Defaults to logging.DEBUG. + max_bytes (int): Maximum size of the log file in bytes before it gets rotated. Defaults to 10 MB. + backup_count (int): Number of backup files to keep. Defaults to 2. + + Returns: + logging.Logger: Configured logger. + """ + # Set the log file paths + debug_log_file = "./client.debug.log" + info_log_file = "./client.log" + + # Create a logger + logger = logging.getLogger(name) + logger.setLevel(level) + + # Check if the logger already has handlers to avoid duplicate logs + if not logger.hasHandlers(): + # Create console handler + console_handler = logging.StreamHandler() + console_handler.setLevel(level) + + # Create rotating file handler for debug level + debug_file_handler = RotatingFileHandler(debug_log_file, maxBytes=max_bytes, backupCount=backup_count) + debug_file_handler.setLevel(logging.DEBUG) + + # Create rotating file handler for info level + info_file_handler = RotatingFileHandler(info_log_file, maxBytes=max_bytes, backupCount=backup_count) + info_file_handler.setLevel(logging.INFO) + + # Create formatter and add it to the handlers + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + console_handler.setFormatter(formatter) + debug_file_handler.setFormatter(formatter) + info_file_handler.setFormatter(formatter) + + # Add the handlers to the logger + logger.addHandler(console_handler) + logger.addHandler(debug_file_handler) + logger.addHandler(info_file_handler) + + return logger + +# Example usage: +# logger = create_logger('my_logger') +# logger.debug('This is a debug message') +# logger.info('This is an info message') diff --git a/op25-container/app/internal/op25_config_utls.py b/op25-container/app/internal/op25_config_utls.py new file mode 100644 index 0000000..36360a1 --- /dev/null +++ 
def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag],
                        path: str = "/configs/active.cfg.tags.tsv") -> None:
    """
    Write a list of talkgroup tags to the OP25 tags TSV file.

    One row per tag: decimal talkgroup ID, then the human-readable label.

    Args:
        talkgroup_tags (List[TalkgroupTag]): The list of TalkgroupTag instances.
        path (str): Output TSV path. Defaults to the active-config location
            used by the generated OP25 trunking config.
    """
    with open(path, 'w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file, delimiter='\t', lineterminator='\n')
        for tag in talkgroup_tags:
            writer.writerow([tag.tagDec, tag.talkgroup])


def save_whitelist(talkgroup_tags: List[int],
                   path: str = "/configs/active.cfg.whitelist.tsv") -> None:
    """
    Write a list of whitelisted talkgroup IDs to the whitelist TSV file.

    Args:
        talkgroup_tags (List[int]): The decimal talkgroup IDs to whitelist,
            one per row.
        path (str): Output TSV path. Defaults to the active-config location
            used by the generated OP25 trunking config.
    """
    with open(path, 'w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file, delimiter='\t', lineterminator='\n')
        for tg in talkgroup_tags:
            writer.writerow([tg])
+ """ + for key, value in list(d.items()): + LOGGER.info(f"Key: '{key}'\nValue: '{value}'") + if value is None: + del d[key] + elif isinstance(value, dict): + del_none_in_dict(value) + elif isinstance(value, list): + for iterative_value in value: + del_none_in_dict(iterative_value) + return d # For convenience + +def get_current_system_from_config() -> str: + # Get the current config + with open('/configs/active.cfg.json', 'r') as f: + json_data = f.read() + if isinstance(json_data, str): + try: + data = json.loads(json_data) + except json.JSONDecodeError: + return None + elif isinstance(json_data, dict): + data = json_data + else: + return None + + if "channels" in data and isinstance(data["channels"], list) and len(data["channels"]) > 0: + first_channel = data["channels"][0] + if "name" in first_channel: + return first_channel["name"] + return None diff --git a/op25-container/app/internal/op25_liq_template.py b/op25-container/app/internal/op25_liq_template.py new file mode 100644 index 0000000..4c49ee1 --- /dev/null +++ b/op25-container/app/internal/op25_liq_template.py @@ -0,0 +1,44 @@ +liquidsoap_config_template = """#!/usr/bin/liquidsoap + +# OP25 → Icecast streaming (Liquidsoap 2.x) + +settings.log.stdout.set(true) +settings.log.file.set(false) +settings.log.level.set(1) +settings.frame.audio.samplerate.set(8000) +settings.init.allow_root.set(true) + +# ========================================================== +ICE_HOST = "${icecast_host}" +ICE_PORT = ${icecast_port} +ICE_MOUNT = "${icecast_mountpoint}" +ICE_PASSWORD = "${icecast_password}" +ICE_DESCRIPTION = "${icecast_description}" +ICE_GENRE = "${icecast_genre}" +# ========================================================== + +input = mksafe(input.external(buffer=0.25, channels=2, samplerate=8000, restart_on_error=false, "./audio.py -x 2.5 -s")) +# Consider increasing the buffer value on slow systems such as RPi3. e.g. 
class ConfigGenerator(BaseModel):
    """Request body for /op25/generate-config: describes one system to decode."""

    type: DecodeMode                   # P25 trunked vs. NBFM analog
    systemName: str                    # also used as the channel / trunking sysname
    channels: List[Union[int, str]]    # control-channel frequencies
    tags: Optional[List[TalkgroupTag]] = None
    whitelist: Optional[List[int]] = None
    # Bug fix: IcecastConfig is declared at the bottom of this module, so a
    # direct name reference here raised NameError on import. A string forward
    # reference defers resolution until the model is first used, by which
    # time IcecastConfig exists in the module namespace.
    icecastConfig: Optional["IcecastConfig"] = None
class DeviceConfig(BaseModel):
    # SDR device settings for the OP25 "devices" config section.
    args: Optional[str] = "rtl"            # device string — presumably osmosdr-style; confirm against OP25 docs
    gains: Optional[str] = "lna:39"        # gain stages as "stage:value"
    gain_mode: Optional[bool] = False      # hardware AGC on/off — TODO confirm
    name: Optional[str] = "sdr"            # matched by ChannelConfig.device
    offset: Optional[int] = 0              # frequency offset (Hz) — units assumed; verify
    ppm: Optional[float] = 0.0             # frequency correction in ppm
    rate: Optional[int] = 1920000          # sample rate
    usable_bw_pct: Optional[float] = 0.85  # usable fraction of the sampled bandwidth
    tunable: Optional[bool] = True

class TrunkingChannelConfig(BaseModel):
    # Per-system trunking parameters (an entry of TrunkingConfig.chans).
    sysname: str
    control_channel_list: str              # comma-separated control-channel list (built by the config router)
    tagsFile: Optional[str] = None         # path to the talkgroup-tags TSV
    whitelist: Optional[str] = None        # path to the whitelist TSV
    nac: Optional[str] = ""
    wacn: Optional[str] = ""
    tdma_cc: Optional[bool] = False
    crypt_behavior: Optional[int] = 2      # NOTE(review): meaning of default 2 not documented here — confirm against OP25

class TrunkingConfig(BaseModel):
    # "trunking" section of the multi_rx config.
    module: str                            # OP25 trunking module, e.g. "tk_p25.py"
    chans: List[TrunkingChannelConfig]

class MetadataStreamConfig(BaseModel):
    # One metadata stream for the icemeta module; the placeholder defaults
    # (NODE_ID / PASSWORD) are overwritten by the config-generation endpoint.
    stream_name: str = "stream_0"
    meta_format_idle: str = "[idle]"       # title shown when no talkgroup is active
    meta_format_tgid: str = "[%TGID%]"     # title when only the TGID is known
    meta_format_tag: str = "[%TGID%] %TAG%"  # title when a tag label is known
    icecastServerAddress: str = "localhost"  # "host:port" of the Icecast server
    icecastMountpoint: str = "NODE_ID"
    icecastMountExt: str = ".xspf"
    icecastPass: str = "PASSWORD"
    delay: float = 0.0

class MetadataConfig(BaseModel):
    # "metadata" section of the multi_rx config.
    module: str = "icemeta.py"
    streams: List[MetadataStreamConfig]
def _build_p25_config(generator: ConfigGenerator) -> dict:
    """
    Assemble the multi_rx config dict for a trunked P25 system.

    Side effects: writes the tags/whitelist TSVs and the Liquidsoap script.
    """
    channels = [ChannelConfig(
        name=generator.systemName,
        trunking_sysname=generator.systemName,
        enable_analog="off",
        demod_type="cqpsk",
        cqpsk_tracking=True,
        filter_type="rc",
        meta_stream_name="stream_0"
    )]
    devices = [DeviceConfig()]
    # Robustness: tags/whitelist are Optional on ConfigGenerator — write
    # empty files instead of crashing when they are omitted.
    save_talkgroup_tags(generator.tags or [])
    save_whitelist(generator.whitelist or [])
    trunking = TrunkingConfig(
        module="tk_p25.py",
        chans=[TrunkingChannelConfig(
            sysname=generator.systemName,
            # Bug fix: channels may contain ints; ','.join requires strings.
            control_channel_list=','.join(str(c) for c in generator.channels),
            tagsFile="/configs/active.cfg.tags.tsv",
            whitelist="/configs/active.cfg.whitelist.tsv"
        )]
    )
    metadata = MetadataConfig(
        streams=[MetadataStreamConfig(
            stream_name="stream_0",
            icecastServerAddress=f"{generator.icecastConfig.icecast_host}:{generator.icecastConfig.icecast_port}",
            icecastMountpoint=generator.icecastConfig.icecast_mountpoint,
            icecastPass=generator.icecastConfig.icecast_password
        )]
    )
    # Generate the op25.liq file for the Liquidsoap streamer.
    generate_liquid_script(generator.icecastConfig)
    terminal = TerminalConfig()
    return {
        "channels": [channel.dict() for channel in channels],
        "devices": [device.dict() for device in devices],
        "trunking": trunking.dict(),
        "metadata": metadata.dict(),
        "terminal": terminal.dict()
    }


def _build_nbfm_config(generator: ConfigGenerator) -> dict:
    """Assemble the multi_rx config dict for a plain NBFM (analog) channel."""
    # Bug fixes: the original dereferenced `generator.config` (no such
    # attribute on ConfigGenerator) and used keyword names that do not exist
    # on ChannelConfig (channelName/enableAnalog/demodType/filterType) or
    # DeviceConfig (gain), so this branch always raised.
    channels = [ChannelConfig(
        name=generator.systemName,
        enable_analog="on",
        demod_type="fsk4",
        filter_type="widepulse",
        meta_stream_name="stream_0",
        # NOTE(review): ConfigGenerator has no frequency/nbfmSquelch fields;
        # use the first entry of `channels` as the frequency — confirm intent.
        frequency=float(generator.channels[0]) if generator.channels else None,
        nbfmSquelch=getattr(generator, "nbfmSquelch", None)
    )]
    devices = [DeviceConfig(gains="LNA:32")]
    return {
        "channels": [channel.dict() for channel in channels],
        "devices": [device.dict() for device in devices]
    }


def create_op25_router():
    """Build the APIRouter exposing OP25 process control and config generation."""
    router = APIRouter()

    @router.post("/start")
    async def start_op25():
        """Launch the OP25 runner script in its own process group."""
        global op25_process
        if op25_process is None:
            try:
                # setsid puts OP25 in its own process group so /stop can
                # SIGTERM the whole tree (multi_rx.py + liquidsoap).
                op25_process = subprocess.Popen(
                    os.path.join(OP25_PATH, OP25_SCRIPT),
                    shell=True, preexec_fn=os.setsid, cwd=OP25_PATH)
                LOGGER.debug(op25_process)
                return {"status": "OP25 started"}
            except Exception as e:
                raise HTTPException(status_code=500, detail=str(e))
        return {"status": "OP25 already running"}

    @router.post("/stop")
    async def stop_op25():
        """SIGTERM the OP25 process group and forget the handle."""
        global op25_process
        if op25_process is not None:
            try:
                os.killpg(os.getpgid(op25_process.pid), signal.SIGTERM)
                op25_process = None
                return {"status": "OP25 stopped"}
            except Exception as e:
                raise HTTPException(status_code=500, detail=str(e))
        return {"status": "OP25 is not running"}

    @router.get("/status")
    async def get_status():
        """Report whether the OP25 child process is actually alive."""
        # Bug fix: poll() is None only while the child is running, so a
        # crashed OP25 no longer reports "running" forever.
        running = op25_process is not None and op25_process.poll() is None
        return {"status": "running" if running else "stopped"}

    @router.post("/generate-config")
    async def generate_config(generator: ConfigGenerator):
        """Build and persist /configs/active.cfg.json for the requested mode."""
        try:
            if generator.type == DecodeMode.P25:
                config_dict = _build_p25_config(generator)
            elif generator.type == DecodeMode.ANALOG:
                config_dict = _build_nbfm_config(generator)
            else:
                raise HTTPException(
                    status_code=400,
                    detail="Invalid configuration type. Must be 'p25' or 'nbfm'.")

            with open('/configs/active.cfg.json', 'w') as f:
                json.dump(del_none_in_dict(config_dict), f, indent=2)

            return {"message": "Config exported to '/configs/active.cfg.json'"}
        except HTTPException:
            # Bug fix: the deliberate 400 above was previously caught by the
            # broad handler below and re-raised as a 500.
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))

    return router
+echo "Starting multi_rx.py..." +./multi_rx.py -v 1 -c $CONFIG_FILE & +MULTI_RX_PID=$! # Store the PID of the background process + +# --- Start the liquid-dsp plot utility (op25.liq) in the background --- +echo "Starting op25.liq..." +liquidsoap /configs/op25.liq & +LIQ_PID=$! # Store the PID of the op25.liq process + +# Wait for both background jobs to finish. +# Since multi_rx.py is the core service, this script will effectively wait +# until multi_rx.py is externally stopped (via the API). +# The trap command ensures that SIGTERM is passed to the background jobs. +trap "kill $MULTI_RX_PID $LIQ_PID" SIGTERM SIGINT +wait $MULTI_RX_PID \ No newline at end of file