From 7de55f9885b9cd2b84a63b70240e76f6c8224036 Mon Sep 17 00:00:00 2001 From: Logan Date: Mon, 6 Apr 2026 00:23:33 -0400 Subject: [PATCH] changes --- docker-compose.yml | 3 + drb-edge-node/Dockerfile | 2 +- drb-edge-node/app/internal/discord_radio.py | 59 +++++++++++++++- .../app/internal/metadata_watcher.py | 4 +- drb-edge-node/app/internal/op25_client.py | 13 +++- drb-edge-node/app/main.py | 54 ++++++++++++--- drb-edge-node/app/templates/index.html | 2 +- .../app/internal/op25_config_utls.py | 2 +- op25-container/app/main.py | 25 ++++++- op25-container/app/routers/op25_controller.py | 69 ++++++++++++------- 10 files changed, 189 insertions(+), 44 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 6fa78ff..3c343a5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,9 +12,11 @@ services: restart: unless-stopped privileged: true network_mode: host + env_file: .env volumes: - ./configs:/configs - /dev:/dev + - ./op25-container/app:/app depends_on: - icecast @@ -26,6 +28,7 @@ services: volumes: - ./configs:/configs - ./recordings:/recordings + - ./drb-edge-node/app:/app/app depends_on: - icecast - op25 diff --git a/drb-edge-node/Dockerfile b/drb-edge-node/Dockerfile index 2b95eab..bb77fe6 100644 --- a/drb-edge-node/Dockerfile +++ b/drb-edge-node/Dockerfile @@ -14,4 +14,4 @@ RUN pip install --no-cache-dir -r requirements.txt COPY app/ ./app/ COPY tests/ ./tests/ -CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"] +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080", "--reload"] diff --git a/drb-edge-node/app/internal/discord_radio.py b/drb-edge-node/app/internal/discord_radio.py index 7522a26..ab13a41 100644 --- a/drb-edge-node/app/internal/discord_radio.py +++ b/drb-edge-node/app/internal/discord_radio.py @@ -19,7 +19,7 @@ class RadioBot: f"http://{settings.icecast_host}:{settings.icecast_port}{settings.icecast_mount}" ) - async def join(self, guild_id: int, channel_id: int, token: str) -> bool: + async def join(self, guild_id: int, channel_id: int, token: str, call_active: bool = False) -> bool: # (Re)start the bot if the token changed or the bot isn't running if self._current_token != token or not self._is_bot_running(): if not await self._start_bot(token): @@ -39,8 +39,10 @@ class RadioBot: if self._voice_client and self._voice_client.is_connected(): await self._voice_client.disconnect(force=True) self._voice_client = await channel.connect() - self._play_stream() - logger.info(f"Streaming to #{channel.name} in {guild.name}") + # Only start playing immediately if a call is currently active + if call_active: + self._play_stream() + logger.info(f"Joined #{channel.name} in {guild.name} (streaming={'yes' if call_active else 'waiting for call'})") return True except Exception as e: logger.error(f"Failed to join voice channel: {e}") @@ -49,6 +51,7 @@ class RadioBot: async def leave(self) -> bool: if self._voice_client and self._voice_client.is_connected(): try: + self._stop_stream() await self._voice_client.disconnect(force=True) self._voice_client = None logger.info("Disconnected from voice channel.") @@ -57,6 +60,19 @@ class RadioBot: logger.error(f"Failed to disconnect: {e}") return False + def start_stream(self): + """Called when an OP25 call starts — begin transmitting audio and light the ring.""" + if self._voice_client and self._voice_client.is_connected(): + if not self._voice_client.is_playing(): + self._play_stream() + logger.debug("Stream started (call active).") + + def stop_stream(self): + """Called when an OP25 call ends — stop transmitting so the ring goes dark.""" + if self._voice_client and self._voice_client.is_connected(): + self._stop_stream() + logger.debug("Stream stopped (call ended).") + async def stop(self): await self.leave() if self._task: @@ -80,11 +96,17 @@ class RadioBot: after=lambda e: logger.error(f"Stream ended unexpectedly: {e}") if e else None, ) + def _stop_stream(self): + if self._voice_client and self._voice_client.is_playing(): + self._voice_client.stop() + async def _start_bot(self, token: str) -> bool: await self.stop() # clean up any previous instance intents = discord.Intents.default() intents.voice_states = True + intents.message_content = True + intents.messages = True self._bot = commands.Bot(command_prefix="!", intents=intents) self._ready_event = asyncio.Event() self._current_token = token @@ -94,6 +116,37 @@ class RadioBot: logger.info(f"Discord bot ready: {self._bot.user} ({self._bot.user.id})") self._ready_event.set() + @self._bot.event + async def on_message(message: discord.Message): + if message.author.bot: + return + if self._bot.user not in message.mentions: + return + content = message.content.lower() + if "leave" in content: + await self.leave() + try: + await message.reply("Disconnected.") + except Exception: + pass + elif "joinme" in content or "join" in content: + member = message.guild.get_member(message.author.id) if message.guild else None + vc = member.voice.channel if member and member.voice else None + if not vc: + try: + await message.reply("You're not in a voice channel.") + except Exception: + pass + return + try: + if self._voice_client and self._voice_client.is_connected(): + await self._voice_client.move_to(vc) + else: + self._voice_client = await vc.connect() + await message.reply(f"Joined {vc.name}.") + except Exception as e: + logger.error(f"joinme failed: {e}") + self._task = asyncio.create_task(self._bot.start(token)) try: diff --git a/drb-edge-node/app/internal/metadata_watcher.py b/drb-edge-node/app/internal/metadata_watcher.py index 0e1237c..ef9f486 100644 --- a/drb-edge-node/app/internal/metadata_watcher.py +++ b/drb-edge-node/app/internal/metadata_watcher.py @@ -7,8 +7,8 @@ from app.internal.logger import logger CallbackFn = Callable[[dict], Awaitable[None]] -HANG_THRESHOLD = 3 # polls before declaring a call ended (1 poll/sec → 3s hang time) -POLL_INTERVAL = 1.0 # seconds +HANG_THRESHOLD = 2 # polls before declaring a call ended (0.5s poll → 1s hang time) +POLL_INTERVAL = 0.5 # seconds class MetadataWatcher: diff --git a/drb-edge-node/app/internal/op25_client.py b/drb-edge-node/app/internal/op25_client.py index 28d6566..8aae869 100644 --- a/drb-edge-node/app/internal/op25_client.py +++ b/drb-edge-node/app/internal/op25_client.py @@ -53,9 +53,18 @@ class OP25Client: """Poll the OP25 HTTP terminal for current call metadata.""" try: async with httpx.AsyncClient(timeout=3) as client: - r = await client.get(f"{self.terminal_url}/0/status.json") + r = await client.post( + self.terminal_url, + json=[{"command": "update", "arg1": 0, "arg2": 0}], + ) r.raise_for_status() - return r.json() + messages = r.json() + for msg in messages: + if msg.get("json_type") == "channel_update": + channels = msg.get("channels", []) + if channels: + return msg.get(str(channels[0]), {}) + return None except Exception: return None diff --git a/drb-edge-node/app/main.py b/drb-edge-node/app/main.py index 11263d6..323edf9 100644 --- a/drb-edge-node/app/main.py +++ b/drb-edge-node/app/main.py @@ -12,7 +12,6 @@ from app.internal.discord_radio import radio_bot from app.internal.config_manager import ( load_node_config, save_node_config, - apply_system_config, ) from app.routers import api, ui @@ -22,12 +21,14 @@ from app.routers import api, ui # --------------------------------------------------------------------------- async def on_call_start(data: dict): + radio_bot.start_stream() await mqtt_manager.publish_status("recording") await mqtt_manager.publish_metadata("call_start", data) await call_recorder.start_recording(data["call_id"]) async def on_call_end(data: dict): + radio_bot.stop_stream() file_path = await call_recorder.stop_recording() if file_path: audio_url = await call_recorder.upload_recording(file_path, data["call_id"]) @@ -50,6 +51,7 @@ async def on_command(payload: dict): guild_id=int(payload["guild_id"]), channel_id=int(payload["channel_id"]), token=token, + call_active=metadata_watcher.is_active, ) elif action == "discord_leave": await radio_bot.leave() @@ -69,8 +71,38 @@ async def on_api_key(payload: dict): logger.info("Node API key received and saved.") +def _to_hz(freq) -> int: + """Convert a frequency to Hz. Accepts MHz floats (< 1e6) or Hz ints.""" + f = float(freq) + return int(f * 1_000_000) if f < 1_000_000 else int(f) + + +async def _generate_op25_config(config: SystemConfig) -> bool: + """Translate a SystemConfig (Firestore format) into OP25 active.cfg.json + op25.liq.""" + from app.internal.op25_client import op25_client + raw = config.config + payload = { + "type": config.type, + "systemName": config.name, + "channels": [_to_hz(ch) for ch in raw.get("control_channels", [])], + "tags": [ + {"talkgroup": str(tg.get("name", "")), "tagDec": int(tg["id"])} + for tg in raw.get("talkgroups", []) + if tg.get("id") is not None + ], + "whitelist": [int(tg["id"]) for tg in raw.get("talkgroups", []) if tg.get("id") is not None], + "icecastConfig": { + "icecast_host": settings.icecast_host, + "icecast_port": settings.icecast_port, + "icecast_mountpoint": settings.icecast_mount, + "icecast_password": settings.icecast_source_password, + }, + } + return await op25_client.generate_config(payload) + + async def on_config_push(payload: dict): - """C2 pushes a system config — apply it and restart OP25 with the new settings.""" + """C2 pushes a system config — translate it to OP25 format and restart OP25.""" try: config = SystemConfig(**payload) except Exception as e: @@ -82,14 +114,15 @@ async def on_config_push(payload: dict): node_cfg.system_config = config node_cfg.configured = True save_node_config(node_cfg) - apply_system_config(config) - # Restart OP25 so it picks up the new config from app.internal.op25_client import op25_client + if not await _generate_op25_config(config): + logger.error(f"Failed to generate OP25 config for {config.name}") + return + await op25_client.stop() await asyncio.sleep(2) await op25_client.start() - logger.info(f"Config push applied: {config.name}") @@ -120,10 +153,15 @@ async def lifespan(app: FastAPI): initial_status = "online" if node_cfg.configured else "unconfigured" await mqtt_manager.publish_status(initial_status) - if node_cfg.configured: + if node_cfg.configured and node_cfg.system_config: from app.internal.op25_client import op25_client - logger.info("Node is configured — starting OP25.") - await op25_client.start() + logger.info("Node is configured — waiting for OP25 API then generating config.") + for attempt in range(10): + if await _generate_op25_config(node_cfg.system_config): + await op25_client.start() + break + logger.warning(f"OP25 not ready yet (attempt {attempt + 1}/10), retrying in 3s…") + await asyncio.sleep(3) heartbeat_task = asyncio.create_task(mqtt_manager.heartbeat_loop()) diff --git a/drb-edge-node/app/templates/index.html b/drb-edge-node/app/templates/index.html index 331c817..92a8991 100644 --- a/drb-edge-node/app/templates/index.html +++ b/drb-edge-node/app/templates/index.html @@ -175,7 +175,7 @@ document.getElementById('tgid').textContent = d.active_tgid ?? '—'; document.getElementById('call-id').textContent = d.active_call_id ? d.active_call_id.slice(0, 8) + '…' : '—'; document.getElementById('system-id').textContent = d.assigned_system_id ?? '—'; - document.getElementById('op25-status').textContent = d.op25?.running ? 'Running' : 'Stopped'; + document.getElementById('op25-status').textContent = d.op25?.status === 'running' ? 'Running' : 'Stopped'; document.getElementById('location').textContent = `${d.lat}, ${d.lon}`; document.getElementById('configured').textContent = d.configured ? 'Yes' : 'No'; diff --git a/op25-container/app/internal/op25_config_utls.py b/op25-container/app/internal/op25_config_utls.py index 36360a1..499722d 100644 --- a/op25-container/app/internal/op25_config_utls.py +++ b/op25-container/app/internal/op25_config_utls.py @@ -29,7 +29,7 @@ def save_whitelist(talkgroup_tags: List[int]) -> None: with open("/configs/active.cfg.whitelist.tsv", 'w', newline='', encoding='utf-8') as file: writer = csv.writer(file, delimiter='\t', lineterminator='\n') # Write rows - for tag in talkgroup_tags: + for tag in (talkgroup_tags or []): writer.writerow([tag]) def del_none_in_dict(d): diff --git a/op25-container/app/main.py b/op25-container/app/main.py index 9d53b85..6a4fc92 100644 --- a/op25-container/app/main.py +++ b/op25-container/app/main.py @@ -1,11 +1,30 @@ from fastapi import FastAPI +from contextlib import asynccontextmanager +import os import routers.op25_controller as op25_controller from internal.logger import create_logger +from internal.liquidsoap_config_utils import generate_liquid_script +from models import IcecastConfig -# Initialize logging LOGGER = create_logger(__name__) -# Define FastAPI app -app = FastAPI() + +@asynccontextmanager +async def lifespan(app: FastAPI): + try: + config = IcecastConfig( + icecast_host=os.getenv("ICECAST_HOST", "localhost"), + icecast_port=int(os.getenv("ICECAST_PORT", "8000")), + icecast_mountpoint=os.getenv("ICECAST_MOUNT", "/radio"), + icecast_password=os.getenv("ICECAST_SOURCE_PASSWORD", "hackme"), + ) + generate_liquid_script(config) + LOGGER.info("op25.liq generated from environment variables.") + except Exception as e: + LOGGER.error(f"Failed to generate op25.liq: {e}") + yield + + +app = FastAPI(lifespan=lifespan) app.include_router(op25_controller.create_op25_router(), prefix="/op25") diff --git a/op25-container/app/routers/op25_controller.py b/op25-container/app/routers/op25_controller.py index c49c8e7..714f1dd 100644 --- a/op25-container/app/routers/op25_controller.py +++ b/op25-container/app/routers/op25_controller.py @@ -10,42 +10,64 @@ from internal.liquidsoap_config_utils import generate_liquid_script LOGGER = create_logger(__name__) -op25_process = None OP25_PATH = "/op25/op25/gr-op25_repeater/apps/" OP25_SCRIPT = "run_multi-rx_service.sh" +_PGID_FILE = "/tmp/op25.pgid" + + +def _save_pgid(pgid: int) -> None: + with open(_PGID_FILE, "w") as f: + f.write(str(pgid)) + + +def _read_pgid(): + try: + return int(open(_PGID_FILE).read().strip()) + except Exception: + return None + + +def _is_running() -> bool: + pgid = _read_pgid() + if pgid is None: + return False + try: + os.killpg(pgid, 0) + return True + except OSError: + return False + def create_op25_router(): router = APIRouter() @router.post("/start") async def start_op25(): - global op25_process - if op25_process is None: - try: - op25_process = subprocess.Popen(os.path.join(OP25_PATH, OP25_SCRIPT), shell=True, preexec_fn=os.setsid, cwd=OP25_PATH) - LOGGER.debug(op25_process) - return {"status": "OP25 started"} - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - else: + if _is_running(): return {"status": "OP25 already running"} + try: + proc = subprocess.Popen(os.path.join(OP25_PATH, OP25_SCRIPT), shell=True, preexec_fn=os.setsid, cwd=OP25_PATH) + _save_pgid(proc.pid) + LOGGER.debug(proc) + return {"status": "OP25 started"} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) @router.post("/stop") async def stop_op25(): - global op25_process - if op25_process is not None: - try: - os.killpg(os.getpgid(op25_process.pid), signal.SIGTERM) - op25_process = None - return {"status": "OP25 stopped"} - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - else: + pgid = _read_pgid() + if pgid is None or not _is_running(): return {"status": "OP25 is not running"} + try: + os.killpg(pgid, signal.SIGTERM) + os.remove(_PGID_FILE) + return {"status": "OP25 stopped"} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) @router.get("/status") async def get_status(): - return {"status": "running" if op25_process else "stopped"} + return {"status": "running" if _is_running() else "stopped"} @router.post("/generate-config") async def generate_config(generator: ConfigGenerator): @@ -63,13 +85,14 @@ def create_op25_router(): devices = [DeviceConfig()] save_talkgroup_tags(generator.tags) save_whitelist(generator.whitelist) + has_talkgroups = bool(generator.whitelist) trunking = TrunkingConfig( module="tk_p25.py", chans=[TrunkingChannelConfig( sysname=generator.systemName, - control_channel_list=','.join(generator.channels), - tagsFile="/configs/active.cfg.tags.tsv", - whitelist="/configs/active.cfg.whitelist.tsv" + control_channel_list=','.join(str(ch) for ch in generator.channels), + tagsFile="/configs/active.cfg.tags.tsv" if has_talkgroups else None, + whitelist="/configs/active.cfg.whitelist.tsv" if has_talkgroups else None )] )