17 Commits

Author SHA1 Message Date
0a58624e50 Merge pull request 'Implement Call Recording for STT and Replay' (#3) from implement-call-recording into main
All checks were successful
release-tag / release-image (push) Successful in 1h26m13s
Reviewed-on: #3
2026-01-03 19:38:04 -05:00
Logan Cusano
10554a2ff4 Properly add ffmpeg to the dockerfile install sequence 2026-01-03 19:32:20 -05:00
Logan Cusano
051eac88b0 Add call ID to the call metadata 2026-01-03 19:18:30 -05:00
Logan Cusano
d8190e307c Standardize timestamps to UTC 2026-01-03 11:41:27 -05:00
Logan Cusano
83b995bfa5 Fix bootleg AI mistake 2026-01-03 03:12:57 -05:00
Logan Cusano
9e92da4e58 Replace http server vars with dedicated vars 2026-01-03 03:10:45 -05:00
Logan Cusano
0fe8194c39 fix upload url 2026-01-02 00:17:36 -05:00
Logan Cusano
8c106473cf Move bucket upload to the c2 server and replaced with upload to c2 server 2025-12-30 03:01:31 -05:00
Logan Cusano
a5d5fa9de7 Install ffmpeg to test if that resolves issue with recording 2025-12-29 22:55:18 -05:00
Logan Cusano
a7de6bfb04 Fix the calls directory bug 2025-12-29 22:47:18 -05:00
Logan Cusano
3b98e3a72a Add GCP to the requirements 2025-12-29 22:21:58 -05:00
Logan Cusano
41075a5950 init 2025-12-29 22:18:58 -05:00
de143a67fe Merge pull request 'Implement Metadata Watcher' (#1) from metadata-watcher into main
All checks were successful
release-tag / release-image (push) Successful in 1h26m24s
Reviewed-on: #1
2025-12-29 19:04:07 -05:00
Logan Cusano
ee9ce0e140 Add the radio ID to the metadata payload to track who is talking, not just what system 2025-12-29 19:02:51 -05:00
Logan Cusano
ca984be293 Implement debug logging into metadata watcher 2025-12-29 15:48:45 -05:00
Logan Cusano
b8ee991192 Update port in docker compose and update metadata watcher function to use correct OP@5 endpoint 2025-12-29 15:23:18 -05:00
Logan Cusano
0a6b565651 Fix bug in op25 config where it would not create liquidsoap if saved config was loaded 2025-12-29 15:06:48 -05:00
6 changed files with 208 additions and 41 deletions

View File

@@ -1,5 +1,9 @@
NODE_ID= NODE_ID=
MQTT_BROKER= MQTT_BROKER=
ICECAST_SERVER= ICECAST_SERVER=
AUDIO_BUCKET=
NODE_LAT= NODE_LAT=
NODE_LONG= NODE_LONG=
HTTP_SERVER_PROTOCOL=
HTTP_SERVER_ADDRESS=
HTTP_SERVER_PORT=

1
.gitignore vendored
View File

@@ -3,3 +3,4 @@
*.db *.db
*.conf *.conf
configs/* configs/*
*.json

View File

@@ -7,7 +7,7 @@ ENV DEBIAN_FRONTEND=noninteractive
# Install system dependencies # Install system dependencies
RUN apt-get update && \ RUN apt-get update && \
apt-get upgrade -y && \ apt-get upgrade -y && \
apt-get install git pulseaudio pulseaudio-utils liquidsoap -y apt-get install git pulseaudio pulseaudio-utils liquidsoap ffmpeg -y
# Clone the boatbod op25 repository # Clone the boatbod op25 repository
RUN git clone -b gr310 https://github.com/boatbod/op25 /op25 RUN git clone -b gr310 https://github.com/boatbod/op25 /op25
@@ -34,6 +34,9 @@ EXPOSE 8001 8081
# Create and set up the configuration directory # Create and set up the configuration directory
VOLUME ["/configs"] VOLUME ["/configs"]
# Create the calls local cache directory
VOLUME ["/calls"]
# Set the working directory in the container # Set the working directory in the container
WORKDIR /app WORKDIR /app

View File

@@ -3,9 +3,10 @@ import json
import os import os
import shutil import shutil
from pathlib import Path from pathlib import Path
from models.models import TalkgroupTag from models.models import TalkgroupTag, IcecastConfig
from typing import List, Dict from typing import List, Dict
from internal.logger import create_logger from internal.logger import create_logger
from internal.liquidsoap_config_utils import generate_liquid_script
LOGGER = create_logger(__name__) LOGGER = create_logger(__name__)
@@ -66,6 +67,27 @@ def activate_config_from_library(system_name: str) -> bool:
if src_whitelist.exists(): if src_whitelist.exists():
shutil.copy2(src_whitelist, config_path / "active.cfg.whitelist.tsv") shutil.copy2(src_whitelist, config_path / "active.cfg.whitelist.tsv")
# Generate Liquidsoap Script by reading the activated config
with open(dst, 'r') as f:
data = json.load(f)
if "trunking" in data and "metadata" in data:
streams = data.get("metadata", {}).get("streams", [])
if streams:
stream = streams[0]
address = stream.get("icecastServerAddress", "127.0.0.1:8000")
host, port = address.split(":") if ":" in address else (address, 8000)
ice_config = IcecastConfig(
icecast_host=host,
icecast_port=int(port),
icecast_mountpoint=stream.get("icecastMountpoint", "/stream"),
icecast_password=stream.get("icecastPass", "hackme"),
icecast_description="OP25 Stream",
icecast_genre="Scanner"
)
generate_liquid_script(ice_config)
return True return True
except Exception as e: except Exception as e:
LOGGER.error(f"Failed to copy config: {e}") LOGGER.error(f"Failed to copy config: {e}")

View File

@@ -20,6 +20,9 @@ app.include_router(create_op25_router(), prefix="/op25")
# Configuration # Configuration
NODE_ID = os.getenv("NODE_ID", "standalone-node") NODE_ID = os.getenv("NODE_ID", "standalone-node")
MQTT_BROKER = os.getenv("MQTT_BROKER", None) MQTT_BROKER = os.getenv("MQTT_BROKER", None)
HTTP_SERVER_PROTOCOL = os.getenv("HTTP_SERVER_PROTOCOL", "http")
HTTP_SERVER_ADDRESS = os.getenv("HTTP_SERVER_ADDRESS", "127.0.0.1")
HTTP_SERVER_PORT = os.getenv("HTTP_SERVER_PORT", 8000)
NODE_LAT = os.getenv("NODE_LAT") NODE_LAT = os.getenv("NODE_LAT")
NODE_LONG = os.getenv("NODE_LONG") NODE_LONG = os.getenv("NODE_LONG")
@@ -112,6 +115,37 @@ def handle_c2_command(topic, payload):
except Exception as e: except Exception as e:
LOGGER.error(f"Error processing C2 command: {e}") LOGGER.error(f"Error processing C2 command: {e}")
def get_current_stream_url():
"""
Dynamically resolves the audio stream URL from the active OP25 configuration.
Falls back to env var or default if config is missing/invalid.
"""
default_url = os.getenv("STREAM_URL", "http://127.0.0.1:8000/stream_0")
config_path = "/configs/active.cfg.json"
if not os.path.exists(config_path):
return default_url
try:
with open(config_path, "r") as f:
config = json.load(f)
streams = config.get("metadata", {}).get("streams", [])
if not streams:
return default_url
stream = streams[0]
address = stream.get("icecastServerAddress", "127.0.0.1:8000")
mount = stream.get("icecastMountpoint", "stream_0")
if not mount.startswith("/"):
mount = f"/{mount}"
return f"http://{address}{mount}"
except Exception as e:
LOGGER.warning(f"Failed to resolve stream URL from config: {e}")
return default_url
async def mqtt_lifecycle_manager(): async def mqtt_lifecycle_manager():
""" """
Manages the application-level logic: Check-in, Heartbeats, and Shutdown. Manages the application-level logic: Check-in, Heartbeats, and Shutdown.
@@ -153,7 +187,7 @@ async def mqtt_lifecycle_manager():
payload = { payload = {
"node_id": NODE_ID, "node_id": NODE_ID,
"status": "online", "status": "online",
"timestamp": datetime.now().isoformat(), "timestamp": datetime.utcnow().isoformat(),
"is_listening": op25_status.get("is_running", False), "is_listening": op25_status.get("is_running", False),
"active_system": op25_status.get("active_system"), "active_system": op25_status.get("active_system"),
# Only scan library if needed, otherwise it's heavy I/O # Only scan library if needed, otherwise it's heavy I/O
@@ -186,12 +220,49 @@ async def mqtt_lifecycle_manager():
async def metadata_watcher(): async def metadata_watcher():
""" """
Polls OP25 HTTP terminal for metadata and publishes events to MQTT. Polls OP25 HTTP terminal for metadata and publishes events to MQTT.
Corrected to use the POST-based command API found in the HAR capture.
""" """
last_tgid = 0 last_tgid = 0
last_metadata = {} last_metadata = {}
potential_end_time = None potential_end_time = None
DEBOUNCE_SECONDS = 2.5 DEBOUNCE_SECONDS = 2.5
OP25_DATA_URL = "http://127.0.0.1:8081/data.json" OP25_DATA_URL = "http://127.0.0.1:8081/"
# This is the specific payload the OP25 web interface uses [cite: 45562, 45563]
COMMAND_PAYLOAD = [{"command": "update", "arg1": 0, "arg2": 0}]
# Audio Recording State
recorder_proc = None
current_call_id = None
async def stop_recording():
nonlocal recorder_proc
if recorder_proc:
if recorder_proc.returncode is None:
recorder_proc.terminate()
try:
await asyncio.wait_for(recorder_proc.wait(), timeout=2.0)
except asyncio.TimeoutError:
recorder_proc.kill()
recorder_proc = None
def upload_audio(call_id):
if not MQTT_BROKER: return None
local_path = f"/calls/{call_id}.mp3"
if not os.path.exists(local_path): return None
try:
with open(local_path, "rb") as f:
files = {"file": (f"{call_id}.mp3", f, "audio/mpeg")}
response = requests.post(f"{HTTP_SERVER_PROTOCOL}://{HTTP_SERVER_ADDRESS}:{HTTP_SERVER_PORT}/upload", files=files, data={"node_id": NODE_ID, "call_id": call_id}, timeout=30)
response.raise_for_status()
return response.json().get("url")
except Exception as e:
LOGGER.error(f"Upload failed: {e}")
return None
finally:
if os.path.exists(local_path):
os.remove(local_path)
while True: while True:
if not MQTT_CONNECTED: if not MQTT_CONNECTED:
@@ -199,69 +270,130 @@ async def mqtt_lifecycle_manager():
continue continue
try: try:
# Run blocking request in executor to avoid blocking the asyncio loop # Run blocking POST request in executor
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
response = await loop.run_in_executor(None, lambda: requests.get(OP25_DATA_URL, timeout=0.5)) response = await loop.run_in_executor(
None,
lambda: requests.post(OP25_DATA_URL, json=COMMAND_PAYLOAD, timeout=0.5)
)
if response.status_code == 200: if response.status_code == 200:
data = response.json() data = response.json()
# LOGGER.debug(f"Response from OP25 API: {data}")
current_tgid = 0 current_tgid = 0
current_meta = {} current_meta = {}
# Handle multi_rx list or single dict structure # The response is an array of update objects
if isinstance(data, list): for item in data:
for ch in data: if item.get("json_type") == "channel_update":
t = ch.get("tgid", 0) # The terminal provides channel info keyed by channel index (e.g., "0")
if t and int(t) > 0: # We look for the first channel that has an active TGID
current_tgid = int(t) for key in item:
current_meta = { if key.isdigit():
"tgid": str(t), ch = item[key]
"alpha_tag": str(ch.get("tag", "")).strip(), t = ch.get("tgid")
"frequency": str(ch.get("freq", 0)),
"sysname": str(ch.get("system", "")).strip()
}
break
elif isinstance(data, dict):
t = data.get("tgid", 0)
if t and int(t) > 0:
current_tgid = int(t)
current_meta = {
"tgid": str(t),
"alpha_tag": str(data.get("tag", "")).strip(),
"frequency": str(data.get("freq", 0)),
"sysname": str(data.get("system", "")).strip()
}
now = datetime.now() # OP25 returns null or 0 when no talkgroup is active
if t and int(t) > 0:
current_tgid = int(t)
current_meta = {
"tgid": str(t),
"rid": str(ch.get("srcaddr", "")).strip(),
"alpha_tag": str(ch.get("tag", "")).strip(),
"frequency": str(ch.get("freq", 0)),
"sysname": str(ch.get("system", "")).strip()
}
break
if current_tgid: break
now = datetime.utcnow()
# Logic for handling call start/end events
if current_tgid != 0: if current_tgid != 0:
potential_end_time = None # Reset debounce potential_end_time = None
if current_tgid != last_tgid: if current_tgid != last_tgid:
if last_tgid != 0: if last_tgid != 0:
# End previous call immediately if switching channels # --- END PREVIOUS CALL ---
payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_end", "metadata": last_metadata} await stop_recording()
audio_url = None
if current_call_id:
audio_url = await loop.run_in_executor(None, upload_audio, current_call_id)
LOGGER.debug(f"Switching TGID: {last_tgid} -> {current_tgid}")
payload = {
"node_id": NODE_ID,
"timestamp": now.isoformat(),
"event": "call_end",
"metadata": last_metadata,
"audio_url": audio_url,
"call_id": current_call_id
}
client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0) client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0)
# Start new call # --- START NEW CALL ---
payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_start", "metadata": current_meta} LOGGER.debug(f"Call Start: TGID {current_tgid} ({current_meta.get('alpha_tag')})")
# Generate ID
start_ts = int(now.timestamp())
sysname = current_meta.get('sysname', 'unknown')
tgid = current_meta.get('tgid', '0')
current_call_id = f"{NODE_ID}_{sysname}_{tgid}_{start_ts}"
# Start Recording (FFmpeg)
try:
stream_url = get_current_stream_url()
recorder_proc = await asyncio.create_subprocess_exec(
"ffmpeg", "-i", stream_url, "-y", "-t", "300",
f"/calls/{current_call_id}.mp3",
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL
)
except Exception as e:
LOGGER.error(f"Failed to start recorder: {e}")
payload = {
"node_id": NODE_ID,
"timestamp": now.isoformat(),
"event": "call_start",
"metadata": current_meta,
"call_id": current_call_id
}
client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0) client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0)
last_tgid = current_tgid last_tgid = current_tgid
last_metadata = current_meta last_metadata = current_meta
elif last_tgid != 0: elif last_tgid != 0:
if potential_end_time is None: if potential_end_time is None:
LOGGER.debug(f"Signal lost for TGID {last_tgid}. Starting debounce.")
potential_end_time = now potential_end_time = now
elif (now - potential_end_time).total_seconds() > DEBOUNCE_SECONDS: elif (now - potential_end_time).total_seconds() > DEBOUNCE_SECONDS:
payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_end", "metadata": last_metadata} # --- END CALL (Debounce Expired) ---
await stop_recording()
audio_url = None
if current_call_id:
audio_url = await loop.run_in_executor(None, upload_audio, current_call_id)
LOGGER.debug(f"Call End (Debounce expired): TGID {last_tgid}")
payload = {
"node_id": NODE_ID,
"timestamp": now.isoformat(),
"event": "call_end",
"metadata": last_metadata,
"audio_url": audio_url,
"call_id": current_call_id
}
client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0) client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0)
last_tgid = 0 last_tgid = 0
last_metadata = {} last_metadata = {}
potential_end_time = None potential_end_time = None
current_call_id = None
else:
LOGGER.debug(f"OP25 API returned status: {response.status_code}")
except Exception: except Exception as e:
pass # OP25 might be restarting or busy LOGGER.warning(f"Metadata watcher error: {e}")
await asyncio.sleep(0.25) await asyncio.sleep(0.25)

View File

@@ -7,6 +7,7 @@ services:
restart: unless-stopped restart: unless-stopped
ports: ports:
- 8001:8001 - 8001:8001
- 8081:8081
devices: devices:
- "/dev/bus/usb:/dev/bus/usb" - "/dev/bus/usb:/dev/bus/usb"
volumes: volumes:
@@ -20,6 +21,10 @@ services:
- NODE_LONG=${NODE_LONG} - NODE_LONG=${NODE_LONG}
- MQTT_BROKER=${MQTT_BROKER} - MQTT_BROKER=${MQTT_BROKER}
- ICECAST_SERVER=${ICECAST_SERVER} - ICECAST_SERVER=${ICECAST_SERVER}
- AUDIO_BUCKET=${AUDIO_BUCKET}
- HTTP_SERVER_PROTOCOL=${HTTP_SERVER_PROTOCOL}
- HTTP_SERVER_ADDRESS=${HTTP_SERVER_ADDRESS}
- HTTP_SERVER_PORT=${HTTP_SERVER_PORT}
networks: networks:
- radio-shared-net - radio-shared-net