27 Commits

Author SHA1 Message Date
Logan Cusano
d8fc867f98 Trying to resolve discord functions to see if audio works there but just not being relayed to liq 2026-01-04 04:00:50 -05:00
Logan Cusano
deb87d5888 Trying to retransmit UDP audio 2026-01-04 01:51:01 -05:00
Logan Cusano
b7c7158ea3 Use the global logger 2026-01-04 01:40:44 -05:00
Logan Cusano
4f5dcaf6ce Updates to discord radio module to try and fix redirection problem 2026-01-04 01:32:46 -05:00
Logan Cusano
00f4ebea2d Only try to update presence and transmit when the bot is online 2026-01-04 01:18:11 -05:00
Logan Cusano
19ac4b48be Implment first draft for discord module 2026-01-04 00:12:58 -05:00
Logan Cusano
2ec684f066 Fix bug with active system name 2026-01-03 23:54:02 -05:00
0a58624e50 Merge pull request 'Implement Call Recording for STT and Replay' (#3) from implement-call-recording into main
All checks were successful
release-tag / release-image (push) Successful in 1h26m13s
Reviewed-on: #3
2026-01-03 19:38:04 -05:00
Logan Cusano
10554a2ff4 Properly add ffmpeg to the dockerfile install sequence 2026-01-03 19:32:20 -05:00
Logan Cusano
051eac88b0 Add call ID to the call metadata 2026-01-03 19:18:30 -05:00
Logan Cusano
d8190e307c Standardize timestamps to UTC 2026-01-03 11:41:27 -05:00
Logan Cusano
83b995bfa5 Fix bootleg AI mistake 2026-01-03 03:12:57 -05:00
Logan Cusano
9e92da4e58 Replace http server vars with dedicated vars 2026-01-03 03:10:45 -05:00
Logan Cusano
0fe8194c39 fix upload url 2026-01-02 00:17:36 -05:00
Logan Cusano
8c106473cf Move bucket upload to the c2 server and replaced with upload to c2 server 2025-12-30 03:01:31 -05:00
Logan Cusano
a5d5fa9de7 Install ffmpeg to test if that resolves issue with recording 2025-12-29 22:55:18 -05:00
Logan Cusano
a7de6bfb04 Fix the calls directory bug 2025-12-29 22:47:18 -05:00
Logan Cusano
3b98e3a72a Add GCP to the requirements 2025-12-29 22:21:58 -05:00
Logan Cusano
41075a5950 init 2025-12-29 22:18:58 -05:00
de143a67fe Merge pull request 'Implement Metadata Watcher' (#1) from metadata-watcher into main
All checks were successful
release-tag / release-image (push) Successful in 1h26m24s
Reviewed-on: #1
2025-12-29 19:04:07 -05:00
Logan Cusano
ee9ce0e140 Add the radio ID to the metadata payload to track who is talking, not just what system 2025-12-29 19:02:51 -05:00
Logan Cusano
ca984be293 Implement debug logging into metadata watcher 2025-12-29 15:48:45 -05:00
Logan Cusano
b8ee991192 Update port in docker compose and update metadata watcher function to use correct OP@5 endpoint 2025-12-29 15:23:18 -05:00
Logan Cusano
0a6b565651 Fix bug in op25 config where it would not create liquidsoap if saved config was loaded 2025-12-29 15:06:48 -05:00
Logan Cusano
269ce033eb Updated op25 config functions 2025-12-29 14:09:53 -05:00
Logan Cusano
c481db6702 Update gitignore for configs 2025-12-29 14:09:26 -05:00
Logan Cusano
e740b46bfe Add example env file 2025-12-29 13:52:23 -05:00
9 changed files with 612 additions and 93 deletions

9
.env.example Normal file
View File

@@ -0,0 +1,9 @@
NODE_ID=
MQTT_BROKER=
ICECAST_SERVER=
AUDIO_BUCKET=
NODE_LAT=
NODE_LONG=
HTTP_SERVER_PROTOCOL=
HTTP_SERVER_ADDRESS=
HTTP_SERVER_PORT=

3
.gitignore vendored
View File

@@ -2,4 +2,5 @@
*.log *.log
*.db *.db
*.conf *.conf
config/* configs/*
*.json

View File

@@ -7,7 +7,7 @@ ENV DEBIAN_FRONTEND=noninteractive
# Install system dependencies # Install system dependencies
RUN apt-get update && \ RUN apt-get update && \
apt-get upgrade -y && \ apt-get upgrade -y && \
apt-get install git pulseaudio pulseaudio-utils liquidsoap -y apt-get install git pulseaudio pulseaudio-utils liquidsoap ffmpeg -y
# Clone the boatbod op25 repository # Clone the boatbod op25 repository
RUN git clone -b gr310 https://github.com/boatbod/op25 /op25 RUN git clone -b gr310 https://github.com/boatbod/op25 /op25
@@ -34,6 +34,9 @@ EXPOSE 8001 8081
# Create and set up the configuration directory # Create and set up the configuration directory
VOLUME ["/configs"] VOLUME ["/configs"]
# Create the calls local cache directory
VOLUME ["/calls"]
# Set the working directory in the container # Set the working directory in the container
WORKDIR /app WORKDIR /app

View File

@@ -0,0 +1,230 @@
import discord
import asyncio
import socket
import threading
import subprocess
import shlex
import time
from internal.logger import create_logger
from collections import deque
from typing import Optional, List
LOGGER = create_logger(__name__)
class UDPAudioSource(discord.AudioSource):
"""
A custom Discord AudioSource that reads raw PCM audio from a thread-safe buffer.
It pipes the raw 8000Hz 16-bit audio into FFmpeg to convert it to
Discord's required 48000Hz stereo Opus format.
"""
def __init__(self, buffer: deque):
self.buffer = buffer
# We start an FFmpeg process that reads from stdin (pipe) and outputs raw PCM (s16le) at 48k
self.ffmpeg_process = subprocess.Popen(
shlex.split(
"ffmpeg -f s16le -ar 8000 -ac 1 -i pipe:0 -f s16le -ar 48000 -ac 2 pipe:1 -loglevel panic"
),
stdin=subprocess.PIPE,
stdout=subprocess.PIPE
)
self._buffer_lock = threading.Lock()
def read(self) -> bytes:
"""
Reads 20ms of audio from the buffer, feeds it to ffmpeg,
and returns the converted bytes to Discord.
"""
# Discord requests 20ms of audio.
# At 8000Hz 16-bit mono, 1 second = 16000 bytes. 20ms = 320 bytes.
chunk_size = 320
data = b''
# Pull data from the UDP thread buffer
with self._buffer_lock:
if len(self.buffer) >= chunk_size:
# Optimized deque slicing
data = bytes([self.buffer.popleft() for _ in range(chunk_size)])
else:
# If we don't have enough data (buffer underrun), send silence to keep pipe open
data = b'\x00' * chunk_size
try:
# Write raw 8k mono to ffmpeg stdin
self.ffmpeg_process.stdin.write(data)
self.ffmpeg_process.stdin.flush()
# Read converted 48k stereo from ffmpeg stdout
# 48000 Hz * 2 channels * 2 bytes (16bit) * 0.02s = 3840 bytes
converted_data = self.ffmpeg_process.stdout.read(3840)
return converted_data
except Exception as e:
LOGGER.error(f"Error in FFmpeg conversion: {e}")
return b'\x00' * 3840
def cleanup(self):
if self.ffmpeg_process:
try:
self.ffmpeg_process.stdin.close()
self.ffmpeg_process.terminate()
except:
pass
class DiscordRadioBot(discord.Client):
"""
A dedicated Discord Client for streaming Radio traffic.
Features:
- Independent UDP Listener (non-blocking)
- Packet Forwarding (to keep Liquidsoap/Icecast working)
- Manual Channel Move detection
- Dynamic Token/Channel joining
- Debug Stats logging
"""
def __init__(self, listen_port=23456, forward_ports: List[int] = None):
intents = discord.Intents.default()
super().__init__(intents=intents)
self.listen_port = listen_port
self.forward_ports = forward_ports or []
self.audio_buffer = deque(maxlen=160000) # Buffer ~10 seconds of audio
self.udp_sock = None
self.running = False
self.voice_client: Optional[discord.VoiceClient] = None
# Debug Stats
self.packets_received = 0
self.packets_forwarded = 0
self.last_log_time = time.time()
# Start the UDP Listener in a separate daemon thread immediately
self.udp_thread = threading.Thread(target=self._udp_listener_loop, daemon=True)
self.udp_thread.start()
def _udp_listener_loop(self):
"""
Runs in a background thread. Listens for UDP packets from OP25.
"""
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Bind to all interfaces to ensure we catch the OP25 stream
self.udp_sock.bind(('0.0.0.0', self.listen_port))
self.udp_sock.settimeout(1.0)
LOGGER.info(f"UDP Audio Bridge listening on 0.0.0.0:{self.listen_port}")
LOGGER.info(f"Forwarding audio to: {self.forward_ports}")
# Create sockets for each forward port
forward_targets = []
for port in self.forward_ports:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
forward_targets.append((s, port))
self.running = True
while self.running:
try:
data, addr = self.udp_sock.recvfrom(4096)
self.packets_received += 1
# 1. Forward to Liquidsoap/audio.py
for sock, port in forward_targets:
try:
sock.sendto(data, ('127.0.0.1', port))
self.packets_forwarded += 1
except Exception as fe:
# Log forwarding errors only once per 5 seconds to avoid spam
if self.packets_forwarded % 100 == 0:
LOGGER.error(f"Forward error to port {port}: {fe}")
# 2. Add to Discord Buffer
self.audio_buffer.extend(data)
# Periodic Stats Logging
if time.time() - self.last_log_time > 10:
if self.packets_received > 0:
LOGGER.info(f"Discord Audio Bridge Stats: Received {self.packets_received} packets, Forwarded {self.packets_forwarded}")
self.packets_received = 0
self.packets_forwarded = 0
self.last_log_time = time.time()
except socket.timeout:
continue
except Exception as e:
LOGGER.error(f"UDP Listener Error: {e}")
time.sleep(0.1)
async def on_ready(self):
LOGGER.info(f'Discord Radio Bot connected as {self.user}')
async def on_voice_state_update(self, member, before, after):
"""
Handles manual moves.
"""
if member.id == self.user.id:
if after.channel is not None and before.channel != after.channel:
LOGGER.info(f"Bot was moved to channel: {after.channel.name}")
self.voice_client = member.guild.voice_client
async def start_session(self, token: str, channel_id: int):
"""
Connects the bot to Discord and joins the specified channel.
"""
asyncio.create_task(self.start(token))
# Wait for login to complete
for _ in range(20):
if self.is_ready():
break
await asyncio.sleep(0.5)
try:
channel = self.get_channel(int(channel_id))
if not channel:
channel = await self.fetch_channel(int(channel_id))
if channel:
self.voice_client = await channel.connect()
LOGGER.info(f"Joined voice channel: {channel.name}")
else:
LOGGER.error(f"Could not find channel ID: {channel_id}")
except Exception as e:
LOGGER.error(f"Failed to join voice: {e}")
async def stop_session(self):
"""
Disconnects voice and closes the bot connection.
"""
if self.voice_client:
await self.voice_client.disconnect()
await self.close()
def update_system_presence(self, system_name: str):
"""
Updates the "Playing..." status of the bot.
"""
activity = discord.Activity(
type=discord.ActivityType.listening,
name=f"{system_name}"
)
asyncio.create_task(self.change_presence(activity=activity))
def start_transmission(self):
"""
Called when a radio call STARTS.
Connects the audio buffer to the voice stream.
This turns the green ring ON.
"""
if self.voice_client and not self.voice_client.is_playing():
# Clear old audio so we don't start with a delay
self.audio_buffer.clear()
source = UDPAudioSource(self.audio_buffer)
self.voice_client.play(source)
def stop_transmission(self):
"""
Called when a radio call ENDS.
Stops the stream.
This turns the green ring OFF.
"""
if self.voice_client and self.voice_client.is_playing():
self.voice_client.stop()

View File

@@ -2,9 +2,11 @@ import csv
import json import json
import os import os
import shutil import shutil
from models.models import TalkgroupTag from pathlib import Path
from models.models import TalkgroupTag, IcecastConfig
from typing import List, Dict from typing import List, Dict
from internal.logger import create_logger from internal.logger import create_logger
from internal.liquidsoap_config_utils import generate_liquid_script
LOGGER = create_logger(__name__) LOGGER = create_logger(__name__)
@@ -28,8 +30,8 @@ def scan_local_library() -> List[Dict]:
# Use trunking sysname or filename as the identifier # Use trunking sysname or filename as the identifier
sys_name = data.get("trunking", {}).get("sysname", filename.replace(".json", "")) sys_name = data.get("trunking", {}).get("sysname", filename.replace(".json", ""))
library.append({ library.append({
"name": sys_name, "system_name": sys_name,
"system_name": filename, "filename": filename,
"mode": "P25" if "trunking" in data else "NBFM" "mode": "P25" if "trunking" in data else "NBFM"
}) })
except Exception as e: except Exception as e:
@@ -44,16 +46,48 @@ def activate_config_from_library(system_name: str) -> bool:
if not system_name.endswith(".json"): if not system_name.endswith(".json"):
system_name += ".json" system_name += ".json"
src = os.path.join(CONFIG_DIR, system_name) config_path = Path(CONFIG_DIR)
dst = os.path.join(CONFIG_DIR, "active.cfg.json") src = config_path / system_name
dst = config_path / "active.cfg.json"
if not os.path.exists(src): if not src.exists():
LOGGER.error(f"Source config {system_name} not found in library.") LOGGER.error(f"Source config {system_name} not found in library.")
return False return False
try: try:
shutil.copy2(src, dst) shutil.copy2(src, dst)
LOGGER.info(f"Activated config: {system_name}") LOGGER.info(f"Activated config: {system_name}")
# Copy sidecar files (tags/whitelist) if they exist
src_tags = src.with_suffix(".tags.tsv")
if src_tags.exists():
shutil.copy2(src_tags, config_path / "active.cfg.tags.tsv")
src_whitelist = src.with_suffix(".whitelist.tsv")
if src_whitelist.exists():
shutil.copy2(src_whitelist, config_path / "active.cfg.whitelist.tsv")
# Generate Liquidsoap Script by reading the activated config
with open(dst, 'r') as f:
data = json.load(f)
if "trunking" in data and "metadata" in data:
streams = data.get("metadata", {}).get("streams", [])
if streams:
stream = streams[0]
address = stream.get("icecastServerAddress", "127.0.0.1:8000")
host, port = address.split(":") if ":" in address else (address, 8000)
ice_config = IcecastConfig(
icecast_host=host,
icecast_port=int(port),
icecast_mountpoint=stream.get("icecastMountpoint", "/stream"),
icecast_password=stream.get("icecastPass", "hackme"),
icecast_description="OP25 Stream",
icecast_genre="Scanner"
)
generate_liquid_script(ice_config)
return True return True
except Exception as e: except Exception as e:
LOGGER.error(f"Failed to copy config: {e}") LOGGER.error(f"Failed to copy config: {e}")
@@ -88,14 +122,16 @@ def get_current_active_config() -> Dict:
return {} return {}
return {} return {}
def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag]) -> None: def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag], prefix: str = "active.cfg") -> None:
with open(os.path.join(CONFIG_DIR, "active.cfg.tags.tsv"), 'w', newline='', encoding='utf-8') as file: filename = f"{prefix}.tags.tsv"
with open(os.path.join(CONFIG_DIR, filename), 'w', newline='', encoding='utf-8') as file:
writer = csv.writer(file, delimiter='\t', lineterminator='\n') writer = csv.writer(file, delimiter='\t', lineterminator='\n')
for tag in talkgroup_tags: for tag in talkgroup_tags:
writer.writerow([tag.tagDec, tag.talkgroup]) writer.writerow([tag.tagDec, tag.talkgroup])
def save_whitelist(talkgroup_tags: List[int]) -> None: def save_whitelist(talkgroup_tags: List[int], prefix: str = "active.cfg") -> None:
with open(os.path.join(CONFIG_DIR, "active.cfg.whitelist.tsv"), 'w', newline='', encoding='utf-8') as file: filename = f"{prefix}.whitelist.tsv"
with open(os.path.join(CONFIG_DIR, filename), 'w', newline='', encoding='utf-8') as file:
writer = csv.writer(file, delimiter='\t', lineterminator='\n') writer = csv.writer(file, delimiter='\t', lineterminator='\n')
for tag in talkgroup_tags: for tag in talkgroup_tags:
writer.writerow([tag]) writer.writerow([tag])
@@ -117,6 +153,6 @@ def get_current_system_from_config() -> str:
if not data: if not data:
return None return None
try: try:
return data.get("trunking", {}).get("sysname", "Unknown System") return data.get("trunking", {}).get("chans", [{}])[0].get("sysname", "Unknown System")
except: except:
return "Unknown System" return "Unknown System"

View File

@@ -38,7 +38,7 @@ class ChannelConfig(BaseModel):
cqpsk_tracking: Optional[bool] = None cqpsk_tracking: Optional[bool] = None
frequency: Optional[float] = None frequency: Optional[float] = None
nbfmSquelch: Optional[float] = None nbfmSquelch: Optional[float] = None
destination: Optional[str] = "udp://127.0.0.1:23456" destination: Optional[str] = "udp://127.0.0.1:23457"
tracking_threshold: Optional[int] = 120 tracking_threshold: Optional[int] = 120
tracking_feedback: Optional[float] = 0.75 tracking_feedback: Optional[float] = 0.75
excess_bw: Optional[float] = 0.2 excess_bw: Optional[float] = 0.2

View File

@@ -9,6 +9,7 @@ from internal.logger import create_logger
from internal.op25_config_utls import scan_local_library from internal.op25_config_utls import scan_local_library
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
import requests import requests
from internal.discord_radio import DiscordRadioBot
# Initialize logging # Initialize logging
LOGGER = create_logger(__name__) LOGGER = create_logger(__name__)
@@ -20,12 +21,21 @@ app.include_router(create_op25_router(), prefix="/op25")
# Configuration # Configuration
NODE_ID = os.getenv("NODE_ID", "standalone-node") NODE_ID = os.getenv("NODE_ID", "standalone-node")
MQTT_BROKER = os.getenv("MQTT_BROKER", None) MQTT_BROKER = os.getenv("MQTT_BROKER", None)
HTTP_SERVER_PROTOCOL = os.getenv("HTTP_SERVER_PROTOCOL", "http")
HTTP_SERVER_ADDRESS = os.getenv("HTTP_SERVER_ADDRESS", "127.0.0.1")
HTTP_SERVER_PORT = os.getenv("HTTP_SERVER_PORT", 8000)
NODE_LAT = os.getenv("NODE_LAT") NODE_LAT = os.getenv("NODE_LAT")
NODE_LONG = os.getenv("NODE_LONG") NODE_LONG = os.getenv("NODE_LONG")
# Global flag to track MQTT connection state # Global flag to track MQTT connection state
MQTT_CONNECTED = False MQTT_CONNECTED = False
# Global variable to hold the main event loop
main_loop = None
# Initialize the Discord Bot
discord_bot = DiscordRadioBot(listen_port=23457, forward_ports=[23456])
def handle_c2_command(topic, payload): def handle_c2_command(topic, payload):
""" """
Parses and routes commands received from the C2 server by calling the Parses and routes commands received from the C2 server by calling the
@@ -102,6 +112,23 @@ def handle_c2_command(topic, payload):
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
LOGGER.error(f"Failed to connect to OP25 terminal for tuning: {e}") LOGGER.error(f"Failed to connect to OP25 terminal for tuning: {e}")
elif command_type == "discord_join":
token = data.get("token")
channel_id = data.get("channel_id")
if token and channel_id:
if main_loop and main_loop.is_running():
asyncio.run_coroutine_threadsafe(discord_bot.start_session(token, channel_id), main_loop)
else:
LOGGER.error("Main event loop not available to start Discord session.")
LOGGER.info("Initiating Discord Session...")
elif command_type == "discord_leave":
if main_loop and main_loop.is_running():
asyncio.run_coroutine_threadsafe(discord_bot.stop_session(), main_loop)
else:
LOGGER.error("Main event loop not available to stop Discord session.")
LOGGER.info("Ending Discord Session...")
else: else:
LOGGER.warning(f"Unknown command type received: {command_type}") LOGGER.warning(f"Unknown command type received: {command_type}")
@@ -112,6 +139,37 @@ def handle_c2_command(topic, payload):
except Exception as e: except Exception as e:
LOGGER.error(f"Error processing C2 command: {e}") LOGGER.error(f"Error processing C2 command: {e}")
def get_current_stream_url():
"""
Dynamically resolves the audio stream URL from the active OP25 configuration.
Falls back to env var or default if config is missing/invalid.
"""
default_url = os.getenv("STREAM_URL", "http://127.0.0.1:8000/stream_0")
config_path = "/configs/active.cfg.json"
if not os.path.exists(config_path):
return default_url
try:
with open(config_path, "r") as f:
config = json.load(f)
streams = config.get("metadata", {}).get("streams", [])
if not streams:
return default_url
stream = streams[0]
address = stream.get("icecastServerAddress", "127.0.0.1:8000")
mount = stream.get("icecastMountpoint", "stream_0")
if not mount.startswith("/"):
mount = f"/{mount}"
return f"http://{address}{mount}"
except Exception as e:
LOGGER.warning(f"Failed to resolve stream URL from config: {e}")
return default_url
async def mqtt_lifecycle_manager(): async def mqtt_lifecycle_manager():
""" """
Manages the application-level logic: Check-in, Heartbeats, and Shutdown. Manages the application-level logic: Check-in, Heartbeats, and Shutdown.
@@ -153,7 +211,7 @@ async def mqtt_lifecycle_manager():
payload = { payload = {
"node_id": NODE_ID, "node_id": NODE_ID,
"status": "online", "status": "online",
"timestamp": datetime.now().isoformat(), "timestamp": datetime.utcnow().isoformat(),
"is_listening": op25_status.get("is_running", False), "is_listening": op25_status.get("is_running", False),
"active_system": op25_status.get("active_system"), "active_system": op25_status.get("active_system"),
# Only scan library if needed, otherwise it's heavy I/O # Only scan library if needed, otherwise it's heavy I/O
@@ -186,12 +244,49 @@ async def mqtt_lifecycle_manager():
async def metadata_watcher(): async def metadata_watcher():
""" """
Polls OP25 HTTP terminal for metadata and publishes events to MQTT. Polls OP25 HTTP terminal for metadata and publishes events to MQTT.
Corrected to use the POST-based command API found in the HAR capture.
""" """
last_tgid = 0 last_tgid = 0
last_metadata = {} last_metadata = {}
potential_end_time = None potential_end_time = None
DEBOUNCE_SECONDS = 2.5 DEBOUNCE_SECONDS = 2.5
OP25_DATA_URL = "http://127.0.0.1:8081/data.json" OP25_DATA_URL = "http://127.0.0.1:8081/"
# This is the specific payload the OP25 web interface uses [cite: 45562, 45563]
COMMAND_PAYLOAD = [{"command": "update", "arg1": 0, "arg2": 0}]
# Audio Recording State
recorder_proc = None
current_call_id = None
async def stop_recording():
nonlocal recorder_proc
if recorder_proc:
if recorder_proc.returncode is None:
recorder_proc.terminate()
try:
await asyncio.wait_for(recorder_proc.wait(), timeout=2.0)
except asyncio.TimeoutError:
recorder_proc.kill()
recorder_proc = None
def upload_audio(call_id):
if not MQTT_BROKER: return None
local_path = f"/calls/{call_id}.mp3"
if not os.path.exists(local_path): return None
try:
with open(local_path, "rb") as f:
files = {"file": (f"{call_id}.mp3", f, "audio/mpeg")}
response = requests.post(f"{HTTP_SERVER_PROTOCOL}://{HTTP_SERVER_ADDRESS}:{HTTP_SERVER_PORT}/upload", files=files, data={"node_id": NODE_ID, "call_id": call_id}, timeout=30)
response.raise_for_status()
return response.json().get("url")
except Exception as e:
LOGGER.error(f"Upload failed: {e}")
return None
finally:
if os.path.exists(local_path):
os.remove(local_path)
while True: while True:
if not MQTT_CONNECTED: if not MQTT_CONNECTED:
@@ -199,69 +294,145 @@ async def mqtt_lifecycle_manager():
continue continue
try: try:
# Run blocking request in executor to avoid blocking the asyncio loop # Run blocking POST request in executor
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
response = await loop.run_in_executor(None, lambda: requests.get(OP25_DATA_URL, timeout=0.5)) response = await loop.run_in_executor(
None,
lambda: requests.post(OP25_DATA_URL, json=COMMAND_PAYLOAD, timeout=0.5)
)
if response.status_code == 200: if response.status_code == 200:
data = response.json() data = response.json()
# LOGGER.debug(f"Response from OP25 API: {data}")
current_tgid = 0 current_tgid = 0
current_meta = {} current_meta = {}
# Handle multi_rx list or single dict structure # The response is an array of update objects
if isinstance(data, list): for item in data:
for ch in data: if item.get("json_type") == "channel_update":
t = ch.get("tgid", 0) # The terminal provides channel info keyed by channel index (e.g., "0")
# We look for the first channel that has an active TGID
for key in item:
if key.isdigit():
ch = item[key]
t = ch.get("tgid")
# OP25 returns null or 0 when no talkgroup is active
if t and int(t) > 0: if t and int(t) > 0:
current_tgid = int(t) current_tgid = int(t)
current_meta = { current_meta = {
"tgid": str(t), "tgid": str(t),
"rid": str(ch.get("srcaddr", "")).strip(),
"alpha_tag": str(ch.get("tag", "")).strip(), "alpha_tag": str(ch.get("tag", "")).strip(),
"frequency": str(ch.get("freq", 0)), "frequency": str(ch.get("freq", 0)),
"sysname": str(ch.get("system", "")).strip() "sysname": str(ch.get("system", "")).strip()
} }
break break
elif isinstance(data, dict): if current_tgid: break
t = data.get("tgid", 0)
if t and int(t) > 0:
current_tgid = int(t)
current_meta = {
"tgid": str(t),
"alpha_tag": str(data.get("tag", "")).strip(),
"frequency": str(data.get("freq", 0)),
"sysname": str(data.get("system", "")).strip()
}
now = datetime.now() now = datetime.utcnow()
# Logic for handling call start/end events
if current_tgid != 0: if current_tgid != 0:
potential_end_time = None # Reset debounce potential_end_time = None
if current_tgid != last_tgid: if current_tgid != last_tgid:
if last_tgid != 0: if last_tgid != 0:
# End previous call immediately if switching channels # --- END PREVIOUS CALL ---
payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_end", "metadata": last_metadata} await stop_recording()
# Stop Discord Transmission
if discord_bot.is_ready():
discord_bot.stop_transmission()
audio_url = None
if current_call_id:
audio_url = await loop.run_in_executor(None, upload_audio, current_call_id)
LOGGER.debug(f"Switching TGID: {last_tgid} -> {current_tgid}")
payload = {
"node_id": NODE_ID,
"timestamp": now.isoformat(),
"event": "call_end",
"metadata": last_metadata,
"audio_url": audio_url,
"call_id": current_call_id
}
client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0) client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0)
# Start new call # --- START NEW CALL ---
payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_start", "metadata": current_meta} LOGGER.debug(f"Call Start: TGID {current_tgid} ({current_meta.get('alpha_tag')})")
# Trigger Discord Transmission
if discord_bot.is_ready():
discord_bot.start_transmission()
discord_bot.update_system_presence(current_meta.get('sysname', 'Scanning'))
# Generate ID
start_ts = int(now.timestamp())
sysname = current_meta.get('sysname', 'unknown')
tgid = current_meta.get('tgid', '0')
current_call_id = f"{NODE_ID}_{sysname}_{tgid}_{start_ts}"
# Start Recording (FFmpeg)
try:
stream_url = get_current_stream_url()
recorder_proc = await asyncio.create_subprocess_exec(
"ffmpeg", "-i", stream_url, "-y", "-t", "300",
f"/calls/{current_call_id}.mp3",
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL
)
except Exception as e:
LOGGER.error(f"Failed to start recorder: {e}")
payload = {
"node_id": NODE_ID,
"timestamp": now.isoformat(),
"event": "call_start",
"metadata": current_meta,
"call_id": current_call_id
}
client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0) client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0)
last_tgid = current_tgid last_tgid = current_tgid
last_metadata = current_meta last_metadata = current_meta
elif last_tgid != 0: elif last_tgid != 0:
if potential_end_time is None: if potential_end_time is None:
LOGGER.debug(f"Signal lost for TGID {last_tgid}. Starting debounce.")
potential_end_time = now potential_end_time = now
elif (now - potential_end_time).total_seconds() > DEBOUNCE_SECONDS: elif (now - potential_end_time).total_seconds() > DEBOUNCE_SECONDS:
payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_end", "metadata": last_metadata} # --- END CALL (Debounce Expired) ---
await stop_recording()
# Stop Discord Transmission
if discord_bot.is_ready():
discord_bot.stop_transmission()
audio_url = None
if current_call_id:
audio_url = await loop.run_in_executor(None, upload_audio, current_call_id)
LOGGER.debug(f"Call End (Debounce expired): TGID {last_tgid}")
payload = {
"node_id": NODE_ID,
"timestamp": now.isoformat(),
"event": "call_end",
"metadata": last_metadata,
"audio_url": audio_url,
"call_id": current_call_id
}
client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0) client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0)
last_tgid = 0 last_tgid = 0
last_metadata = {} last_metadata = {}
potential_end_time = None potential_end_time = None
current_call_id = None
else:
LOGGER.debug(f"OP25 API returned status: {response.status_code}")
except Exception: except Exception as e:
pass # OP25 might be restarting or busy LOGGER.warning(f"Metadata watcher error: {e}")
await asyncio.sleep(0.25) await asyncio.sleep(0.25)
@@ -303,6 +474,8 @@ async def mqtt_lifecycle_manager():
@app.on_event("startup") @app.on_event("startup")
async def startup_event(): async def startup_event():
global main_loop
main_loop = asyncio.get_running_loop()
# Store the task so we can cancel it if needed (optional) # Store the task so we can cancel it if needed (optional)
app.state.mqtt_task = asyncio.create_task(mqtt_lifecycle_manager()) app.state.mqtt_task = asyncio.create_task(mqtt_lifecycle_manager())

View File

@@ -61,6 +61,77 @@ async def start_op25_logic():
return False return False
return False return False
def build_op25_config(generator: ConfigGenerator) -> dict:
if generator.type == DecodeMode.P25:
channels = [ChannelConfig(
name=generator.systemName,
trunking_sysname=generator.systemName,
enable_analog="off",
demod_type="cqpsk",
cqpsk_tracking=True,
filter_type="rc",
meta_stream_name="stream_0"
)]
devices = [DeviceConfig()]
trunking = TrunkingConfig(
module="tk_p25.py",
chans=[TrunkingChannelConfig(
sysname=generator.systemName,
control_channel_list=','.join(generator.channels),
tagsFile="/configs/active.cfg.tags.tsv",
whitelist="/configs/active.cfg.whitelist.tsv"
)]
)
metadata = MetadataConfig(
streams=[
MetadataStreamConfig(
stream_name="stream_0",
icecastServerAddress = f"{generator.icecastConfig.icecast_host}:{generator.icecastConfig.icecast_port}",
icecastMountpoint = generator.icecastConfig.icecast_mountpoint,
icecastPass = generator.icecastConfig.icecast_password
)
]
)
terminal = TerminalConfig()
return {
"channels": [channel.dict() for channel in channels],
"devices": [device.dict() for device in devices],
"trunking": trunking.dict(),
"metadata": metadata.dict(),
"terminal": terminal.dict()
}
elif generator.type == DecodeMode.ANALOG:
analog_config = generator.config
channels = [ChannelConfig(
channelName=analog_config.systemName,
enableAnalog="on",
demodType="fsk4",
frequency=analog_config.frequency,
filterType="widepulse",
nbfmSquelch=analog_config.nbfmSquelch
)]
devices = [DeviceConfig(gain="LNA:32")]
return {
"channels": [channel.dict() for channel in channels],
"devices": [device.dict() for device in devices]
}
else:
raise HTTPException(status_code=400, detail="Invalid decode mode")
def save_library_sidecars(system_name: str, generator: ConfigGenerator):
if generator.type == DecodeMode.P25:
prefix = system_name
if prefix.endswith(".json"):
prefix = prefix[:-5]
save_talkgroup_tags(generator.tags, prefix)
save_whitelist(generator.whitelist, prefix)
def create_op25_router(): def create_op25_router():
router = APIRouter() router = APIRouter()
@@ -93,47 +164,32 @@ def create_op25_router():
active.cfg.json, and optionally restarts the radio. active.cfg.json, and optionally restarts the radio.
""" """
try: try:
if generator.type == DecodeMode.P25: # 1. Build the configuration dictionary
# 1. Handle sidecar files (Tags/Whitelists) config_dict = build_op25_config(generator)
if generator.config.talkgroupTags:
save_talkgroup_tags(generator.config.talkgroupTags)
if generator.config.whitelist:
save_whitelist(generator.config.whitelist)
# 2. Build the main OP25 dictionary structure
config_dict = {
"channels": [c.dict() for c in generator.config.channels],
"devices": [d.dict() for d in generator.config.devices],
"trunking": generator.config.trunking.dict(),
"metadata": generator.config.metadata.dict(),
"terminal": generator.config.terminal.dict()
}
elif generator.type == DecodeMode.ANALOG:
# Simple Analog NBFM Setup for quick testing
channels = [ChannelConfig(
channelName=generator.config.systemName,
enableAnalog="on",
frequency=generator.config.frequency,
demodType="fsk4",
filterType="widepulse"
)]
config_dict = {
"channels": [c.dict() for c in channels],
"devices": [{"gain": "LNA:32"}] # Default gain for analog test
}
else:
raise HTTPException(status_code=400, detail="Invalid decode mode")
# 3. Clean 'None' values to prevent OP25 parsing errors and save
final_json = del_none_in_dict(config_dict) final_json = del_none_in_dict(config_dict)
# 2. Handle Storage and Activation
if save_to_library_name: if save_to_library_name:
# Save to library
save_config_to_library(save_to_library_name, final_json) save_config_to_library(save_to_library_name, final_json)
save_library_sidecars(save_to_library_name, generator)
# Activate from library (Copies json + sidecars)
if not activate_config_from_library(save_to_library_name):
raise HTTPException(status_code=500, detail="Failed to activate saved configuration")
else:
# Save directly to active
with open('/configs/active.cfg.json', 'w') as f: with open('/configs/active.cfg.json', 'w') as f:
json.dump(final_json, f, indent=2) json.dump(final_json, f, indent=2)
if generator.type == DecodeMode.P25:
save_talkgroup_tags(generator.tags)
save_whitelist(generator.whitelist)
# 3. Generate Liquidsoap Script (Always required for active P25 session)
if generator.type == DecodeMode.P25:
generate_liquid_script(generator.icecastConfig)
LOGGER.info("Saved new configuration to active.cfg.json") LOGGER.info("Saved new configuration to active.cfg.json")
# 4. Handle Lifecycle # 4. Handle Lifecycle
@@ -162,13 +218,19 @@ def create_op25_router():
raise HTTPException(status_code=404, detail=f"Config '{system_name}' not found in library volume") raise HTTPException(status_code=404, detail=f"Config '{system_name}' not found in library volume")
@router.post("/save_to_library") @router.post("/save_to_library")
async def save_to_library(system_name: str, config: dict): async def save_to_library(system_name: str, config: ConfigGenerator):
""" """
Directly saves a JSON configuration to the library. Directly saves a JSON configuration to the library.
""" """
if save_config_to_library(system_name, config): try:
config_dict = build_op25_config(config)
final_json = del_none_in_dict(config_dict)
if save_config_to_library(system_name, final_json):
save_library_sidecars(system_name, config)
return {"status": f"Config saved as {system_name}"} return {"status": f"Config saved as {system_name}"}
raise HTTPException(status_code=500, detail="Failed to save configuration") raise HTTPException(status_code=500, detail="Failed to save configuration")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/library") @router.get("/library")
async def get_library(): async def get_library():

View File

@@ -7,10 +7,11 @@ services:
restart: unless-stopped restart: unless-stopped
ports: ports:
- 8001:8001 - 8001:8001
- 8081:8081
devices: devices:
- "/dev/bus/usb:/dev/bus/usb" - "/dev/bus/usb:/dev/bus/usb"
volumes: volumes:
- ./config:/app/config - ./configs:/configs
- ./op25_logs:/tmp/op25 - ./op25_logs:/tmp/op25
env_file: env_file:
- .env - .env
@@ -20,6 +21,10 @@ services:
- NODE_LONG=${NODE_LONG} - NODE_LONG=${NODE_LONG}
- MQTT_BROKER=${MQTT_BROKER} - MQTT_BROKER=${MQTT_BROKER}
- ICECAST_SERVER=${ICECAST_SERVER} - ICECAST_SERVER=${ICECAST_SERVER}
- AUDIO_BUCKET=${AUDIO_BUCKET}
- HTTP_SERVER_PROTOCOL=${HTTP_SERVER_PROTOCOL}
- HTTP_SERVER_ADDRESS=${HTTP_SERVER_ADDRESS}
- HTTP_SERVER_PORT=${HTTP_SERVER_PORT}
networks: networks:
- radio-shared-net - radio-shared-net