diff --git a/app/internal/discord_radio.py b/app/internal/discord_radio.py new file mode 100644 index 0000000..75440b2 --- /dev/null +++ b/app/internal/discord_radio.py @@ -0,0 +1,230 @@ +import discord +import asyncio +import socket +import threading +import subprocess +import shlex +import time +from internal.logger import create_logger +from collections import deque +from typing import Optional, List + +LOGGER = create_logger(__name__) + +class UDPAudioSource(discord.AudioSource): + """ + A custom Discord AudioSource that reads raw PCM audio from a thread-safe buffer. + It pipes the raw 8000Hz 16-bit audio into FFmpeg to convert it to + Discord's required 48000Hz stereo Opus format. + """ + def __init__(self, buffer: deque): + self.buffer = buffer + # We start an FFmpeg process that reads from stdin (pipe) and outputs raw PCM (s16le) at 48k + self.ffmpeg_process = subprocess.Popen( + shlex.split( + "ffmpeg -f s16le -ar 8000 -ac 1 -i pipe:0 -f s16le -ar 48000 -ac 2 pipe:1 -loglevel panic" + ), + stdin=subprocess.PIPE, + stdout=subprocess.PIPE + ) + self._buffer_lock = threading.Lock() + + def read(self) -> bytes: + """ + Reads 20ms of audio from the buffer, feeds it to ffmpeg, + and returns the converted bytes to Discord. + """ + # Discord requests 20ms of audio. + # At 8000Hz 16-bit mono, 1 second = 16000 bytes. 20ms = 320 bytes. + chunk_size = 320 + + data = b'' + + # Pull data from the UDP thread buffer + with self._buffer_lock: + if len(self.buffer) >= chunk_size: + # Optimized deque slicing + data = bytes([self.buffer.popleft() for _ in range(chunk_size)]) + else: + # If we don't have enough data (buffer underrun), send silence to keep pipe open + data = b'\x00' * chunk_size + + try: + # Write raw 8k mono to ffmpeg stdin + self.ffmpeg_process.stdin.write(data) + self.ffmpeg_process.stdin.flush() + + # Read converted 48k stereo from ffmpeg stdout + # 48000 Hz * 2 channels * 2 bytes (16bit) * 0.02s = 3840 bytes + converted_data = self.ffmpeg_process.stdout.read(3840) + return converted_data + except Exception as e: + LOGGER.error(f"Error in FFmpeg conversion: {e}") + return b'\x00' * 3840 + + def cleanup(self): + if self.ffmpeg_process: + try: + self.ffmpeg_process.stdin.close() + self.ffmpeg_process.terminate() + except: + pass + +class DiscordRadioBot(discord.Client): + """ + A dedicated Discord Client for streaming Radio traffic. + + Features: + - Independent UDP Listener (non-blocking) + - Packet Forwarding (to keep Liquidsoap/Icecast working) + - Manual Channel Move detection + - Dynamic Token/Channel joining + - Debug Stats logging + """ + def __init__(self, listen_port=23456, forward_ports: List[int] = None): + intents = discord.Intents.default() + super().__init__(intents=intents) + + self.listen_port = listen_port + self.forward_ports = forward_ports or [] + + self.audio_buffer = deque(maxlen=160000) # Buffer ~10 seconds of audio + self.udp_sock = None + self.running = False + self.voice_client: Optional[discord.VoiceClient] = None + + # Debug Stats + self.packets_received = 0 + self.packets_forwarded = 0 + self.last_log_time = time.time() + + # Start the UDP Listener in a separate daemon thread immediately + self.udp_thread = threading.Thread(target=self._udp_listener_loop, daemon=True) + self.udp_thread.start() + + def _udp_listener_loop(self): + """ + Runs in a background thread. Listens for UDP packets from OP25. + """ + self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + # Bind to all interfaces to ensure we catch the OP25 stream + self.udp_sock.bind(('0.0.0.0', self.listen_port)) + self.udp_sock.settimeout(1.0) + + LOGGER.info(f"UDP Audio Bridge listening on 0.0.0.0:{self.listen_port}") + LOGGER.info(f"Forwarding audio to: {self.forward_ports}") + + # Create sockets for each forward port + forward_targets = [] + for port in self.forward_ports: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + forward_targets.append((s, port)) + + self.running = True + while self.running: + try: + data, addr = self.udp_sock.recvfrom(4096) + self.packets_received += 1 + + # 1. Forward to Liquidsoap/audio.py + for sock, port in forward_targets: + try: + sock.sendto(data, ('127.0.0.1', port)) + self.packets_forwarded += 1 + except Exception as fe: + # Log forwarding errors only once per 5 seconds to avoid spam + if self.packets_forwarded % 100 == 0: + LOGGER.error(f"Forward error to port {port}: {fe}") + + # 2. Add to Discord Buffer + self.audio_buffer.extend(data) + + # Periodic Stats Logging + if time.time() - self.last_log_time > 10: + if self.packets_received > 0: + LOGGER.info(f"Discord Audio Bridge Stats: Received {self.packets_received} packets, Forwarded {self.packets_forwarded}") + self.packets_received = 0 + self.packets_forwarded = 0 + self.last_log_time = time.time() + + except socket.timeout: + continue + except Exception as e: + LOGGER.error(f"UDP Listener Error: {e}") + time.sleep(0.1) + + async def on_ready(self): + LOGGER.info(f'Discord Radio Bot connected as {self.user}') + + async def on_voice_state_update(self, member, before, after): + """ + Handles manual moves. + """ + if member.id == self.user.id: + if after.channel is not None and before.channel != after.channel: + LOGGER.info(f"Bot was moved to channel: {after.channel.name}") + self.voice_client = member.guild.voice_client + + async def start_session(self, token: str, channel_id: int): + """ + Connects the bot to Discord and joins the specified channel. + """ + asyncio.create_task(self.start(token)) + + # Wait for login to complete + for _ in range(20): + if self.is_ready(): + break + await asyncio.sleep(0.5) + + try: + channel = self.get_channel(int(channel_id)) + if not channel: + channel = await self.fetch_channel(int(channel_id)) + + if channel: + self.voice_client = await channel.connect() + LOGGER.info(f"Joined voice channel: {channel.name}") + else: + LOGGER.error(f"Could not find channel ID: {channel_id}") + except Exception as e: + LOGGER.error(f"Failed to join voice: {e}") + + async def stop_session(self): + """ + Disconnects voice and closes the bot connection. + """ + if self.voice_client: + await self.voice_client.disconnect() + await self.close() + + def update_system_presence(self, system_name: str): + """ + Updates the "Playing..." status of the bot. + """ + activity = discord.Activity( + type=discord.ActivityType.listening, + name=f"{system_name}" + ) + asyncio.create_task(self.change_presence(activity=activity)) + + def start_transmission(self): + """ + Called when a radio call STARTS. + Connects the audio buffer to the voice stream. + This turns the green ring ON. + """ + if self.voice_client and not self.voice_client.is_playing(): + # Clear old audio so we don't start with a delay + self.audio_buffer.clear() + source = UDPAudioSource(self.audio_buffer) + self.voice_client.play(source) + + def stop_transmission(self): + """ + Called when a radio call ENDS. + Stops the stream. + This turns the green ring OFF. + """ + if self.voice_client and self.voice_client.is_playing(): + self.voice_client.stop() \ No newline at end of file diff --git a/app/internal/op25_config_utls.py b/app/internal/op25_config_utls.py index 96adfc1..8c8a287 100644 --- a/app/internal/op25_config_utls.py +++ b/app/internal/op25_config_utls.py @@ -153,6 +153,6 @@ def get_current_system_from_config() -> str: if not data: return None try: - return data.get("trunking", {}).get("sysname", "Unknown System") + return data.get("trunking", {}).get("chans", [{}])[0].get("sysname", "Unknown System") except: return "Unknown System" \ No newline at end of file diff --git a/app/models/models.py b/app/models/models.py index 1f02958..44f5da7 100644 --- a/app/models/models.py +++ b/app/models/models.py @@ -38,7 +38,7 @@ class ChannelConfig(BaseModel): cqpsk_tracking: Optional[bool] = None frequency: Optional[float] = None nbfmSquelch: Optional[float] = None - destination: Optional[str] = "udp://127.0.0.1:23456" + destination: Optional[str] = "udp://127.0.0.1:23457" tracking_threshold: Optional[int] = 120 tracking_feedback: Optional[float] = 0.75 excess_bw: Optional[float] = 0.2 diff --git a/app/node_main.py b/app/node_main.py index c38d745..111c74c 100644 --- a/app/node_main.py +++ b/app/node_main.py @@ -9,6 +9,7 @@ from internal.logger import create_logger from internal.op25_config_utls import scan_local_library import paho.mqtt.client as mqtt import requests +from internal.discord_radio import DiscordRadioBot # Initialize logging LOGGER = create_logger(__name__) @@ -29,6 +30,12 @@ NODE_LONG = os.getenv("NODE_LONG") # Global flag to track MQTT connection state MQTT_CONNECTED = False +# Global variable to hold the main event loop +main_loop = None + +# Initialize the Discord Bot +discord_bot = DiscordRadioBot(listen_port=23457, forward_ports=[23456]) + def handle_c2_command(topic, payload): """ Parses and routes commands received from the C2 server by calling the @@ -105,6 +112,23 @@ def handle_c2_command(topic, payload): except requests.exceptions.RequestException as e: LOGGER.error(f"Failed to connect to OP25 terminal for tuning: {e}") + elif command_type == "discord_join": + token = data.get("token") + channel_id = data.get("channel_id") + if token and channel_id: + if main_loop and main_loop.is_running(): + asyncio.run_coroutine_threadsafe(discord_bot.start_session(token, channel_id), main_loop) + else: + LOGGER.error("Main event loop not available to start Discord session.") + LOGGER.info("Initiating Discord Session...") + + elif command_type == "discord_leave": + if main_loop and main_loop.is_running(): + asyncio.run_coroutine_threadsafe(discord_bot.stop_session(), main_loop) + else: + LOGGER.error("Main event loop not available to stop Discord session.") + LOGGER.info("Ending Discord Session...") + else: LOGGER.warning(f"Unknown command type received: {command_type}") @@ -317,6 +341,11 @@ async def mqtt_lifecycle_manager(): if last_tgid != 0: # --- END PREVIOUS CALL --- await stop_recording() + + # Stop Discord Transmission + if discord_bot.is_ready(): + discord_bot.stop_transmission() + audio_url = None if current_call_id: audio_url = await loop.run_in_executor(None, upload_audio, current_call_id) @@ -334,6 +363,11 @@ async def mqtt_lifecycle_manager(): # --- START NEW CALL --- LOGGER.debug(f"Call Start: TGID {current_tgid} ({current_meta.get('alpha_tag')})") + + # Trigger Discord Transmission + if discord_bot.is_ready(): + discord_bot.start_transmission() + discord_bot.update_system_presence(current_meta.get('sysname', 'Scanning')) # Generate ID start_ts = int(now.timestamp()) @@ -371,6 +405,11 @@ async def mqtt_lifecycle_manager(): elif (now - potential_end_time).total_seconds() > DEBOUNCE_SECONDS: # --- END CALL (Debounce Expired) --- await stop_recording() + + # Stop Discord Transmission + if discord_bot.is_ready(): + discord_bot.stop_transmission() + audio_url = None if current_call_id: audio_url = await loop.run_in_executor(None, upload_audio, current_call_id) @@ -435,6 +474,8 @@ async def mqtt_lifecycle_manager(): @app.on_event("startup") async def startup_event(): + global main_loop + main_loop = asyncio.get_running_loop() # Store the task so we can cancel it if needed (optional) app.state.mqtt_task = asyncio.create_task(mqtt_lifecycle_manager())