"""Lifecycle management for a Discord bot that streams local audio to voice.

The module exposes :class:`DiscordBotManager`, which starts/stops a
``discord.ext.commands.Bot`` and keeps one voice connection (plus its audio
input manager) per guild.
"""
import asyncio
import os
import platform
from typing import Dict, Optional

from discord import (
    Activity,
    ActivityType,
    Intents,
    VoiceChannel,
    VoiceClient,
    opus,
)
from discord.ext import commands

from internal.logger import create_logger
from internal.NoiseGatev2 import AudioStreamManager, NoiseGateSource

LOGGER = create_logger(__name__)

# Default intents plus the two this module relies on: guild lookup and
# voice-state updates for the bot's own member.
intents = Intents.default()
intents.voice_states = True
intents.guilds = True


class DiscordBotManager:
    """Start/stop a Discord bot and manage its per-guild voice connections.

    ``voice_connections`` maps a guild id to a dict with the keys
    ``"client"`` (the connected voice client) and ``"audio_manager"`` (the
    ``AudioStreamManager`` feeding it).
    """

    # Seconds to wait for the bot's ``on_ready`` event after starting it.
    _READY_TIMEOUT_S = 60.0
    # Seconds passed to ``VoiceChannel.connect(timeout=...)``.
    _VOICE_CONNECT_TIMEOUT_S = 60.0
    # Seconds to wait for the bot's own voice-state update after connecting.
    _VOICE_READY_TIMEOUT_S = 15.0

    def __init__(self):
        self.bot: Optional[commands.Bot] = None
        self.bot_task: Optional[asyncio.Task] = None
        self.voice_connections: Dict[int, Dict] = {}
        self.token: Optional[str] = None
        # NOTE(review): get_event_loop() outside a running loop is deprecated
        # on recent Python versions; kept because ``self.loop`` is a public
        # attribute that callers may rely on.
        self.loop = asyncio.get_event_loop()
        self.lock = asyncio.Lock()
        self._ready_event = asyncio.Event()
        self._voice_ready_event = asyncio.Event()

    async def start_bot(self, token: str):
        """Create the bot, start it in a background task and wait until ready.

        Args:
            token: Discord bot token passed to ``Bot.start``.

        Raises:
            RuntimeError: If a bot is already running, Opus cannot be loaded,
                the bot task exits before ``on_ready`` fires, or ``on_ready``
                does not fire within the ready timeout.
        """
        async with self.lock:
            if self.bot and not self.bot.is_closed():
                raise RuntimeError("Bot is already running.")
            if self.bot_task and not self.bot_task.done():
                raise RuntimeError("Bot is already running.")

            # Load Opus before building the bot: if this raises, no unclosed
            # bot instance is left behind to block the next start attempt.
            await self.load_opus()

            self.token = token
            # Stale flags from a previous run must not satisfy this start.
            self._ready_event.clear()
            self._voice_ready_event.clear()

            bot = commands.Bot(command_prefix="!", intents=intents)
            self._register_event_handlers(bot)
            self.bot = bot
            self.bot_task = self.loop.create_task(bot.start(token))

            LOGGER.info("Waiting for bot to become ready...")
            # Wait for whichever comes first: on_ready, or the bot task
            # finishing (e.g. login failure) so we do not sit out the whole
            # timeout when startup has already failed.
            ready_waiter = self.loop.create_task(self._ready_event.wait())
            try:
                await asyncio.wait(
                    {ready_waiter, self.bot_task},
                    timeout=self._READY_TIMEOUT_S,
                    return_when=asyncio.FIRST_COMPLETED,
                )
            finally:
                ready_waiter.cancel()  # no-op when it already completed

            if self._ready_event.is_set():
                LOGGER.info("Bot is ready.")
                return

            if self.bot_task.done():
                startup_error = (
                    None if self.bot_task.cancelled()
                    else self.bot_task.exception()
                )
                LOGGER.error(f"Bot task exited before becoming ready: {startup_error}")
                await self._discard_bot()
                raise RuntimeError("Bot failed to start.") from startup_error

            LOGGER.error("Timeout waiting for bot to become ready.")
            await self._discard_bot()
            raise RuntimeError("Bot failed to become ready within timeout.")

    def _register_event_handlers(self, bot: commands.Bot) -> None:
        """Attach the ready/voice-state/disconnect handlers to ``bot``."""

        @bot.event
        async def on_ready():
            LOGGER.info(f'Logged in as {bot.user}')
            self._ready_event.set()

        @bot.event
        async def on_voice_state_update(member, before, after):
            # Only the bot's own voice state is of interest here.
            if member != bot.user:
                return

            if before.channel is None and after.channel is not None:
                LOGGER.info(f"{member.name} joined voice channel {after.channel.name}")
                self._voice_ready_event.set()
            elif (
                before.channel is not None
                and after.channel is not None
                and before.channel != after.channel
            ):
                LOGGER.info(f"{member.name} was moved to voice channel {after.channel.name}")
                if not self._voice_ready_event.is_set():
                    self._voice_ready_event.set()
            elif before.channel is not None and after.channel is None:
                LOGGER.warning(f"{member.name} left voice channel {before.channel.name}")
                guild_id = before.channel.guild.id
                try:
                    # An entry still being registered means nobody asked for
                    # this disconnect, so release its resources here.
                    if guild_id in self.voice_connections:
                        LOGGER.warning(f"Bot was disconnected from {guild_id} unexpectedly. Cleaning up...")
                        await self.leave_voice_channel(guild_id)
                finally:
                    self._voice_ready_event.clear()

        @bot.event
        async def on_disconnect():
            LOGGER.warning("Bot has been disconnected from Discord.")

    async def _discard_bot(self) -> None:
        """Close and forget a bot whose startup failed, so a retry can proceed."""
        bot, task = self.bot, self.bot_task
        self.bot = None
        self.bot_task = None
        self._ready_event.clear()
        if bot is not None:
            try:
                await bot.close()
            except Exception as close_error:
                LOGGER.warning(f"Error while closing bot after failed start: {close_error}")
        if task is not None and not task.done():
            task.cancel()

    async def stop_bot(self):
        """Leave every voice channel, close the bot and reset manager state.

        A failure to leave one guild is logged and does not prevent the
        remaining guilds from being left or the bot from being closed. State
        is reset even if closing the bot raises.
        """
        async with self.lock:
            try:
                if self.bot:
                    for guild_id in list(self.voice_connections):
                        try:
                            await self.leave_voice_channel(guild_id)
                        except Exception as leave_error:
                            LOGGER.error(
                                f"Failed to leave guild {guild_id} during shutdown: {leave_error}",
                                exc_info=True,
                            )
                    await self.bot.close()
            finally:
                self.bot = None
                if self.bot_task:
                    self.bot_task.cancel()
                    self.bot_task = None
                self.voice_connections.clear()
                self._ready_event.clear()
                self._voice_ready_event.clear()
            LOGGER.info("Bot has been stopped.")

    async def join_voice_channel(self, guild_id: int, channel_id: int, ng_threshold: int = 50, device_id: int = 4):
        """Connect to a voice channel and start streaming the audio input.

        Args:
            guild_id: Id of the guild that owns the channel.
            channel_id: Id of the voice channel to join.
            ng_threshold: Passed to ``NoiseGateSource`` as ``threshold``.
            device_id: Passed to ``AudioStreamManager`` as
                ``input_device_index``.

        Raises:
            RuntimeError: If the bot is not running, Opus is not loaded, or
                this guild already has a registered voice connection.
            ValueError: If the guild is unknown or the channel is not a
                voice channel.
            Exception: Any error raised while connecting or starting
                playback is logged and re-raised after the partially built
                connection has been torn down.
        """
        if not self.bot:
            raise RuntimeError("Bot is not running.")

        guild = self.bot.get_guild(guild_id)
        if not guild:
            raise ValueError("Guild not found.")

        if not opus.is_loaded():
            raise RuntimeError("Opus is not loaded.")

        channel = guild.get_channel(channel_id)
        if not isinstance(channel, VoiceChannel):
            raise ValueError("Channel is not a voice channel.")

        if guild_id in self.voice_connections:
            raise RuntimeError("Already connected to this guild's voice channel.")

        def _log_player_error(error):
            if error:
                LOGGER.error(f'Player error: {error}')

        # Tracked outside the try so the except path can release whatever was
        # acquired before the failure.
        voice_client: Optional[VoiceClient] = None
        audio_manager = None
        try:
            self._voice_ready_event.clear()
            voice_client = await channel.connect(
                timeout=self._VOICE_CONNECT_TIMEOUT_S, reconnect=True
            )
            # Set by on_voice_state_update once the bot's own member shows up
            # in a voice channel.
            await asyncio.wait_for(
                self._voice_ready_event.wait(),
                timeout=self._VOICE_READY_TIMEOUT_S,
            )

            audio_manager = AudioStreamManager(input_device_index=device_id)
            audio_source = NoiseGateSource(audio_manager.get_stream(), threshold=ng_threshold)
            voice_client.play(audio_source, after=_log_player_error)

            self.voice_connections[guild_id] = {
                "client": voice_client,
                "audio_manager": audio_manager,
            }
            LOGGER.info(f"Joined guild {guild_id} and started audio stream.")
        except Exception as e:
            LOGGER.error(f"Failed to connect to voice channel: {e}", exc_info=True)
            await self._abort_voice_join(guild_id, voice_client, audio_manager)
            raise

    async def _abort_voice_join(self, guild_id: int, voice_client, audio_manager) -> None:
        """Release resources acquired by a join attempt that failed midway."""
        self.voice_connections.pop(guild_id, None)
        if voice_client is not None:
            try:
                voice_client.stop()
                # force=True: disconnect even if the client does not consider
                # itself connected at this point.
                await voice_client.disconnect(force=True)
            except Exception as cleanup_error:
                LOGGER.warning(f"Error disconnecting after failed join: {cleanup_error}")
        if audio_manager is not None:
            try:
                audio_manager.terminate()
            except Exception as cleanup_error:
                LOGGER.warning(f"Error terminating audio manager after failed join: {cleanup_error}")

    async def leave_voice_channel(self, guild_id: int):
        """Disconnect from a guild's voice channel and release its audio input.

        Args:
            guild_id: Id of the guild whose voice connection should be closed.

        Raises:
            RuntimeError: If the bot is not running or no voice connection is
                registered for ``guild_id``.
        """
        if not self.bot:
            raise RuntimeError("Bot is not running.")

        # Remove the entry *before* awaiting the disconnect: the bot's own
        # voice-state handler runs while we await and would otherwise see the
        # entry and start a second, overlapping cleanup.
        connection_info = self.voice_connections.pop(guild_id, None)
        if not connection_info:
            raise RuntimeError("Not connected to the specified guild's voice channel.")

        voice_client = connection_info.get("client")
        audio_manager = connection_info.get("audio_manager")
        try:
            if voice_client and voice_client.is_connected():
                voice_client.stop()
                await voice_client.disconnect()
        finally:
            # Release the audio input even if the disconnect failed.
            if audio_manager:
                audio_manager.terminate()

        LOGGER.info(f"Left guild {guild_id} voice channel.")

    async def load_opus(self):
        """Load the Opus library for the current platform, if not yet loaded.

        On Windows (``os.name == 'nt'``) a bundled DLL is loaded for AMD64
        only. Elsewhere a bundled library is used for aarch64 and armv7l, and
        the system ``libopus.so.0`` for any other processor.

        Raises:
            RuntimeError: If loading raises or Opus is still not loaded
                afterwards.
        """
        if opus.is_loaded():
            LOGGER.info("Opus library is already loaded.")
            return

        processor = platform.machine()
        script_dir = os.path.dirname(os.path.abspath(__file__))
        LOGGER.debug(f"Attempting to load Opus. Processor: {processor}, OS: {os.name}")

        try:
            if os.name == 'nt':
                if processor == "AMD64":
                    opus.load_opus(os.path.join(script_dir, './opus/libopus_amd64.dll'))
                    LOGGER.info("Loaded OPUS library for AMD64")
            else:
                if processor == "aarch64":
                    # NOTE(review): "aarcch64" looks like a typo, but the
                    # bundled file may really carry this name - confirm
                    # against the opus/ directory before changing it.
                    opus.load_opus(os.path.join(script_dir, './opus/libopus_aarcch64.so'))
                    LOGGER.info("Loaded OPUS library for aarch64")
                elif processor == "armv7l":
                    opus.load_opus(os.path.join(script_dir, './opus/libopus_armv7l.so'))
                    LOGGER.info("Loaded OPUS library for armv7l")
                else:
                    opus.load_opus('libopus.so.0')
                    LOGGER.info(f"Attempted to load system OPUS library for {processor}")
        except Exception as e:
            LOGGER.error(f"Failed to load OPUS library: {e}")
            # Chain the cause so the underlying loader error stays visible.
            raise RuntimeError("Could not load a valid Opus library. Voice functionality will fail.") from e

        # Covers platforms where no branch above attempted a load.
        if not opus.is_loaded():
            raise RuntimeError("Opus library could not be loaded. Please ensure it is installed correctly.")

    async def set_presence(self, system_name: str):
        """Set the bot's presence to "Listening to <system_name>".

        Does nothing except log a warning when the bot is missing or not
        ready. Errors from the presence change are logged, not raised.

        Args:
            system_name: Name shown in the listening activity.
        """
        if not self.bot or not self.bot.is_ready():
            LOGGER.warning("Bot is not ready, cannot set presence.")
            return

        try:
            activity = Activity(type=ActivityType.listening, name=system_name)
            await self.bot.change_presence(activity=activity)
            LOGGER.info(f"Bot presence set to 'Listening to {system_name}'")
        except Exception as pe:
            LOGGER.error(f"Unable to set presence: '{pe}'")