# Source: drb-edge-node/app/internal/discord_radio.py (Python, 225 lines, 8.1 KiB)

import discord
import asyncio
import socket
import logging
import threading
import subprocess
import shlex
import time
from collections import deque
from typing import Optional, List
# Module-level logger used by both UDPAudioSource and DiscordRadioBot.
LOGGER = logging.getLogger("discord_radio")
class UDPAudioSource(discord.AudioSource):
    """
    A custom Discord AudioSource that reads raw PCM audio from a shared buffer.

    The buffer is a deque of individual byte values holding 8000Hz 16-bit mono
    PCM. Every read() pipes one 20ms chunk into an FFmpeg subprocess, which
    resamples it to 48000Hz 16-bit stereo PCM (s16le). Those converted frames
    are what read() returns; no Opus encoding happens in this class.

    FFmpeg's output is drained by a background thread so that read() never
    blocks waiting for the converter: FFmpeg buffers internally, so one
    320-byte write is not guaranteed to yield 3840 bytes straight away, and
    waiting for them in lockstep can stall the caller indefinitely.
    """

    # 20ms of input: 8000 Hz * 1 channel * 2 bytes (16bit) * 0.02s.
    INPUT_CHUNK_BYTES = 320
    # 20ms of output: 48000 Hz * 2 channels * 2 bytes (16bit) * 0.02s.
    OUTPUT_FRAME_BYTES = 3840

    def __init__(self, buffer: deque):
        """
        Start the FFmpeg converter and the thread that drains its output.

        Args:
            buffer: deque of ints (byte values) containing 8000Hz 16-bit mono
                PCM. It is consumed from the left.

        Raises:
            OSError: if the ffmpeg executable cannot be started.
        """
        self.buffer = buffer
        # Only serializes concurrent read() calls on this instance; whoever
        # fills the buffer does not take this lock.
        self._buffer_lock = threading.Lock()
        # Converted 48k stereo frames waiting to be handed out by read().
        self._converted = deque()
        self._silence = b'\x00' * self.OUTPUT_FRAME_BYTES
        # FFmpeg reads raw 8k mono from stdin and writes raw 48k stereo to stdout.
        # Global options (-loglevel) go first; -flush_packets 1 asks the muxer to
        # flush after every packet to keep output latency low.
        self.ffmpeg_process = subprocess.Popen(
            [
                "ffmpeg", "-loglevel", "panic",
                "-f", "s16le", "-ar", "8000", "-ac", "1", "-i", "pipe:0",
                "-f", "s16le", "-ar", "48000", "-ac", "2",
                "-flush_packets", "1", "pipe:1",
            ],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE
        )
        # The thread target is a staticmethod so the thread does not keep this
        # object alive on its own.
        self._reader_thread = threading.Thread(
            target=self._pump_output,
            args=(self.ffmpeg_process.stdout, self._converted, self.OUTPUT_FRAME_BYTES),
            daemon=True,
        )
        self._reader_thread.start()

    @staticmethod
    def _pump_output(stdout, frames: deque, frame_size: int) -> None:
        """Move complete frames from FFmpeg's stdout into `frames` until EOF."""
        try:
            while True:
                frame = stdout.read(frame_size)
                # A short read means FFmpeg closed its output; a partial frame
                # is unusable, so stop here.
                if len(frame) < frame_size:
                    return
                frames.append(frame)
        except (OSError, ValueError):
            # The pipe was closed underneath us during cleanup().
            return

    def read(self) -> bytes:
        """
        Feed 20ms of buffered audio to FFmpeg and return one converted frame.

        Returns:
            Exactly 3840 bytes of 48000Hz 16-bit stereo PCM. Silence is
            returned while no converted frame is available yet. An empty
            bytes object is returned once FFmpeg has exited, which signals
            the end of the stream to the caller.
        """
        chunk_size = self.INPUT_CHUNK_BYTES
        # Pull data from the UDP thread buffer
        with self._buffer_lock:
            if len(self.buffer) >= chunk_size:
                data = bytes(self.buffer.popleft() for _ in range(chunk_size))
            else:
                # Buffer underrun: send silence so the converter keeps running
                data = b'\x00' * chunk_size
        try:
            # Write raw 8k mono to ffmpeg stdin
            self.ffmpeg_process.stdin.write(data)
            self.ffmpeg_process.stdin.flush()
        except (OSError, ValueError) as e:
            LOGGER.error(f"Error in FFmpeg conversion: {e}")
            if self.ffmpeg_process.poll() is not None:
                # The converter is gone for good; end the stream instead of
                # logging the same failure on every frame.
                return b''
            return self._silence
        try:
            return self._converted.popleft()
        except IndexError:
            # FFmpeg has not produced a full frame yet.
            return self._silence

    def cleanup(self):
        """
        Stop FFmpeg, reap it, and close its pipes.

        Safe to call more than once, and safe to call when __init__ failed
        before the subprocess was created.
        """
        process = getattr(self, "ffmpeg_process", None)
        if not process:
            return
        process.kill()
        try:
            # Reap the child so it does not linger as a zombie.
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            LOGGER.warning("FFmpeg did not exit after kill()")
        reader = getattr(self, "_reader_thread", None)
        if reader is not None and reader is not threading.current_thread():
            # The reader sees EOF once FFmpeg is dead; let it finish before
            # its pipe is closed.
            reader.join(timeout=1.0)
        for pipe in (process.stdin, process.stdout):
            if pipe is None:
                continue
            try:
                pipe.close()
            except OSError as e:
                # Closing stdin may fail if unflushed data meets a broken pipe.
                LOGGER.debug(f"Ignoring error while closing FFmpeg pipe: {e}")
class DiscordRadioBot(discord.Client):
    """
    A dedicated Discord Client for streaming Radio traffic.
    Features:
    - Independent UDP Listener (runs in its own daemon thread)
    - Packet Forwarding (to keep Liquidsoap/Icecast working)
    - Manual Channel Move detection
    - Dynamic Token/Channel joining
    - Debug Stats logging
    """

    def __init__(self, listen_port=23456, forward_ports: Optional[List[int]] = None):
        """
        Create the client and immediately start the UDP listener thread.

        Args:
            listen_port: UDP port to bind on 0.0.0.0 for incoming audio.
            forward_ports: localhost UDP ports every received packet is
                copied to. None or empty disables forwarding.
        """
        intents = discord.Intents.default()
        super().__init__(intents=intents)
        self.listen_port = listen_port
        self.forward_ports = forward_ports or []
        # 8000Hz * 2 bytes = 16000 bytes/s, so 160000 bytes is ~10 seconds of audio.
        # Once full, appending drops the oldest bytes.
        self.audio_buffer = deque(maxlen=160000)
        self.udp_sock = None
        self.running = False
        self.voice_client: Optional[discord.VoiceClient] = None
        # Debug Stats
        self.packets_received = 0
        self.packets_forwarded = 0
        self.last_log_time = time.time()
        # Strong references to tasks we spawn: the event loop only keeps weak
        # references, so an unreferenced task can be garbage-collected mid-flight.
        self._radio_start_task: Optional[asyncio.Task] = None
        self._radio_tasks = set()
        # Start the UDP Listener in a separate daemon thread immediately
        self.udp_thread = threading.Thread(target=self._udp_listener_loop, daemon=True)
        self.udp_thread.start()

    def _udp_listener_loop(self):
        """
        Runs in a background thread. Listens for UDP packets from OP25.

        Each packet is forwarded to every port in self.forward_ports on
        127.0.0.1 and then appended to self.audio_buffer. The loop runs until
        self.running is set to False; all sockets are closed on exit.
        """
        listen_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.udp_sock = listen_sock
        forward_socks = []
        try:
            # Bind to 0.0.0.0 to ensure we catch traffic from any interface (Docker/Localhost)
            listen_sock.bind(('0.0.0.0', self.listen_port))
            # The timeout lets the loop re-check self.running once per second.
            listen_sock.settimeout(1.0)
            LOGGER.info(f"UDP Audio Bridge listening on 0.0.0.0:{self.listen_port}")
            LOGGER.info(f"Forwarding audio to: {self.forward_ports}")
            for port in self.forward_ports:
                forward_socks.append(
                    (socket.socket(socket.AF_INET, socket.SOCK_DGRAM), port)
                )
            self.running = True
            while self.running:
                try:
                    data, _ = listen_sock.recvfrom(4096)
                    self.packets_received += 1
                    # 1. Forward to Liquidsoap/Other tools
                    for sock, port in forward_socks:
                        # Send to localhost (where Liquidsoap should be listening)
                        sock.sendto(data, ('127.0.0.1', port))
                        self.packets_forwarded += 1
                    # 2. Add to Discord Buffer (one deque item per byte)
                    self.audio_buffer.extend(data)
                    # Periodic Debug Logging (Every 5 seconds, only if active).
                    # This point is only reached right after a packet arrived,
                    # so the counters are never zero here.
                    now = time.time()
                    if now - self.last_log_time > 5:
                        LOGGER.info(f"UDP Stats [5s]: Rx {self.packets_received} pkts | Tx {self.packets_forwarded} pkts | Buffer: {len(self.audio_buffer)} bytes")
                        # Reset counters
                        self.packets_received = 0
                        self.packets_forwarded = 0
                        self.last_log_time = now
                except socket.timeout:
                    continue
                except Exception as e:
                    LOGGER.error(f"UDP Listener Error: {e}")
                    # Back off briefly so a persistent error cannot spin the CPU.
                    time.sleep(0.1)
        except OSError as e:
            # Only setup failures (bind / socket creation) reach this handler;
            # errors inside the loop are handled above.
            LOGGER.error(f"UDP Audio Bridge could not start on port {self.listen_port}: {e}")
        finally:
            self.running = False
            for sock, _ in forward_socks:
                sock.close()
            listen_sock.close()

    async def on_ready(self):
        """Log the account the bot is connected as."""
        LOGGER.info(f'Discord Radio Bot connected as {self.user}')

    async def on_voice_state_update(self, member, before, after):
        """
        Handles manual moves.

        When the bot's own voice state changes to a different, non-empty
        channel, refresh self.voice_client from the guild.
        """
        if self.user is None or member.id != self.user.id:
            return
        if after.channel is not None and before.channel != after.channel:
            LOGGER.info(f"Bot was moved to channel: {after.channel.name}")
            self.voice_client = member.guild.voice_client

    async def start_session(self, token: str, channel_id: int):
        """
        Connects the bot to Discord and joins the specified channel.

        The client is started as a background task. This coroutine then polls
        for up to 10 seconds until the client is ready before trying to join.
        Failures are logged rather than raised.

        Args:
            token: Discord bot token passed to start().
            channel_id: ID of the voice channel to join.
        """
        start_task = asyncio.create_task(self.start(token))
        self._radio_start_task = start_task
        # Wait for login to complete (20 * 0.5s), or for start() to give up
        for _ in range(20):
            if self.is_ready() or start_task.done():
                break
            await asyncio.sleep(0.5)
        if start_task.done():
            # start() only returns once the client has stopped, so there is
            # nothing to join. Retrieve the exception so the real cause is
            # logged instead of being lost with the task.
            error = None if start_task.cancelled() else start_task.exception()
            LOGGER.error(f"Discord client stopped before becoming ready: {error}")
            return
        if not self.is_ready():
            LOGGER.warning("Discord client not ready after 10 seconds; attempting to join anyway")
        try:
            channel = self.get_channel(int(channel_id))
            if not channel:
                # Not in the local cache; ask the API instead.
                channel = await self.fetch_channel(int(channel_id))
            if channel:
                self.voice_client = await channel.connect()
                LOGGER.info(f"Joined voice channel: {channel.name}")
            else:
                LOGGER.error(f"Could not find channel ID: {channel_id}")
        except Exception as e:
            LOGGER.error(f"Failed to join voice: {e}")

    async def stop_session(self):
        """
        Disconnects voice and closes the bot connection.

        The client is closed even if the voice disconnect raises; that
        exception still propagates to the caller.
        """
        try:
            if self.voice_client:
                await self.voice_client.disconnect()
        finally:
            self.voice_client = None
            await self.close()

    def _on_radio_task_done(self, task):
        """Drop a finished background task and log its failure, if any."""
        self._radio_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            LOGGER.error(f"Background Discord task failed: {error}")

    def update_system_presence(self, system_name: str):
        """
        Updates the "Listening to..." status of the bot.

        Must be called from the thread running the event loop.

        Args:
            system_name: Text shown as the activity name.

        Raises:
            RuntimeError: if no event loop is running in the current thread.
        """
        # Resolve the loop first so that, when none is running, we fail before
        # creating a coroutine that would never be awaited.
        loop = asyncio.get_running_loop()
        activity = discord.Activity(
            type=discord.ActivityType.listening,
            name=f"{system_name}"
        )
        task = loop.create_task(self.change_presence(activity=activity))
        self._radio_tasks.add(task)
        task.add_done_callback(self._on_radio_task_done)

    def start_transmission(self):
        """
        Called when a radio call STARTS.
        Connects the audio buffer to the voice stream.
        This turns the green ring ON.

        Does nothing when there is no voice client, when it is already
        playing, or when it is not connected.
        """
        voice = self.voice_client
        if not voice or voice.is_playing():
            return
        if not voice.is_connected():
            # play() would raise here; skip before spawning an FFmpeg process.
            LOGGER.warning("Cannot start transmission: voice client is not connected")
            return
        # The buffer is intentionally not cleared here, so any audio already
        # queued is played first.
        source = UDPAudioSource(self.audio_buffer)
        try:
            voice.play(source)
        except Exception:
            # Do not leave the converter process running if playback never started.
            source.cleanup()
            raise

    def stop_transmission(self):
        """
        Called when a radio call ENDS.
        Stops the stream.
        This turns the green ring OFF.
        """
        if self.voice_client and self.voice_client.is_playing():
            self.voice_client.stop()