Files
drb-edge-node/app/internal/discord_radio.py
2026-01-04 00:12:58 -05:00

222 lines
8.2 KiB
Python

import discord
import asyncio
import socket
import logging
import threading
import struct
import subprocess
import shlex
from collections import deque
from typing import Optional, List, Tuple
LOGGER = logging.getLogger("discord_radio")
class UDPAudioSource(discord.AudioSource):
"""
A custom Discord AudioSource that reads raw PCM audio from a thread-safe buffer.
This buffer is filled by the UDPListener running in the background.
It pipes the raw 8000Hz 16-bit audio into FFmpeg to convert it to
Discord's required 48000Hz stereo Opus format.
"""
def __init__(self, buffer: deque):
self.buffer = buffer
# We start an FFmpeg process that reads from stdin (pipe) and outputs raw PCM (s16le) at 48k
# discord.py then encodes this to Opus.
# Note: We use a pipe here because we are feeding it chunks from our UDP buffer.
self.ffmpeg_process = subprocess.Popen(
shlex.split(
"ffmpeg -f s16le -ar 8000 -ac 1 -i pipe:0 -f s16le -ar 48000 -ac 2 pipe:1 -loglevel panic"
),
stdin=subprocess.PIPE,
stdout=subprocess.PIPE
)
self._buffer_lock = threading.Lock()
def read(self) -> bytes:
"""
Reads 20ms of audio from the buffer, feeds it to ffmpeg,
and returns the converted bytes to Discord.
"""
# Discord requests 20ms of audio.
# At 8000Hz 16-bit mono, 1 second = 16000 bytes. 20ms = 320 bytes.
chunk_size = 320
data = b''
# Pull data from the UDP thread buffer
with self._buffer_lock:
if len(self.buffer) >= chunk_size:
# Optimized deque slicing
data = bytes([self.buffer.popleft() for _ in range(chunk_size)])
else:
# If we don't have enough data (buffer underrun), send silence to keep pipe open
data = b'\x00' * chunk_size
try:
# Write raw 8k mono to ffmpeg stdin
self.ffmpeg_process.stdin.write(data)
self.ffmpeg_process.stdin.flush()
# Read converted 48k stereo from ffmpeg stdout
# 48000 Hz * 2 channels * 2 bytes (16bit) * 0.02s = 3840 bytes
converted_data = self.ffmpeg_process.stdout.read(3840)
return converted_data
except Exception as e:
LOGGER.error(f"Error in FFmpeg conversion: {e}")
return b'\x00' * 3840
def cleanup(self):
if self.ffmpeg_process:
self.ffmpeg_process.kill()
class DiscordRadioBot(discord.Client):
"""
A dedicated Discord Client for streaming Radio traffic.
Features:
- Independent UDP Listener (non-blocking)
- Packet Forwarding (to keep Liquidsoap/Icecast working)
- Manual Channel Move detection
- Dynamic Token/Channel joining
"""
def __init__(self, listen_port=23456, forward_ports: List[int] = None):
intents = discord.Intents.default()
super().__init__(intents=intents)
self.listen_port = listen_port
self.forward_ports = forward_ports or [] # List of ports to forward UDP packets to (e.g. [23457])
self.audio_buffer = deque(maxlen=160000) # Buffer ~10 seconds of audio
self.udp_sock = None
self.running = False
self.voice_client: Optional[discord.VoiceClient] = None
# Start the UDP Listener in a separate daemon thread immediately
self.udp_thread = threading.Thread(target=self._udp_listener_loop, daemon=True)
self.udp_thread.start()
def _udp_listener_loop(self):
"""
Runs in a background thread. Listens for UDP packets from OP25.
1. Adds data to internal audio buffer for Discord.
2. Forwards data to other local ports (for Liquidsoap).
"""
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_sock.bind(('127.0.0.1', self.listen_port))
self.udp_sock.settimeout(1.0)
LOGGER.info(f"UDP Audio Bridge listening on 127.0.0.1:{self.listen_port}")
if self.forward_ports:
LOGGER.info(f"Forwarding audio to: {self.forward_ports}")
forward_socks = []
for port in self.forward_ports:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
forward_socks.append((s, port))
self.running = True
while self.running:
try:
data, addr = self.udp_sock.recvfrom(4096)
# 1. Forward to Liquidsoap/Other tools
for sock, port in forward_socks:
sock.sendto(data, ('127.0.0.1', port))
# 2. Add to Discord Buffer
# We extend the deque with the raw bytes
self.audio_buffer.extend(data)
except socket.timeout:
continue
except Exception as e:
LOGGER.error(f"UDP Listener Error: {e}")
# Prevent tight loop on socket fail
asyncio.sleep(0.1)
async def on_ready(self):
LOGGER.info(f'Discord Radio Bot connected as {self.user}')
async def on_voice_state_update(self, member, before, after):
"""
Handles manual moves. If the bot is moved to a new channel by a user,
update our internal voice_client reference to ensure we keep streaming.
"""
if member.id == self.user.id:
if after.channel is not None and before.channel != after.channel:
LOGGER.info(f"Bot was moved to channel: {after.channel.name}")
# discord.py handles the connection handoff automatically,
# but we log it to confirm persistence.
self.voice_client = member.guild.voice_client
async def start_session(self, token: str, channel_id: int):
"""
Connects the bot to Discord and joins the specified channel.
This should be called when the 'start' command is received from C2.
"""
# Start the client in the background
asyncio.create_task(self.start(token))
# Wait for login to complete (simple poll)
for _ in range(20):
if self.is_ready():
break
await asyncio.sleep(0.5)
# Join Channel
try:
channel = self.get_channel(int(channel_id))
if not channel:
# If cache not ready, try fetching
channel = await self.fetch_channel(int(channel_id))
if channel:
self.voice_client = await channel.connect()
LOGGER.info(f"Joined voice channel: {channel.name}")
else:
LOGGER.error(f"Could not find channel ID: {channel_id}")
except Exception as e:
LOGGER.error(f"Failed to join voice: {e}")
async def stop_session(self):
"""
Disconnects voice and closes the bot connection.
"""
if self.voice_client:
await self.voice_client.disconnect()
await self.close()
def update_system_presence(self, system_name: str):
"""
Updates the "Playing..." status of the bot.
"""
activity = discord.Activity(
type=discord.ActivityType.listening,
name=f"{system_name}"
)
asyncio.create_task(self.change_presence(activity=activity))
def start_transmission(self):
"""
Called when a radio call STARTS.
Connects the audio buffer to the voice stream.
This turns the green ring ON.
"""
if self.voice_client and not self.voice_client.is_playing():
# clear buffer slightly to ensure we aren't playing old data
# but keep a little to prevent jitter
# self.audio_buffer.clear()
source = UDPAudioSource(self.audio_buffer)
self.voice_client.play(source)
def stop_transmission(self):
"""
Called when a radio call ENDS.
Stops the stream.
This turns the green ring OFF.
"""
if self.voice_client and self.voice_client.is_playing():
self.voice_client.stop()