import asyncio import platform import os from discord import VoiceClient, VoiceChannel, opus, Activity, ActivityType, Intents from discord.ext import commands from typing import Optional, Dict from internal.NoiseGatev2 import NoiseGate from internal.logger import create_logger LOGGER = create_logger(__name__) # Configure discord intents intents = Intents.default() intents.voice_states = True intents.guilds = True class DiscordBotManager: def __init__(self): self.bot: Optional[commands.Bot] = None self.bot_task: Optional[asyncio.Task] = None # This dictionary will hold both the client and its audio stream handler self.voice_connections: Dict[int, Dict] = {} self.token: Optional[str] = None self.loop = asyncio.get_event_loop() self.lock = asyncio.Lock() self._ready_event = asyncio.Event() self._voice_ready_event = asyncio.Event() async def start_bot(self, token: str): async with self.lock: if self.bot and not self.bot.is_closed(): raise RuntimeError("Bot is already running.") if self.bot_task and not self.bot_task.done(): raise RuntimeError("Bot is already running.") self.token = token self.bot = commands.Bot(command_prefix="!", intents=intents) @self.bot.event async def on_ready(): LOGGER.info(f'Logged in as {self.bot.user}') self._ready_event.set() @self.bot.event async def on_voice_state_update(member, before, after): if member != self.bot.user: return if before.channel is None and after.channel is not None: LOGGER.info(f"{member.name} joined voice channel {after.channel.name}") self._voice_ready_event.set() elif before.channel is not None and after.channel is not None and before.channel != after.channel: LOGGER.info(f"{member.name} was moved to voice channel {after.channel.name}") if not self._voice_ready_event.is_set(): self._voice_ready_event.set() elif before.channel is not None and after.channel is None: LOGGER.warning(f"{member.name} left voice channel {before.channel.name}") guild_id = before.channel.guild.id if guild_id in self.voice_connections: LOGGER.warning(f"Bot was disconnected from {guild_id} unexpectedly. Cleaning up...") await self.leave_voice_channel(guild_id) self._voice_ready_event.clear() @self.bot.event async def on_disconnect(): LOGGER.warning("Bot has been disconnected from Discord.") await self.load_opus() self.bot_task = self.loop.create_task(self.bot.start(token)) LOGGER.info("Waiting for bot to become ready...") try: await asyncio.wait_for(self._ready_event.wait(), timeout=60.0) LOGGER.info("Bot is ready.") except asyncio.TimeoutError: LOGGER.error("Timeout waiting for bot to become ready.") if self.bot_task and not self.bot_task.done(): self.bot_task.cancel() raise RuntimeError("Bot failed to become ready within timeout.") async def stop_bot(self): async with self.lock: if self.bot: # Disconnect from all voice channels cleanly for guild_id in list(self.voice_connections.keys()): await self.leave_voice_channel(guild_id) await self.bot.close() self.bot = None if self.bot_task: self.bot_task.cancel() self.bot_task = None self.voice_connections.clear() self._ready_event.clear() LOGGER.info("Bot has been stopped.") async def join_voice_channel(self, guild_id: int, channel_id: int, ng_threshold: int = 50, device_id: int = 4): if not self.bot: raise RuntimeError("Bot is not running.") guild = self.bot.get_guild(guild_id) if not guild: raise ValueError("Guild not found.") if not opus.is_loaded(): raise RuntimeError("Opus is not loaded.") channel = guild.get_channel(channel_id) if not isinstance(channel, VoiceChannel): raise ValueError("Channel is not a voice channel.") if guild_id in self.voice_connections: raise RuntimeError("Already connected to this guild's voice channel.") try: # 1. Connect to the channel first self._voice_ready_event.clear() voice_client = await channel.connect(timeout=60.0, reconnect=True) LOGGER.debug("Voice client connecting...") # 2. Wait for the on_voice_state_update event to confirm readiness await asyncio.wait_for(self._voice_ready_event.wait(), timeout=15.0) LOGGER.info("Bot voice connection is ready.") # 3. NOW, create and start the audio stream handler stream_handler = NoiseGate( _input_device_index=device_id, _voice_connection=voice_client, _noise_gate_threshold=ng_threshold) stream_handler.run() # 4. Store both client and stream handler for proper management self.voice_connections[guild_id] = { "client": voice_client, "stream": stream_handler } LOGGER.info(f"Joined guild {guild_id} and audio stream is now running.") except asyncio.TimeoutError: LOGGER.error(f"Timeout waiting for bot to join voice channel {channel_id}.") raise RuntimeError("Bot failed to confirm voice connection within timeout.") except Exception as e: LOGGER.error(f"Failed to connect to voice channel: {e}", exc_info=True) raise async def leave_voice_channel(self, guild_id: int): if not self.bot: raise RuntimeError("Bot is not running.") connection_info = self.voice_connections.get(guild_id) if not connection_info: raise RuntimeError("Not connected to the specified guild's voice channel.") # Cleanly stop the associated audio stream first stream_handler = connection_info.get('stream') if stream_handler: LOGGER.info(f"Stopping audio stream for guild {guild_id}.") await stream_handler.close() # Disconnect the voice client voice_client = connection_info.get('client') if voice_client and voice_client.is_connected(): await voice_client.disconnect() del self.voice_connections[guild_id] LOGGER.info(f"Left guild {guild_id} voice channel.") async def load_opus(self): processor = platform.machine() script_dir = os.path.dirname(os.path.abspath(__file__)) LOGGER.debug(f"Processor: {processor}, OS: {os.name}") try: if os.name == 'nt': # Windows if processor == "AMD64": opus.load_opus(os.path.join(script_dir, './opus/libopus_amd64.dll')) LOGGER.info("Loaded OPUS library for AMD64") else: # Linux / other if processor == "aarch64": opus.load_opus(os.path.join(script_dir, './opus/libopus_aarcch64.so')) LOGGER.info("Loaded OPUS library for aarch64") elif processor == "armv7l": opus.load_opus(os.path.join(script_dir, './opus/libopus_armv7l.so')) LOGGER.info("Loaded OPUS library for armv7l") else: # Fallback for other Linux archs like x86_64 opus.load_opus('libopus.so.0') LOGGER.info(f"Loaded system OPUS library for {processor}") except Exception as e: LOGGER.error(f"Failed to load OPUS library: {e}") raise RuntimeError("Could not load a valid Opus library. Voice functionality will fail.") async def set_presence(self, system_name: str): if not self.bot or not self.bot.is_ready(): LOGGER.warning("Bot is not ready, cannot set presence.") return try: activity = Activity(type=ActivityType.listening, name=system_name) await self.bot.change_presence(activity=activity) LOGGER.info(f"Bot presence set to 'Listening to {system_name}'") except Exception as pe: LOGGER.error(f"Unable to set presence: '{pe}'")