5 Commits

Author SHA1 Message Date
Logan Cusano
d8fc867f98 Trying to resolve discord functions to see if audio works there but just not being relayed to liq 2026-01-04 04:00:50 -05:00
Logan Cusano
deb87d5888 Trying to retransmit UDP audio 2026-01-04 01:51:01 -05:00
Logan Cusano
b7c7158ea3 Use the global logger 2026-01-04 01:40:44 -05:00
Logan Cusano
4f5dcaf6ce Updates to discord radio module to try and fix redirection problem 2026-01-04 01:32:46 -05:00
Logan Cusano
00f4ebea2d Only try to update presence and transmit when the bot is online 2026-01-04 01:18:11 -05:00
2 changed files with 66 additions and 44 deletions

View File

@@ -1,30 +1,25 @@
import discord import discord
import asyncio import asyncio
import socket import socket
import logging
import threading import threading
import struct
import subprocess import subprocess
import shlex import shlex
import time
from internal.logger import create_logger
from collections import deque from collections import deque
from typing import Optional, List, Tuple from typing import Optional, List
LOGGER = logging.getLogger("discord_radio") LOGGER = create_logger(__name__)
class UDPAudioSource(discord.AudioSource): class UDPAudioSource(discord.AudioSource):
""" """
A custom Discord AudioSource that reads raw PCM audio from a thread-safe buffer. A custom Discord AudioSource that reads raw PCM audio from a thread-safe buffer.
This buffer is filled by the UDPListener running in the background.
It pipes the raw 8000Hz 16-bit audio into FFmpeg to convert it to It pipes the raw 8000Hz 16-bit audio into FFmpeg to convert it to
Discord's required 48000Hz stereo Opus format. Discord's required 48000Hz stereo Opus format.
""" """
def __init__(self, buffer: deque): def __init__(self, buffer: deque):
self.buffer = buffer self.buffer = buffer
# We start an FFmpeg process that reads from stdin (pipe) and outputs raw PCM (s16le) at 48k # We start an FFmpeg process that reads from stdin (pipe) and outputs raw PCM (s16le) at 48k
# discord.py then encodes this to Opus.
# Note: We use a pipe here because we are feeding it chunks from our UDP buffer.
self.ffmpeg_process = subprocess.Popen( self.ffmpeg_process = subprocess.Popen(
shlex.split( shlex.split(
"ffmpeg -f s16le -ar 8000 -ac 1 -i pipe:0 -f s16le -ar 48000 -ac 2 pipe:1 -loglevel panic" "ffmpeg -f s16le -ar 8000 -ac 1 -i pipe:0 -f s16le -ar 48000 -ac 2 pipe:1 -loglevel panic"
@@ -69,7 +64,11 @@ class UDPAudioSource(discord.AudioSource):
def cleanup(self): def cleanup(self):
if self.ffmpeg_process: if self.ffmpeg_process:
self.ffmpeg_process.kill() try:
self.ffmpeg_process.stdin.close()
self.ffmpeg_process.terminate()
except:
pass
class DiscordRadioBot(discord.Client): class DiscordRadioBot(discord.Client):
""" """
@@ -80,19 +79,25 @@ class DiscordRadioBot(discord.Client):
- Packet Forwarding (to keep Liquidsoap/Icecast working) - Packet Forwarding (to keep Liquidsoap/Icecast working)
- Manual Channel Move detection - Manual Channel Move detection
- Dynamic Token/Channel joining - Dynamic Token/Channel joining
- Debug Stats logging
""" """
def __init__(self, listen_port=23456, forward_ports: List[int] = None): def __init__(self, listen_port=23456, forward_ports: List[int] = None):
intents = discord.Intents.default() intents = discord.Intents.default()
super().__init__(intents=intents) super().__init__(intents=intents)
self.listen_port = listen_port self.listen_port = listen_port
self.forward_ports = forward_ports or [] # List of ports to forward UDP packets to (e.g. [23457]) self.forward_ports = forward_ports or []
self.audio_buffer = deque(maxlen=160000) # Buffer ~10 seconds of audio self.audio_buffer = deque(maxlen=160000) # Buffer ~10 seconds of audio
self.udp_sock = None self.udp_sock = None
self.running = False self.running = False
self.voice_client: Optional[discord.VoiceClient] = None self.voice_client: Optional[discord.VoiceClient] = None
# Debug Stats
self.packets_received = 0
self.packets_forwarded = 0
self.last_log_time = time.time()
# Start the UDP Listener in a separate daemon thread immediately # Start the UDP Listener in a separate daemon thread immediately
self.udp_thread = threading.Thread(target=self._udp_listener_loop, daemon=True) self.udp_thread = threading.Thread(target=self._udp_listener_loop, daemon=True)
self.udp_thread.start() self.udp_thread.start()
@@ -100,76 +105,81 @@ class DiscordRadioBot(discord.Client):
def _udp_listener_loop(self): def _udp_listener_loop(self):
""" """
Runs in a background thread. Listens for UDP packets from OP25. Runs in a background thread. Listens for UDP packets from OP25.
1. Adds data to internal audio buffer for Discord.
2. Forwards data to other local ports (for Liquidsoap).
""" """
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_sock.bind(('127.0.0.1', self.listen_port)) # Bind to all interfaces to ensure we catch the OP25 stream
self.udp_sock.bind(('0.0.0.0', self.listen_port))
self.udp_sock.settimeout(1.0) self.udp_sock.settimeout(1.0)
LOGGER.info(f"UDP Audio Bridge listening on 127.0.0.1:{self.listen_port}") LOGGER.info(f"UDP Audio Bridge listening on 0.0.0.0:{self.listen_port}")
if self.forward_ports: LOGGER.info(f"Forwarding audio to: {self.forward_ports}")
LOGGER.info(f"Forwarding audio to: {self.forward_ports}")
forward_socks = [] # Create sockets for each forward port
forward_targets = []
for port in self.forward_ports: for port in self.forward_ports:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
forward_socks.append((s, port)) forward_targets.append((s, port))
self.running = True self.running = True
while self.running: while self.running:
try: try:
data, addr = self.udp_sock.recvfrom(4096) data, addr = self.udp_sock.recvfrom(4096)
self.packets_received += 1
# 1. Forward to Liquidsoap/Other tools # 1. Forward to Liquidsoap/audio.py
for sock, port in forward_socks: for sock, port in forward_targets:
sock.sendto(data, ('127.0.0.1', port)) try:
sock.sendto(data, ('127.0.0.1', port))
self.packets_forwarded += 1
except Exception as fe:
# Log forwarding errors only once per 5 seconds to avoid spam
if self.packets_forwarded % 100 == 0:
LOGGER.error(f"Forward error to port {port}: {fe}")
# 2. Add to Discord Buffer # 2. Add to Discord Buffer
# We extend the deque with the raw bytes
self.audio_buffer.extend(data) self.audio_buffer.extend(data)
# Periodic Stats Logging
if time.time() - self.last_log_time > 10:
if self.packets_received > 0:
LOGGER.info(f"Discord Audio Bridge Stats: Received {self.packets_received} packets, Forwarded {self.packets_forwarded}")
self.packets_received = 0
self.packets_forwarded = 0
self.last_log_time = time.time()
except socket.timeout: except socket.timeout:
continue continue
except Exception as e: except Exception as e:
LOGGER.error(f"UDP Listener Error: {e}") LOGGER.error(f"UDP Listener Error: {e}")
# Prevent tight loop on socket fail time.sleep(0.1)
asyncio.sleep(0.1)
async def on_ready(self): async def on_ready(self):
LOGGER.info(f'Discord Radio Bot connected as {self.user}') LOGGER.info(f'Discord Radio Bot connected as {self.user}')
async def on_voice_state_update(self, member, before, after): async def on_voice_state_update(self, member, before, after):
""" """
Handles manual moves. If the bot is moved to a new channel by a user, Handles manual moves.
update our internal voice_client reference to ensure we keep streaming.
""" """
if member.id == self.user.id: if member.id == self.user.id:
if after.channel is not None and before.channel != after.channel: if after.channel is not None and before.channel != after.channel:
LOGGER.info(f"Bot was moved to channel: {after.channel.name}") LOGGER.info(f"Bot was moved to channel: {after.channel.name}")
# discord.py handles the connection handoff automatically,
# but we log it to confirm persistence.
self.voice_client = member.guild.voice_client self.voice_client = member.guild.voice_client
async def start_session(self, token: str, channel_id: int): async def start_session(self, token: str, channel_id: int):
""" """
Connects the bot to Discord and joins the specified channel. Connects the bot to Discord and joins the specified channel.
This should be called when the 'start' command is received from C2.
""" """
# Start the client in the background
asyncio.create_task(self.start(token)) asyncio.create_task(self.start(token))
# Wait for login to complete (simple poll) # Wait for login to complete
for _ in range(20): for _ in range(20):
if self.is_ready(): if self.is_ready():
break break
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
# Join Channel
try: try:
channel = self.get_channel(int(channel_id)) channel = self.get_channel(int(channel_id))
if not channel: if not channel:
# If cache not ready, try fetching
channel = await self.fetch_channel(int(channel_id)) channel = await self.fetch_channel(int(channel_id))
if channel: if channel:
@@ -205,10 +215,8 @@ class DiscordRadioBot(discord.Client):
This turns the green ring ON. This turns the green ring ON.
""" """
if self.voice_client and not self.voice_client.is_playing(): if self.voice_client and not self.voice_client.is_playing():
# clear buffer slightly to ensure we aren't playing old data # Clear old audio so we don't start with a delay
# but keep a little to prevent jitter self.audio_buffer.clear()
# self.audio_buffer.clear()
source = UDPAudioSource(self.audio_buffer) source = UDPAudioSource(self.audio_buffer)
self.voice_client.play(source) self.voice_client.play(source)

View File

@@ -30,6 +30,9 @@ NODE_LONG = os.getenv("NODE_LONG")
# Global flag to track MQTT connection state # Global flag to track MQTT connection state
MQTT_CONNECTED = False MQTT_CONNECTED = False
# Global variable to hold the main event loop
main_loop = None
# Initialize the Discord Bot # Initialize the Discord Bot
discord_bot = DiscordRadioBot(listen_port=23457, forward_ports=[23456]) discord_bot = DiscordRadioBot(listen_port=23457, forward_ports=[23456])
@@ -113,11 +116,17 @@ def handle_c2_command(topic, payload):
token = data.get("token") token = data.get("token")
channel_id = data.get("channel_id") channel_id = data.get("channel_id")
if token and channel_id: if token and channel_id:
asyncio.create_task(discord_bot.start_session(token, channel_id)) if main_loop and main_loop.is_running():
asyncio.run_coroutine_threadsafe(discord_bot.start_session(token, channel_id), main_loop)
else:
LOGGER.error("Main event loop not available to start Discord session.")
LOGGER.info("Initiating Discord Session...") LOGGER.info("Initiating Discord Session...")
elif command_type == "discord_leave": elif command_type == "discord_leave":
asyncio.create_task(discord_bot.stop_session()) if main_loop and main_loop.is_running():
asyncio.run_coroutine_threadsafe(discord_bot.stop_session(), main_loop)
else:
LOGGER.error("Main event loop not available to stop Discord session.")
LOGGER.info("Ending Discord Session...") LOGGER.info("Ending Discord Session...")
else: else:
@@ -334,7 +343,8 @@ async def mqtt_lifecycle_manager():
await stop_recording() await stop_recording()
# Stop Discord Transmission # Stop Discord Transmission
discord_bot.stop_transmission() if discord_bot.is_ready():
discord_bot.stop_transmission()
audio_url = None audio_url = None
if current_call_id: if current_call_id:
@@ -355,8 +365,9 @@ async def mqtt_lifecycle_manager():
LOGGER.debug(f"Call Start: TGID {current_tgid} ({current_meta.get('alpha_tag')})") LOGGER.debug(f"Call Start: TGID {current_tgid} ({current_meta.get('alpha_tag')})")
# Trigger Discord Transmission # Trigger Discord Transmission
discord_bot.start_transmission() if discord_bot.is_ready():
discord_bot.update_system_presence(current_meta.get('sysname', 'Scanning')) discord_bot.start_transmission()
discord_bot.update_system_presence(current_meta.get('sysname', 'Scanning'))
# Generate ID # Generate ID
start_ts = int(now.timestamp()) start_ts = int(now.timestamp())
@@ -396,7 +407,8 @@ async def mqtt_lifecycle_manager():
await stop_recording() await stop_recording()
# Stop Discord Transmission # Stop Discord Transmission
discord_bot.stop_transmission() if discord_bot.is_ready():
discord_bot.stop_transmission()
audio_url = None audio_url = None
if current_call_id: if current_call_id:
@@ -462,6 +474,8 @@ async def mqtt_lifecycle_manager():
@app.on_event("startup") @app.on_event("startup")
async def startup_event(): async def startup_event():
global main_loop
main_loop = asyncio.get_running_loop()
# Store the task so we can cancel it if needed (optional) # Store the task so we can cancel it if needed (optional)
app.state.mqtt_task = asyncio.create_task(mqtt_lifecycle_manager()) app.state.mqtt_task = asyncio.create_task(mqtt_lifecycle_manager())