Files
drb-client-discord-bot/app/internal/bot_manager.py
Logan Cusano 5191433e5d
Some checks failed
release-tag / release-image (push) Successful in 1h14m57s
Lint / lint (push) Failing after 12s
Add wait to joining voice
2025-04-27 03:29:06 -04:00

172 lines
7.0 KiB
Python

import asyncio
import platform
import os
from discord import VoiceClient, VoiceChannel, opus, Activity, ActivityType, Intents
from discord.ext import commands
from typing import Optional, Dict
from internal.NoiseGatev2 import NoiseGate
from internal.logger import create_logger
# Module-level logger used by everything in this file.
LOGGER = create_logger(__name__)
# Configure discord intents: start from the library defaults, then explicitly
# enable the two intents this module relies on -- voice states (the bot handles
# on_voice_state_update) and guilds (guilds are looked up by ID when joining).
intents = Intents.default()
intents.voice_states = True
intents.guilds = True
class DiscordBotManager:
    """Manage one Discord bot instance and its per-guild voice connections.

    The bot runs as a background asyncio task created by ``start_bot``.
    Voice connections are tracked by guild ID in ``voice_clients``; each
    successful join also starts a ``NoiseGate`` stream handler bound to the
    new voice connection.
    """

    def __init__(self):
        """Create an idle manager; no bot exists until ``start_bot`` is called."""
        self.bot: Optional[commands.Bot] = None
        self.bot_task: Optional[asyncio.Task] = None
        self.voice_clients: Dict[int, VoiceClient] = {}
        self.token: Optional[str] = None
        # Kept for backward compatibility; start_bot rebinds this to the loop
        # that is actually running when the bot is started.
        self.loop = asyncio.get_event_loop()
        # Serialises start_bot / stop_bot against each other.
        self.lock = asyncio.Lock()
        # Set by on_ready; cleared whenever the bot is (re)started or torn down.
        self._ready_event = asyncio.Event()
        # Set when the bot's own voice state goes from "no channel" to a channel.
        self._voice_ready_event = asyncio.Event()
        # Guild IDs for which this manager itself requested a voice disconnect,
        # so the resulting voice-state event is not mistaken for a dropped
        # connection that needs to be re-established.
        self._expected_disconnects: set = set()
        # Per-guild keyword arguments of the last successful join, reused when
        # the bot reconnects automatically after an unexpected disconnect.
        self._voice_settings: Dict[int, dict] = {}

    async def start_bot(self, token: str):
        """Start the bot in the background and wait until it reports ready.

        Args:
            token: Discord bot token passed to ``bot.start``.

        Raises:
            RuntimeError: If a bot is already running, if the bot task exits
                before ``on_ready`` fires (for example the login failed), or
                if the bot is not ready within 60 seconds. In the failure
                cases the half-started bot is closed and the manager is reset
                so that ``start_bot`` can be called again.
        """
        async with self.lock:
            if self.bot and not self.bot.is_closed():
                raise RuntimeError("Bot is already running.")
            if self.bot_task and not self.bot_task.done():
                raise RuntimeError("Bot is already running.")

            self.token = token
            # Never let a previous run's state satisfy this run's waits.
            self._ready_event.clear()
            self._voice_ready_event.clear()

            bot = commands.Bot(command_prefix="!", intents=intents)
            self.bot = bot
            self._register_events(bot)

            # Load Opus for the current CPU
            await self.load_opus()

            # Run the bot on the loop that is executing this coroutine; a loop
            # captured at construction time may not be the one that is running.
            self.loop = asyncio.get_running_loop()
            bot_task = self.loop.create_task(bot.start(token))
            self.bot_task = bot_task

            # Wait for on_ready, but also notice when the bot task dies first
            # so a startup failure is reported immediately instead of after
            # the full timeout.
            LOGGER.info("Waiting for bot to become ready...")
            ready_waiter = self.loop.create_task(self._ready_event.wait())
            try:
                await asyncio.wait(
                    {ready_waiter, bot_task},
                    timeout=60.0,
                    return_when=asyncio.FIRST_COMPLETED,
                )
            finally:
                ready_waiter.cancel()

            if self._ready_event.is_set():
                LOGGER.info("Bot is ready, start_bot returning.")
                return

            if bot_task.done():
                startup_error = None if bot_task.cancelled() else bot_task.exception()
                LOGGER.error(f"Bot task exited before becoming ready: '{startup_error}'")
                await self._teardown_bot()
                raise RuntimeError(f"Bot failed to start: {startup_error}") from startup_error

            LOGGER.error("Timeout waiting for bot to become ready. Bot might have failed to start.")
            await self._teardown_bot()
            raise RuntimeError("Bot failed to become ready within timeout.")

    def _register_events(self, bot):
        """Attach the on_ready and on_voice_state_update handlers to ``bot``.

        The handlers close over ``bot`` instead of reading ``self.bot`` so that
        events delivered after the manager has dropped or replaced the bot do
        not dereference ``None`` or act on a different bot instance.
        """

        @bot.event
        async def on_ready():
            LOGGER.info(f'Logged in as {bot.user}')
            # Unblocks start_bot, which is waiting on this event.
            self._ready_event.set()

        @bot.event
        async def on_voice_state_update(member, before, after):
            # Only the bot's own voice state is of interest here.
            if member != bot.user:
                return

            # The bot entered a voice channel: unblock join_voice_channel.
            if before.channel is None and after.channel is not None:
                LOGGER.info(f"{member.name} joined voice channel {after.channel.name}")
                self._voice_ready_event.set()
                return

            # Everything below handles "the bot is no longer in a channel".
            if after.channel is not None or before.channel is None:
                return

            guild_id = before.channel.guild.id
            channel_id = before.channel.id

            if guild_id in self._expected_disconnects:
                # This manager asked for the disconnect; do not rejoin.
                self._expected_disconnects.discard(guild_id)
                return
            if self.bot is not bot:
                # The manager was stopped or restarted; nothing to reconnect.
                return

            LOGGER.info(f"Bot was disconnected from channel in guild {guild_id}. Attempting to reconnect...")
            # Read the join settings first: leaving forgets them.
            settings = dict(self._voice_settings.get(guild_id, {}))
            try:
                await self.leave_voice_channel(guild_id)
            except Exception as e:
                LOGGER.warning(f"Error leaving voice channel: '{e}'")

            # Attempt to reconnect to the channel after a brief pause
            await asyncio.sleep(2)
            if self.bot is not bot:
                return
            try:
                await self.join_voice_channel(guild_id, channel_id, **settings)
            except Exception as e:
                LOGGER.error(f"Failed to rejoin voice channel {channel_id} in guild {guild_id}: '{e}'")

    async def _teardown_bot(self):
        """Close the bot, finish its background task and reset all state.

        Best effort: errors raised while closing the bot or by the bot task
        are logged rather than propagated, so the manager always ends up in a
        state from which ``start_bot`` can be called again.
        """
        bot, bot_task = self.bot, self.bot_task
        # Drop the references first so event handlers that fire while the bot
        # is closing can tell that the manager no longer owns it.
        self.bot = None
        self.bot_task = None

        if bot is not None:
            try:
                await bot.close()
            except Exception as e:
                LOGGER.warning(f"Error closing bot: '{e}'")

        if bot_task is not None:
            if not bot_task.done():
                bot_task.cancel()
            # gather(return_exceptions=True) hands back the task's exception
            # or cancellation as a value instead of raising it here.
            outcome = (await asyncio.gather(bot_task, return_exceptions=True))[0]
            if isinstance(outcome, Exception):
                LOGGER.warning(f"Bot task ended with error: '{outcome}'")

        self.voice_clients.clear()
        self._voice_settings.clear()
        self._expected_disconnects.clear()
        self._ready_event.clear()
        self._voice_ready_event.clear()

    async def stop_bot(self):
        """Stop the bot (if any) and reset the manager to its idle state."""
        async with self.lock:
            await self._teardown_bot()
            LOGGER.info("Bot has been stopped.")

    async def join_voice_channel(self, guild_id: int, channel_id: int, ng_threshold: int = 50, device_id: int = 4):
        """Connect to a voice channel and start the NoiseGate stream on it.

        Args:
            guild_id: ID of the guild that owns the channel.
            channel_id: ID of the voice channel to join.
            ng_threshold: Passed to ``NoiseGate`` as ``_noise_gate_threshold``.
            device_id: Passed to ``NoiseGate`` as ``_input_device_index``.

        Raises:
            RuntimeError: If the bot is not running, Opus is not loaded, the
                guild already has a registered voice connection, connecting
                or starting the stream fails, or the bot's voice-state update
                is not seen within 60 seconds.
            ValueError: If the guild is unknown or the channel is not a voice
                channel.
        """
        if not self.bot:
            raise RuntimeError("Bot is not running.")
        guild = self.bot.get_guild(guild_id)
        if not guild:
            raise ValueError("Guild not found.")
        if not opus.is_loaded():
            raise RuntimeError("Opus is not loaded.")
        channel = guild.get_channel(channel_id)
        if not isinstance(channel, VoiceChannel):
            raise ValueError("Channel is not a voice channel.")
        if guild_id in self.voice_clients:
            raise RuntimeError("Already connected to this guild's voice channel.")

        # A fresh join supersedes any disconnect marker left over for this guild.
        self._expected_disconnects.discard(guild_id)
        # Clear before connecting so the wait below reflects THIS join rather
        # than an earlier one.
        self._voice_ready_event.clear()

        voice_client = None
        try:
            voice_client = await channel.connect(timeout=60.0, reconnect=True)
            LOGGER.debug("Voice Connected.")
            stream_handler = NoiseGate(
                _input_device_index=device_id,
                _voice_connection=voice_client,
                _noise_gate_threshold=ng_threshold)
            stream_handler.run()
            LOGGER.debug("Stream is running.")
        except Exception as e:
            LOGGER.error(f"Failed to connect to voice channel: {e}")
            if voice_client is not None:
                # The connection succeeded but the stream did not start: do not
                # leave an unregistered voice connection behind.
                self._expected_disconnects.add(guild_id)
                try:
                    await voice_client.disconnect(force=True)
                except Exception as cleanup_error:
                    self._expected_disconnects.discard(guild_id)
                    LOGGER.warning(f"Error cleaning up voice connection: '{cleanup_error}'")
            raise RuntimeError(f"Failed to connect to voice channel: {e}") from e

        self.voice_clients[guild_id] = voice_client
        # Remembered so an automatic reconnect uses the caller's settings.
        self._voice_settings[guild_id] = {"ng_threshold": ng_threshold, "device_id": device_id}
        LOGGER.info(f"Joined guild {guild_id} voice channel {channel_id} and stream is running.")

        LOGGER.info("Waiting for bot to join voice...")
        try:
            await asyncio.wait_for(self._voice_ready_event.wait(), timeout=60.0)
        except asyncio.TimeoutError:
            LOGGER.error("Timeout waiting for bot to join voice.")
            raise RuntimeError("Bot failed to join voice within timeout.")
        LOGGER.info("Bot joined voice, returning.")

    async def leave_voice_channel(self, guild_id: int):
        """Disconnect from the voice channel registered for ``guild_id``.

        Args:
            guild_id: ID of the guild whose voice connection should be closed.

        Raises:
            RuntimeError: If the bot is not running or no voice connection is
                registered for the guild.
        """
        if not self.bot:
            raise RuntimeError("Bot is not running.")
        voice_client = self.voice_clients.get(guild_id)
        if not voice_client:
            raise RuntimeError("Not connected to the specified guild's voice channel.")

        # Tell the voice-state handler that this disconnect is intentional so
        # it does not immediately rejoin the channel.
        self._expected_disconnects.add(guild_id)
        try:
            await voice_client.disconnect()
        except Exception:
            self._expected_disconnects.discard(guild_id)
            raise

        # pop() rather than del: another coroutine may have removed the entry
        # while the disconnect was being awaited.
        self.voice_clients.pop(guild_id, None)
        self._voice_settings.pop(guild_id, None)
        LOGGER.info(f"Left guild {guild_id} voice channel.")

    async def load_opus(self) -> Optional[str]:
        """Load the bundled Opus library matching the current OS and CPU.

        Returns:
            The processor name (``platform.machine()``) a library was loaded
            for, or ``None`` when no bundled library matches this platform.
        """
        processor = platform.machine()
        script_dir = os.path.dirname(os.path.abspath(__file__))
        LOGGER.debug(f"Processor: {processor}")

        if os.name == 'nt':
            bundled = {"AMD64": './opus/libopus_amd64.dll'}
        else:
            bundled = {
                # NOTE(review): "aarcch64" looks like a typo for "aarch64";
                # kept as-is because the file on disk may carry the same
                # name -- confirm before changing.
                "aarch64": './opus/libopus_aarcch64.so',
                "armv7l": './opus/libopus_armv7l.so',
            }

        library = bundled.get(processor)
        if library is None:
            # Without a loaded Opus library join_voice_channel will refuse to run.
            LOGGER.warning(f"No bundled OPUS library for processor '{processor}' (os.name='{os.name}')")
            return None

        opus.load_opus(os.path.join(script_dir, library))
        LOGGER.info(f"Loaded OPUS library for {processor}")
        return processor

    async def set_presence(self, presence: str):
        """Set the presence (activity) of the bot to "listening to ``presence``".

        Failures are logged and never raised to the caller.
        """
        if not self.bot:
            LOGGER.error("Unable to set presence: 'Bot is not running.'")
            return
        try:
            await self.bot.change_presence(activity=Activity(type=ActivityType.listening, name=presence))
        except Exception as pe:
            LOGGER.error(f"Unable to set presence: '{pe}'")