import asyncio import platform import os from discord import VoiceClient, VoiceChannel, opus, Activity, ActivityType, Intents from discord.ext import commands from typing import Optional, Dict from internal.NoiseGatev2 import NoiseGate from internal.logger import create_logger LOGGER = create_logger(__name__) # Configure discord intents intents = Intents.default() intents.voice_states = True intents.guilds = True class DiscordBotManager: def __init__(self): self.bot: Optional[commands.Bot] = None self.bot_task: Optional[asyncio.Task] = None self.voice_clients: Dict[int, VoiceClient] = {} self.token: Optional[str] = None self.loop = asyncio.get_event_loop() self.lock = asyncio.Lock() self._ready_event = asyncio.Event() self._voice_ready_event = asyncio.Event() async def start_bot(self, token: str): async with self.lock: if self.bot and not self.bot.is_closed(): raise RuntimeError("Bot is already running.") if self.bot_task and not self.bot_task.done(): raise RuntimeError("Bot is already running.") self.token = token self.bot = commands.Bot(command_prefix="!", intents=intents) @self.bot.event async def on_ready(): LOGGER.info(f'Logged in as {self.bot.user}') # Set the event when on_ready is called self._ready_event.set() @self.bot.event async def on_voice_state_update(member, before, after): # Check if the bot was disconnected if member == self.bot.user and after.channel is None: guild_id = before.channel.guild.id if not self.voice_clients.get(guild_id): LOGGER.info("Bot has left channel, reconnection ignored.") return LOGGER.info(f"Bot was disconnected from channel in guild {guild_id}. Attempting to reconnect...") try: await self.leave_voice_channel(guild_id) except Exception as e: LOGGER.warning(f"Error leaving voice channel: '{e}'") # Attempt to reconnect to the channel after a brief pause await asyncio.sleep(2) await self.join_voice_channel(guild_id, before.channel.id) if member == self.bot.user and before.channel is None and after.channel is not None: print(f"{member.name} joined voice channel {after.channel.name}") self._voice_ready_event.set() # Load Opus for the current CPU await self.load_opus() # Create the task to run the bot in the background self.bot_task = self.loop.create_task(self.bot.start(token)) # Wait for the on_ready event to be set by the bot task LOGGER.info("Waiting for bot to become ready...") try: await asyncio.wait_for(self._ready_event.wait(), timeout=60.0) LOGGER.info("Bot is ready, start_bot returning.") return except asyncio.TimeoutError: LOGGER.error("Timeout waiting for bot to become ready. Bot might have failed to start.") if self.bot_task and not self.bot_task.done(): self.bot_task.cancel() raise RuntimeError("Bot failed to become ready within timeout.") async def stop_bot(self): async with self.lock: if self.bot: await self.bot.close() self.bot = None if self.bot_task: await self.bot_task self.bot_task = None self.voice_clients.clear() self._ready_event.clear() LOGGER.info("Bot has been stopped.") async def join_voice_channel(self, guild_id: int, channel_id: int, ng_threshold: int = 50, device_id: int = 4): if not self.bot: raise RuntimeError("Bot is not running.") guild = self.bot.get_guild(guild_id) if not guild: raise ValueError("Guild not found.") if not opus.is_loaded(): raise RuntimeError("Opus is not loaded.") channel = guild.get_channel(channel_id) if not isinstance(channel, VoiceChannel): raise ValueError("Channel is not a voice channel.") if guild_id in self.voice_clients: raise RuntimeError("Already connected to this guild's voice channel.") try: voice_client = await channel.connect(timeout=60.0, reconnect=True) LOGGER.debug("Voice Connected.") streamHandler = NoiseGate( _input_device_index=device_id, _voice_connection=voice_client, _noise_gate_threshold=ng_threshold) streamHandler.run() LOGGER.debug("Stream is running.") self.voice_clients[guild_id] = voice_client LOGGER.info(f"Joined guild {guild_id} voice channel {channel_id} and stream is running.") except Exception as e: LOGGER.error(f"Failed to connect to voice channel: {e}") LOGGER.info("Waiting for bot to join voice...") try: await asyncio.wait_for(self._voice_ready_event.wait(), timeout=60.0) LOGGER.info("Bot joined voice, returning.") return except asyncio.TimeoutError: LOGGER.error("Timeout waiting for bot to join voice.") raise RuntimeError("Bot failed to join voice within timeout.") async def leave_voice_channel(self, guild_id: int): if not self.bot: raise RuntimeError("Bot is not running.") voice_client = self.voice_clients.get(guild_id) if not voice_client: raise RuntimeError("Not connected to the specified guild's voice channel.") await voice_client.disconnect() del self.voice_clients[guild_id] LOGGER.info(f"Left guild {guild_id} voice channel.") async def load_opus(self): """ Load the proper OPUS library for the device being used """ processor = platform.machine() script_dir = os.path.dirname(os.path.abspath(__file__)) LOGGER.debug("Processor: ", processor) if os.name == 'nt': if processor == "AMD64": opus.load_opus(os.path.join(script_dir, './opus/libopus_amd64.dll')) LOGGER.info("Loaded OPUS library for AMD64") return "AMD64" else: if processor == "aarch64": opus.load_opus(os.path.join(script_dir, './opus/libopus_aarcch64.so')) LOGGER.info("Loaded OPUS library for aarch64") return "aarch64" elif processor == "armv7l": opus.load_opus(os.path.join(script_dir, './opus/libopus_armv7l.so')) LOGGER.info("Loaded OPUS library for armv7l") return "armv7l" async def set_presence(self, system_name: str): """ Set the presence (activity) of the bot """ if not self.bot: LOGGER.warning("Bot is not running, cannot set presence.") return try: activity = Activity(type=ActivityType.listening, name=system_name) await self.bot.change_presence(activity=activity) LOGGER.info(f"Bot presence set to 'Listening to {system_name}'") except Exception as pe: LOGGER.error(f"Unable to set presence: '{pe}'")