From 6ac05eff64abae1ba68ff11939e596420bdcb64b Mon Sep 17 00:00:00 2001 From: Logan Date: Sun, 19 Apr 2026 16:38:15 -0400 Subject: [PATCH] more call recording fixes --- drb-edge-node/app/internal/call_recorder.py | 216 +++++++++++++------- drb-edge-node/app/main.py | 2 + 2 files changed, 149 insertions(+), 69 deletions(-) diff --git a/drb-edge-node/app/internal/call_recorder.py b/drb-edge-node/app/internal/call_recorder.py index b6ca2c2..0fd0848 100644 --- a/drb-edge-node/app/internal/call_recorder.py +++ b/drb-edge-node/app/internal/call_recorder.py @@ -1,94 +1,172 @@ import asyncio -from pathlib import Path +import time +from collections import deque from datetime import datetime, timezone +from pathlib import Path from typing import Optional + import httpx from app.config import settings from app.internal import credentials from app.internal.logger import logger -MAX_RECORDING_SECONDS = 600 # 10 min safety cap; FFmpeg terminates long-running calls +MAX_RECORDING_SECONDS = 600 # safety cap; drop call if it runs this long +PRE_BUFFER_SECONDS = 1.0 # seconds of audio to include before call_start +RING_BUFFER_SECONDS = 60 # how much history to keep when no call is active +READ_CHUNK_BYTES = 4096 # bytes per httpx read class CallRecorder: + """ + Maintains a persistent HTTP connection to the Icecast stream and buffers + the raw MP3 bytes in a ring buffer. When a call starts we note the + monotonic clock; when it ends we slice the buffer and write the file. + + This approach eliminates per-call FFmpeg startup latency, which was + causing empty recordings for calls shorter than ~1–2 s. + """ + def __init__(self): - self._process: Optional[asyncio.subprocess.Process] = None - self._current_call_id: Optional[str] = None - self._current_file: Optional[Path] = None self._recordings_dir = Path(settings.recordings_path) + # Ring buffer: deque of (monotonic_time, bytes_chunk) + self._buffer: deque[tuple[float, bytes]] = deque() + self._buffer_bytes: int = 0 + + self._stream_task: Optional[asyncio.Task] = None + + # Active call state + self._call_id: Optional[str] = None + self._call_start_mono: Optional[float] = None + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def start(self) -> None: + """Start the persistent stream buffer. Call once from app lifespan.""" + self._stream_task = asyncio.create_task(self._stream_loop()) + logger.info("Stream ring-buffer started.") + + async def stop(self) -> None: + """Cancel the stream reader.""" + if self._stream_task: + self._stream_task.cancel() + try: + await self._stream_task + except asyncio.CancelledError: + pass + self._stream_task = None + + # ------------------------------------------------------------------ + # Stream reader + # ------------------------------------------------------------------ + + async def _stream_loop(self) -> None: + stream_url = ( + f"http://{settings.icecast_host}:{settings.icecast_port}" + f"{settings.icecast_mount}" + ) + timeout = httpx.Timeout(connect=10.0, read=None, write=10.0, pool=10.0) + while True: + try: + async with httpx.AsyncClient(timeout=timeout) as client: + async with client.stream("GET", stream_url) as response: + response.raise_for_status() + logger.info(f"Stream buffer connected to {stream_url}") + async for chunk in response.aiter_bytes(READ_CHUNK_BYTES): + self._ingest(chunk) + except asyncio.CancelledError: + raise + except Exception as e: + logger.warning(f"Stream buffer disconnected ({e}) — retrying in 3 s") + await asyncio.sleep(3) + + def _ingest(self, chunk: bytes) -> None: + """Append a chunk and trim stale data from the front of the buffer.""" + now = time.monotonic() + self._buffer.append((now, chunk)) + self._buffer_bytes += len(chunk) + + # During a call, never trim data newer than (call_start - pre_buffer). + # Between calls, keep a rolling RING_BUFFER_SECONDS window. + if self._call_start_mono is not None: + keep_from = self._call_start_mono - PRE_BUFFER_SECONDS + else: + keep_from = now - RING_BUFFER_SECONDS + + while self._buffer: + ts, old = self._buffer[0] + if ts >= keep_from: + break + self._buffer.popleft() + self._buffer_bytes -= len(old) + + # ------------------------------------------------------------------ + # Call recording API (same interface as before) + # ------------------------------------------------------------------ + async def start_recording(self, call_id: str) -> bool: - if self._process: - logger.warning("Recording already running — ignoring start.") - return False - - self._recordings_dir.mkdir(parents=True, exist_ok=True) - ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") - self._current_file = self._recordings_dir / f"{ts}_{call_id}.mp3" - self._current_call_id = call_id - - stream_url = f"http://{settings.icecast_host}:{settings.icecast_port}{settings.icecast_mount}" - cmd = [ - "ffmpeg", "-y", - "-reconnect", "1", - "-reconnect_streamed", "1", - "-reconnect_delay_max", "5", - "-i", stream_url, - "-acodec", "libmp3lame", - "-ar", "22050", - "-b:a", "32k", - "-t", str(MAX_RECORDING_SECONDS), - str(self._current_file), - ] - - try: - self._process = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - - # Brief check for an immediate crash (e.g. ffmpeg binary missing) - await asyncio.sleep(0.1) - if self._process.returncode is not None: - _, stderr = await self._process.communicate() - logger.error(f"FFmpeg exited immediately ({self._process.returncode}): {stderr.decode()}") - return False - - logger.info(f"Recording process started: {self._current_file.name}") - return True - except Exception as e: - logger.error(f"FFmpeg start failed: {e}") - self._process = None - self._current_file = None - self._current_call_id = None + if self._call_id: + logger.warning("Recording already active — ignoring start.") return False + self._call_id = call_id + self._call_start_mono = time.monotonic() + logger.info(f"Recording started (ring-buffer): {call_id}") + return True async def stop_recording(self) -> Optional[Path]: - if not self._process: + if not self._call_id: return None - proc = self._process - output_file = self._current_file - self._process = None - self._current_file = None - self._current_call_id = None + call_id = self._call_id + call_start = self._call_start_mono + self._call_id = None + self._call_start_mono = None - try: - proc.terminate() - await asyncio.wait_for(proc.wait(), timeout=5) - except asyncio.TimeoutError: - proc.kill() - except ProcessLookupError: - pass + # Slice: everything from (call_start - pre_buffer) to now + cutoff = (call_start - PRE_BUFFER_SECONDS) if call_start else 0.0 + chunks = [chunk for ts, chunk in self._buffer if ts >= cutoff] - if output_file and output_file.exists() and output_file.stat().st_size > 0: - logger.info(f"Recording saved: {output_file.name} ({output_file.stat().st_size} bytes)") - return output_file + # Safety cap: if the call ran very long, truncate to MAX_RECORDING_SECONDS + if call_start is not None: + cap_cutoff = call_start + MAX_RECORDING_SECONDS + now = time.monotonic() + if now > cap_cutoff: + # Approximate: trim chunks that arrived after the cap + cap_keep_until = call_start + MAX_RECORDING_SECONDS + chunks = [ + chunk for ts, chunk in self._buffer + if cutoff <= ts <= cap_keep_until + ] - logger.warning("Recording file empty or missing — discarding.") + if not chunks: + logger.warning( + f"No buffered audio for call {call_id} — " + "stream may not have been connected yet." + ) + return None + + self._recordings_dir.mkdir(parents=True, exist_ok=True) + ts_str = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") + output_path = self._recordings_dir / f"{ts_str}_{call_id}.mp3" + + data = b"".join(chunks) + output_path.write_bytes(data) + + size = output_path.stat().st_size + if size > 0: + logger.info(f"Recording saved: {output_path.name} ({size} bytes)") + return output_path + + output_path.unlink(missing_ok=True) + logger.warning(f"Recording for call {call_id} produced an empty file.") return None + # ------------------------------------------------------------------ + # Upload (unchanged interface) + # ------------------------------------------------------------------ + async def upload_recording( self, file_path: Path, @@ -102,8 +180,8 @@ class CallRecorder: return None upload_url = f"{settings.c2_url}/upload" - api_key = credentials.get_api_key() - headers = {"Authorization": f"Bearer {api_key}"} if api_key else {} + api_key = credentials.get_api_key() + headers = {"Authorization": f"Bearer {api_key}"} if api_key else {} form: dict = {"call_id": call_id, "node_id": settings.node_id} if talkgroup_id is not None: @@ -137,7 +215,7 @@ class CallRecorder: @property def is_recording(self) -> bool: - return self._process is not None + return self._call_id is not None call_recorder = CallRecorder() diff --git a/drb-edge-node/app/main.py b/drb-edge-node/app/main.py index 5fb7053..635131a 100644 --- a/drb-edge-node/app/main.py +++ b/drb-edge-node/app/main.py @@ -158,6 +158,7 @@ async def lifespan(app: FastAPI): # Start services (radio_bot starts on-demand when a discord_join command arrives) await mqtt_manager.connect() await metadata_watcher.start() + await call_recorder.start() # persistent Icecast stream buffer # Report initial status and resume OP25 if node was already configured before this restart node_cfg = load_node_config() @@ -181,6 +182,7 @@ async def lifespan(app: FastAPI): logger.info("Edge node shutting down.") heartbeat_task.cancel() await metadata_watcher.stop() + await call_recorder.stop() await radio_bot.stop() await mqtt_manager.disconnect()