From 19ac4b48bebccd89ede3a36e3bd2dfbf8b9be58e Mon Sep 17 00:00:00 2001 From: Logan Cusano Date: Sun, 4 Jan 2026 00:12:58 -0500 Subject: [PATCH] Implment first draft for discord module --- app/internal/discord_radio.py | 222 ++++++++++++++++++++++++++++++++++ app/models/models.py | 2 +- app/node_main.py | 27 +++++ 3 files changed, 250 insertions(+), 1 deletion(-) create mode 100644 app/internal/discord_radio.py diff --git a/app/internal/discord_radio.py b/app/internal/discord_radio.py new file mode 100644 index 0000000..d8599af --- /dev/null +++ b/app/internal/discord_radio.py @@ -0,0 +1,222 @@ +import discord +import asyncio +import socket +import logging +import threading +import struct +import subprocess +import shlex +from collections import deque +from typing import Optional, List, Tuple + +LOGGER = logging.getLogger("discord_radio") + +class UDPAudioSource(discord.AudioSource): + """ + A custom Discord AudioSource that reads raw PCM audio from a thread-safe buffer. + This buffer is filled by the UDPListener running in the background. + + It pipes the raw 8000Hz 16-bit audio into FFmpeg to convert it to + Discord's required 48000Hz stereo Opus format. + """ + def __init__(self, buffer: deque): + self.buffer = buffer + # We start an FFmpeg process that reads from stdin (pipe) and outputs raw PCM (s16le) at 48k + # discord.py then encodes this to Opus. + # Note: We use a pipe here because we are feeding it chunks from our UDP buffer. + + self.ffmpeg_process = subprocess.Popen( + shlex.split( + "ffmpeg -f s16le -ar 8000 -ac 1 -i pipe:0 -f s16le -ar 48000 -ac 2 pipe:1 -loglevel panic" + ), + stdin=subprocess.PIPE, + stdout=subprocess.PIPE + ) + self._buffer_lock = threading.Lock() + + def read(self) -> bytes: + """ + Reads 20ms of audio from the buffer, feeds it to ffmpeg, + and returns the converted bytes to Discord. + """ + # Discord requests 20ms of audio. + # At 8000Hz 16-bit mono, 1 second = 16000 bytes. 20ms = 320 bytes. + chunk_size = 320 + + data = b'' + + # Pull data from the UDP thread buffer + with self._buffer_lock: + if len(self.buffer) >= chunk_size: + # Optimized deque slicing + data = bytes([self.buffer.popleft() for _ in range(chunk_size)]) + else: + # If we don't have enough data (buffer underrun), send silence to keep pipe open + data = b'\x00' * chunk_size + + try: + # Write raw 8k mono to ffmpeg stdin + self.ffmpeg_process.stdin.write(data) + self.ffmpeg_process.stdin.flush() + + # Read converted 48k stereo from ffmpeg stdout + # 48000 Hz * 2 channels * 2 bytes (16bit) * 0.02s = 3840 bytes + converted_data = self.ffmpeg_process.stdout.read(3840) + return converted_data + except Exception as e: + LOGGER.error(f"Error in FFmpeg conversion: {e}") + return b'\x00' * 3840 + + def cleanup(self): + if self.ffmpeg_process: + self.ffmpeg_process.kill() + +class DiscordRadioBot(discord.Client): + """ + A dedicated Discord Client for streaming Radio traffic. + + Features: + - Independent UDP Listener (non-blocking) + - Packet Forwarding (to keep Liquidsoap/Icecast working) + - Manual Channel Move detection + - Dynamic Token/Channel joining + """ + def __init__(self, listen_port=23456, forward_ports: List[int] = None): + intents = discord.Intents.default() + super().__init__(intents=intents) + + self.listen_port = listen_port + self.forward_ports = forward_ports or [] # List of ports to forward UDP packets to (e.g. [23457]) + + self.audio_buffer = deque(maxlen=160000) # Buffer ~10 seconds of audio + self.udp_sock = None + self.running = False + self.voice_client: Optional[discord.VoiceClient] = None + + # Start the UDP Listener in a separate daemon thread immediately + self.udp_thread = threading.Thread(target=self._udp_listener_loop, daemon=True) + self.udp_thread.start() + + def _udp_listener_loop(self): + """ + Runs in a background thread. Listens for UDP packets from OP25. + 1. Adds data to internal audio buffer for Discord. + 2. Forwards data to other local ports (for Liquidsoap). + """ + self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.udp_sock.bind(('127.0.0.1', self.listen_port)) + self.udp_sock.settimeout(1.0) + + LOGGER.info(f"UDP Audio Bridge listening on 127.0.0.1:{self.listen_port}") + if self.forward_ports: + LOGGER.info(f"Forwarding audio to: {self.forward_ports}") + + forward_socks = [] + for port in self.forward_ports: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + forward_socks.append((s, port)) + + self.running = True + while self.running: + try: + data, addr = self.udp_sock.recvfrom(4096) + + # 1. Forward to Liquidsoap/Other tools + for sock, port in forward_socks: + sock.sendto(data, ('127.0.0.1', port)) + + # 2. Add to Discord Buffer + # We extend the deque with the raw bytes + self.audio_buffer.extend(data) + + except socket.timeout: + continue + except Exception as e: + LOGGER.error(f"UDP Listener Error: {e}") + # Prevent tight loop on socket fail + asyncio.sleep(0.1) + + async def on_ready(self): + LOGGER.info(f'Discord Radio Bot connected as {self.user}') + + async def on_voice_state_update(self, member, before, after): + """ + Handles manual moves. If the bot is moved to a new channel by a user, + update our internal voice_client reference to ensure we keep streaming. + """ + if member.id == self.user.id: + if after.channel is not None and before.channel != after.channel: + LOGGER.info(f"Bot was moved to channel: {after.channel.name}") + # discord.py handles the connection handoff automatically, + # but we log it to confirm persistence. + self.voice_client = member.guild.voice_client + + async def start_session(self, token: str, channel_id: int): + """ + Connects the bot to Discord and joins the specified channel. + This should be called when the 'start' command is received from C2. + """ + # Start the client in the background + asyncio.create_task(self.start(token)) + + # Wait for login to complete (simple poll) + for _ in range(20): + if self.is_ready(): + break + await asyncio.sleep(0.5) + + # Join Channel + try: + channel = self.get_channel(int(channel_id)) + if not channel: + # If cache not ready, try fetching + channel = await self.fetch_channel(int(channel_id)) + + if channel: + self.voice_client = await channel.connect() + LOGGER.info(f"Joined voice channel: {channel.name}") + else: + LOGGER.error(f"Could not find channel ID: {channel_id}") + except Exception as e: + LOGGER.error(f"Failed to join voice: {e}") + + async def stop_session(self): + """ + Disconnects voice and closes the bot connection. + """ + if self.voice_client: + await self.voice_client.disconnect() + await self.close() + + def update_system_presence(self, system_name: str): + """ + Updates the "Playing..." status of the bot. + """ + activity = discord.Activity( + type=discord.ActivityType.listening, + name=f"{system_name}" + ) + asyncio.create_task(self.change_presence(activity=activity)) + + def start_transmission(self): + """ + Called when a radio call STARTS. + Connects the audio buffer to the voice stream. + This turns the green ring ON. + """ + if self.voice_client and not self.voice_client.is_playing(): + # clear buffer slightly to ensure we aren't playing old data + # but keep a little to prevent jitter + # self.audio_buffer.clear() + + source = UDPAudioSource(self.audio_buffer) + self.voice_client.play(source) + + def stop_transmission(self): + """ + Called when a radio call ENDS. + Stops the stream. + This turns the green ring OFF. + """ + if self.voice_client and self.voice_client.is_playing(): + self.voice_client.stop() \ No newline at end of file diff --git a/app/models/models.py b/app/models/models.py index 1f02958..44f5da7 100644 --- a/app/models/models.py +++ b/app/models/models.py @@ -38,7 +38,7 @@ class ChannelConfig(BaseModel): cqpsk_tracking: Optional[bool] = None frequency: Optional[float] = None nbfmSquelch: Optional[float] = None - destination: Optional[str] = "udp://127.0.0.1:23456" + destination: Optional[str] = "udp://127.0.0.1:23457" tracking_threshold: Optional[int] = 120 tracking_feedback: Optional[float] = 0.75 excess_bw: Optional[float] = 0.2 diff --git a/app/node_main.py b/app/node_main.py index c38d745..399fd6e 100644 --- a/app/node_main.py +++ b/app/node_main.py @@ -9,6 +9,7 @@ from internal.logger import create_logger from internal.op25_config_utls import scan_local_library import paho.mqtt.client as mqtt import requests +from internal.discord_radio import DiscordRadioBot # Initialize logging LOGGER = create_logger(__name__) @@ -29,6 +30,9 @@ NODE_LONG = os.getenv("NODE_LONG") # Global flag to track MQTT connection state MQTT_CONNECTED = False +# Initialize the Discord Bot +discord_bot = DiscordRadioBot(listen_port=23457, forward_ports=[23456]) + def handle_c2_command(topic, payload): """ Parses and routes commands received from the C2 server by calling the @@ -105,6 +109,17 @@ def handle_c2_command(topic, payload): except requests.exceptions.RequestException as e: LOGGER.error(f"Failed to connect to OP25 terminal for tuning: {e}") + elif command_type == "discord_join": + token = data.get("token") + channel_id = data.get("channel_id") + if token and channel_id: + asyncio.create_task(discord_bot.start_session(token, channel_id)) + LOGGER.info("Initiating Discord Session...") + + elif command_type == "discord_leave": + asyncio.create_task(discord_bot.stop_session()) + LOGGER.info("Ending Discord Session...") + else: LOGGER.warning(f"Unknown command type received: {command_type}") @@ -317,6 +332,10 @@ async def mqtt_lifecycle_manager(): if last_tgid != 0: # --- END PREVIOUS CALL --- await stop_recording() + + # Stop Discord Transmission + discord_bot.stop_transmission() + audio_url = None if current_call_id: audio_url = await loop.run_in_executor(None, upload_audio, current_call_id) @@ -334,6 +353,10 @@ async def mqtt_lifecycle_manager(): # --- START NEW CALL --- LOGGER.debug(f"Call Start: TGID {current_tgid} ({current_meta.get('alpha_tag')})") + + # Trigger Discord Transmission + discord_bot.start_transmission() + discord_bot.update_system_presence(current_meta.get('sysname', 'Scanning')) # Generate ID start_ts = int(now.timestamp()) @@ -371,6 +394,10 @@ async def mqtt_lifecycle_manager(): elif (now - potential_end_time).total_seconds() > DEBOUNCE_SECONDS: # --- END CALL (Debounce Expired) --- await stop_recording() + + # Stop Discord Transmission + discord_bot.stop_transmission() + audio_url = None if current_call_id: audio_url = await loop.run_in_executor(None, upload_audio, current_call_id)