more call recording fixes
This commit is contained in:
@@ -1,94 +1,172 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
from pathlib import Path
|
import time
|
||||||
|
from collections import deque
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
from app.config import settings
|
from app.config import settings
|
||||||
from app.internal import credentials
|
from app.internal import credentials
|
||||||
from app.internal.logger import logger
|
from app.internal.logger import logger
|
||||||
|
|
||||||
MAX_RECORDING_SECONDS = 600 # 10 min safety cap; FFmpeg terminates long-running calls
|
MAX_RECORDING_SECONDS = 600 # safety cap; drop call if it runs this long
|
||||||
|
PRE_BUFFER_SECONDS = 1.0 # seconds of audio to include before call_start
|
||||||
|
RING_BUFFER_SECONDS = 60 # how much history to keep when no call is active
|
||||||
|
READ_CHUNK_BYTES = 4096 # bytes per httpx read
|
||||||
|
|
||||||
|
|
||||||
class CallRecorder:
|
class CallRecorder:
|
||||||
|
"""
|
||||||
|
Maintains a persistent HTTP connection to the Icecast stream and buffers
|
||||||
|
the raw MP3 bytes in a ring buffer. When a call starts we note the
|
||||||
|
monotonic clock; when it ends we slice the buffer and write the file.
|
||||||
|
|
||||||
|
This approach eliminates per-call FFmpeg startup latency, which was
|
||||||
|
causing empty recordings for calls shorter than ~1–2 s.
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._process: Optional[asyncio.subprocess.Process] = None
|
|
||||||
self._current_call_id: Optional[str] = None
|
|
||||||
self._current_file: Optional[Path] = None
|
|
||||||
self._recordings_dir = Path(settings.recordings_path)
|
self._recordings_dir = Path(settings.recordings_path)
|
||||||
|
|
||||||
|
# Ring buffer: deque of (monotonic_time, bytes_chunk)
|
||||||
|
self._buffer: deque[tuple[float, bytes]] = deque()
|
||||||
|
self._buffer_bytes: int = 0
|
||||||
|
|
||||||
|
self._stream_task: Optional[asyncio.Task] = None
|
||||||
|
|
||||||
|
# Active call state
|
||||||
|
self._call_id: Optional[str] = None
|
||||||
|
self._call_start_mono: Optional[float] = None
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Lifecycle
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def start(self) -> None:
|
||||||
|
"""Start the persistent stream buffer. Call once from app lifespan."""
|
||||||
|
self._stream_task = asyncio.create_task(self._stream_loop())
|
||||||
|
logger.info("Stream ring-buffer started.")
|
||||||
|
|
||||||
|
async def stop(self) -> None:
|
||||||
|
"""Cancel the stream reader."""
|
||||||
|
if self._stream_task:
|
||||||
|
self._stream_task.cancel()
|
||||||
|
try:
|
||||||
|
await self._stream_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
self._stream_task = None
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Stream reader
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def _stream_loop(self) -> None:
|
||||||
|
stream_url = (
|
||||||
|
f"http://{settings.icecast_host}:{settings.icecast_port}"
|
||||||
|
f"{settings.icecast_mount}"
|
||||||
|
)
|
||||||
|
timeout = httpx.Timeout(connect=10.0, read=None, write=10.0, pool=10.0)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=timeout) as client:
|
||||||
|
async with client.stream("GET", stream_url) as response:
|
||||||
|
response.raise_for_status()
|
||||||
|
logger.info(f"Stream buffer connected to {stream_url}")
|
||||||
|
async for chunk in response.aiter_bytes(READ_CHUNK_BYTES):
|
||||||
|
self._ingest(chunk)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Stream buffer disconnected ({e}) — retrying in 3 s")
|
||||||
|
await asyncio.sleep(3)
|
||||||
|
|
||||||
|
def _ingest(self, chunk: bytes) -> None:
|
||||||
|
"""Append a chunk and trim stale data from the front of the buffer."""
|
||||||
|
now = time.monotonic()
|
||||||
|
self._buffer.append((now, chunk))
|
||||||
|
self._buffer_bytes += len(chunk)
|
||||||
|
|
||||||
|
# During a call, never trim data newer than (call_start - pre_buffer).
|
||||||
|
# Between calls, keep a rolling RING_BUFFER_SECONDS window.
|
||||||
|
if self._call_start_mono is not None:
|
||||||
|
keep_from = self._call_start_mono - PRE_BUFFER_SECONDS
|
||||||
|
else:
|
||||||
|
keep_from = now - RING_BUFFER_SECONDS
|
||||||
|
|
||||||
|
while self._buffer:
|
||||||
|
ts, old = self._buffer[0]
|
||||||
|
if ts >= keep_from:
|
||||||
|
break
|
||||||
|
self._buffer.popleft()
|
||||||
|
self._buffer_bytes -= len(old)
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Call recording API (same interface as before)
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
async def start_recording(self, call_id: str) -> bool:
|
async def start_recording(self, call_id: str) -> bool:
|
||||||
if self._process:
|
if self._call_id:
|
||||||
logger.warning("Recording already running — ignoring start.")
|
logger.warning("Recording already active — ignoring start.")
|
||||||
return False
|
|
||||||
|
|
||||||
self._recordings_dir.mkdir(parents=True, exist_ok=True)
|
|
||||||
ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
|
|
||||||
self._current_file = self._recordings_dir / f"{ts}_{call_id}.mp3"
|
|
||||||
self._current_call_id = call_id
|
|
||||||
|
|
||||||
stream_url = f"http://{settings.icecast_host}:{settings.icecast_port}{settings.icecast_mount}"
|
|
||||||
cmd = [
|
|
||||||
"ffmpeg", "-y",
|
|
||||||
"-reconnect", "1",
|
|
||||||
"-reconnect_streamed", "1",
|
|
||||||
"-reconnect_delay_max", "5",
|
|
||||||
"-i", stream_url,
|
|
||||||
"-acodec", "libmp3lame",
|
|
||||||
"-ar", "22050",
|
|
||||||
"-b:a", "32k",
|
|
||||||
"-t", str(MAX_RECORDING_SECONDS),
|
|
||||||
str(self._current_file),
|
|
||||||
]
|
|
||||||
|
|
||||||
try:
|
|
||||||
self._process = await asyncio.create_subprocess_exec(
|
|
||||||
*cmd,
|
|
||||||
stdout=asyncio.subprocess.PIPE,
|
|
||||||
stderr=asyncio.subprocess.PIPE,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Brief check for an immediate crash (e.g. ffmpeg binary missing)
|
|
||||||
await asyncio.sleep(0.1)
|
|
||||||
if self._process.returncode is not None:
|
|
||||||
_, stderr = await self._process.communicate()
|
|
||||||
logger.error(f"FFmpeg exited immediately ({self._process.returncode}): {stderr.decode()}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
logger.info(f"Recording process started: {self._current_file.name}")
|
|
||||||
return True
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"FFmpeg start failed: {e}")
|
|
||||||
self._process = None
|
|
||||||
self._current_file = None
|
|
||||||
self._current_call_id = None
|
|
||||||
return False
|
return False
|
||||||
|
self._call_id = call_id
|
||||||
|
self._call_start_mono = time.monotonic()
|
||||||
|
logger.info(f"Recording started (ring-buffer): {call_id}")
|
||||||
|
return True
|
||||||
|
|
||||||
async def stop_recording(self) -> Optional[Path]:
|
async def stop_recording(self) -> Optional[Path]:
|
||||||
if not self._process:
|
if not self._call_id:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
proc = self._process
|
call_id = self._call_id
|
||||||
output_file = self._current_file
|
call_start = self._call_start_mono
|
||||||
self._process = None
|
self._call_id = None
|
||||||
self._current_file = None
|
self._call_start_mono = None
|
||||||
self._current_call_id = None
|
|
||||||
|
|
||||||
try:
|
# Slice: everything from (call_start - pre_buffer) to now
|
||||||
proc.terminate()
|
cutoff = (call_start - PRE_BUFFER_SECONDS) if call_start else 0.0
|
||||||
await asyncio.wait_for(proc.wait(), timeout=5)
|
chunks = [chunk for ts, chunk in self._buffer if ts >= cutoff]
|
||||||
except asyncio.TimeoutError:
|
|
||||||
proc.kill()
|
|
||||||
except ProcessLookupError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
if output_file and output_file.exists() and output_file.stat().st_size > 0:
|
# Safety cap: if the call ran very long, truncate to MAX_RECORDING_SECONDS
|
||||||
logger.info(f"Recording saved: {output_file.name} ({output_file.stat().st_size} bytes)")
|
if call_start is not None:
|
||||||
return output_file
|
cap_cutoff = call_start + MAX_RECORDING_SECONDS
|
||||||
|
now = time.monotonic()
|
||||||
|
if now > cap_cutoff:
|
||||||
|
# Approximate: trim chunks that arrived after the cap
|
||||||
|
cap_keep_until = call_start + MAX_RECORDING_SECONDS
|
||||||
|
chunks = [
|
||||||
|
chunk for ts, chunk in self._buffer
|
||||||
|
if cutoff <= ts <= cap_keep_until
|
||||||
|
]
|
||||||
|
|
||||||
logger.warning("Recording file empty or missing — discarding.")
|
if not chunks:
|
||||||
|
logger.warning(
|
||||||
|
f"No buffered audio for call {call_id} — "
|
||||||
|
"stream may not have been connected yet."
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
self._recordings_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
ts_str = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
|
||||||
|
output_path = self._recordings_dir / f"{ts_str}_{call_id}.mp3"
|
||||||
|
|
||||||
|
data = b"".join(chunks)
|
||||||
|
output_path.write_bytes(data)
|
||||||
|
|
||||||
|
size = output_path.stat().st_size
|
||||||
|
if size > 0:
|
||||||
|
logger.info(f"Recording saved: {output_path.name} ({size} bytes)")
|
||||||
|
return output_path
|
||||||
|
|
||||||
|
output_path.unlink(missing_ok=True)
|
||||||
|
logger.warning(f"Recording for call {call_id} produced an empty file.")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Upload (unchanged interface)
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
async def upload_recording(
|
async def upload_recording(
|
||||||
self,
|
self,
|
||||||
file_path: Path,
|
file_path: Path,
|
||||||
@@ -102,8 +180,8 @@ class CallRecorder:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
upload_url = f"{settings.c2_url}/upload"
|
upload_url = f"{settings.c2_url}/upload"
|
||||||
api_key = credentials.get_api_key()
|
api_key = credentials.get_api_key()
|
||||||
headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
|
headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
|
||||||
|
|
||||||
form: dict = {"call_id": call_id, "node_id": settings.node_id}
|
form: dict = {"call_id": call_id, "node_id": settings.node_id}
|
||||||
if talkgroup_id is not None:
|
if talkgroup_id is not None:
|
||||||
@@ -137,7 +215,7 @@ class CallRecorder:
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def is_recording(self) -> bool:
|
def is_recording(self) -> bool:
|
||||||
return self._process is not None
|
return self._call_id is not None
|
||||||
|
|
||||||
|
|
||||||
call_recorder = CallRecorder()
|
call_recorder = CallRecorder()
|
||||||
|
|||||||
@@ -158,6 +158,7 @@ async def lifespan(app: FastAPI):
|
|||||||
# Start services (radio_bot starts on-demand when a discord_join command arrives)
|
# Start services (radio_bot starts on-demand when a discord_join command arrives)
|
||||||
await mqtt_manager.connect()
|
await mqtt_manager.connect()
|
||||||
await metadata_watcher.start()
|
await metadata_watcher.start()
|
||||||
|
await call_recorder.start() # persistent Icecast stream buffer
|
||||||
|
|
||||||
# Report initial status and resume OP25 if node was already configured before this restart
|
# Report initial status and resume OP25 if node was already configured before this restart
|
||||||
node_cfg = load_node_config()
|
node_cfg = load_node_config()
|
||||||
@@ -181,6 +182,7 @@ async def lifespan(app: FastAPI):
|
|||||||
logger.info("Edge node shutting down.")
|
logger.info("Edge node shutting down.")
|
||||||
heartbeat_task.cancel()
|
heartbeat_task.cancel()
|
||||||
await metadata_watcher.stop()
|
await metadata_watcher.stop()
|
||||||
|
await call_recorder.stop()
|
||||||
await radio_bot.stop()
|
await radio_bot.stop()
|
||||||
await mqtt_manager.disconnect()
|
await mqtt_manager.disconnect()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user