import asyncio
import configparser
import logging
import os
import platform

import discord
from discord.ext import commands

import BotResources
import NoiseGatev2


# Init class for bot
class Bot(commands.Bot):
    """Discord bot that streams audio from a local input device into a voice channel.

    Depending on the configured handler ('gqrx' or 'op25') the bot also drives an
    SDR application and exposes commands to tune it and to save/load radio profiles.
    """

    # Location of the saved radio profiles
    PROFILES_PATH = './profiles.ini'

    def __init__(self, **kwargs):
        """Create the bot.

        Required keyword arguments: ``Device_ID``, ``Device_Name``, ``Token``,
        ``Channel_ID``, ``Mention_Group`` and ``Handler``.
        Optional keyword argument: ``command_prefix`` (defaults to ``'>!'``).
        """
        # If there is no custom command prefix (!help, ?help, etc.), use '>!' but also accept @ mentions
        kwargs.setdefault('command_prefix', '>!')

        # The intents are always needed, whether or not a custom prefix was supplied
        bot_intents = set_server_intents()

        super().__init__(command_prefix=commands.when_mentioned_or(kwargs['command_prefix']),
                         activity=discord.Game(name="@ me"),
                         status=discord.Status.idle,
                         intents=bot_intents)

        # Create the logger for the bot
        self.logger = logging.getLogger("Discord_Radio_Bot.Bot")

        # Init the core bot variables
        self.DEVICE_ID = kwargs['Device_ID']
        self.DEVICE_NAME = kwargs['Device_Name']
        self.BOT_TOKEN = kwargs['Token']
        self.Default_Channel_ID = kwargs['Channel_ID']
        self.Default_Mention_Group = kwargs['Mention_Group']
        self.Handler = kwargs['Handler']
        self.Command_Prefix = kwargs['command_prefix']

        # Init Variable for sound
        self.streamHandler = None

        # Init 'lock' variable for when the bot is joined
        self.Bot_Connected = False

        # Init the audio devices list
        self.Devices_List = NoiseGatev2.AudioStream().list_devices(_display_input_devices=False,
                                                                   _display_output_devices=False)

        # Init radio parameters
        self.profile_name = BotResources.DEFAULT_RADIO_SETTINGS['profile_name']
        self.freq = BotResources.DEFAULT_RADIO_SETTINGS['freq']
        self.mode = BotResources.DEFAULT_RADIO_SETTINGS['mode']
        self.squelch = BotResources.DEFAULT_RADIO_SETTINGS['squelch']
        self.noisegate_sensitivity = BotResources.DEFAULT_RADIO_SETTINGS['noisegate_sensitivity']

        # Init SDR Variables
        self.system_os_type = None
        self.sdr_started = False

        # Check the handler being used
        self.check_handler()

        # Set linux or windows
        self.check_os_type()

        # Add discord commands to the bot
        self.add_commands()

        # Add discord events to the bot
        self.add_events()

    # Start the bot
    def start_bot(self):
        """Run the bot with the token supplied at construction."""
        self.run(self.BOT_TOKEN)

    # Add discord commands to the bot
    def add_commands(self):
        """Register every chat command on this bot instance.

        Radio commands are only registered when the configured handler is one of
        ``BotResources.PDB_ACCEPTABLE_HANDLERS``; ``chsquelch`` only for 'gqrx'.
        """

        # Command to display what channel the bot is in
        @self.command(help="Use this command to display what channel the bot is in",
                      brief="Where ya at?")
        async def wya(ctx, member: discord.Member = None):
            member = member or ctx.author.display_name
            if self.Bot_Connected:
                await ctx.send(f"Hey {str(member).capitalize()}, I'm in {ctx.voice_client.channel}")
            else:
                await ctx.send(f"{str(member).capitalize()}, I am not in any channel.")

        # Command to join the bot the voice channel the user who called the command is in
        @self.command(help="Use this command to join the bot to your channel",
                      brief="Joins the voice channel that the caller is in")
        async def join(ctx, *, member: discord.Member = None):
            member = member or ctx.author.display_name
            self.logger.info(f"Join requested by {member}")

            if self.Bot_Connected:
                await ctx.send(f"{str(member).capitalize()}, I'm already connected")
                self.logger.info("Bot is already in a channel")
                return

            # The caller must be in a voice channel, otherwise there is nothing to join
            voice_state = ctx.author.voice
            if voice_state is None or voice_state.channel is None:
                await ctx.send(f"{str(member).capitalize()}, you need to be in a voice channel first")
                self.logger.info("Join requested by a user who is not in a voice channel")
                return

            # Wait for the bot to be ready to connect
            await self.wait_until_ready()

            # Load respective opus library
            self.load_opus()

            if not discord.opus.is_loaded():
                # Return that the opus library would not load
                await ctx.send("Opus won't load")
                self.logger.critical("OPUS didn't load properly")
                return

            channel = voice_state.channel
            self.logger.debug("Sending hello")
            await ctx.send(f"Ok {str(member).capitalize()}, I'm joining {channel}")

            # Join the voice channel with the audio stream
            self.logger.debug('Joining')
            voice_connection = await channel.connect()

            # Create an audio stream from selected device
            self.logger.debug("Starting noisegate/stream handler")
            self.streamHandler = NoiseGatev2.NoiseGate(_input_device_index=self.DEVICE_ID,
                                                       _voice_connection=voice_connection,
                                                       _noise_gate_threshold=self.noisegate_sensitivity)
            # Start the audio stream
            self.streamHandler.run()

            # Start the SDR and begin playing to the audio stream
            self.logger.debug("Starting SDR")
            self.start_sdr()

            # Change the activity to the channel and band-type being used
            self.logger.debug("Changing presence")
            await self.set_activity()

            # 'Lock' the bot from connecting
            self.logger.debug("Locking the bot")
            self.Bot_Connected = True

        @self.command(help="Use this command to have the bot leave your channel",
                      brief="Leaves the current voice channel")
        async def leave(ctx, member: discord.Member = None):
            member = member or ctx.author.display_name
            self.logger.info(f"Leave requested by {member}")

            if not self.Bot_Connected:
                await ctx.send(f"{str(member).capitalize()}, I'm not in a channel")
                self.logger.info("Bot is not in a channel")
                return

            # Stop the sound handlers
            # Disconnect the client from the voice channel
            self.logger.debug("Disconnecting")
            await self.streamHandler.close()

            # Change the presence to away and '@ me'
            self.logger.debug("Changing presence")
            await self.set_activity(False)

            # Stop the SDR so it can cool off
            self.logger.debug("Stopping SDR")
            self.stop_sdr()

            # 'Unlock' the bot
            self.logger.debug("Unlocking the bot")
            self.Bot_Connected = False

            self.logger.debug("Sending Goodbye")
            await ctx.send(f"Goodbye {str(member).capitalize()}.")

        # Add command to change the NoiseGate threshold
        @self.command(help="Use this command to change the threshold of the noisegate",
                      brief="Change noisegate sensitivity")
        async def chngth(ctx, _threshold: int, member: discord.Member = None):
            member = member or ctx.author.display_name
            self.logger.info(f"Change of NoiseGate threshold requested by {member}")

            # The stream handler only exists once the bot has joined a channel; until
            # then the stored sensitivity is the value that will be used on join
            if self.streamHandler is not None:
                previous_threshold = self.streamHandler.THRESHOLD
            else:
                previous_threshold = self.noisegate_sensitivity

            await ctx.send(f"Ok {str(member).capitalize()}, I'm changing the threshold from "
                           f"{previous_threshold} to {_threshold}")

            if self.streamHandler is not None:
                self.streamHandler.THRESHOLD = _threshold
            self.noisegate_sensitivity = _threshold

            # Reset the profile name since we have made a change
            self.profile_name = None

            # If the SDR is started, restart it with the updates
            await self.use_current_radio_config()

        # Add commands for GQRX and OP25
        if self.Handler in BotResources.PDB_ACCEPTABLE_HANDLERS:
            # Command to display the current config
            @self.command(name='displaycurprofile',
                          help="Use this command to display the current configuration of the bot.\n"
                               "Example command:\n"
                               "\t@ displaycurprofile",
                          brief="Display current bot config")
            async def _displaycurprofile(ctx, member: discord.Member = None):
                member = member or ctx.author.display_name
                message = self.display_current_radio_config()
                await ctx.send(f"Ok {str(member).capitalize()},\n{message}")
                self.logger.info(f"Displaying current profile; requested by {member}")

            # Command to display the saved profiles
            @self.command(name='displayprofiles',
                          help="Use this command to display the saved profiles.\n"
                               "Example command:\n"
                               "\t@ displayprofiles",
                          brief="Display saved bot profiles")
            async def _displayprofiles(ctx, member: discord.Member = None):
                member = member or ctx.author.display_name
                message = self.display_saved_radio_configs()
                await ctx.send(f"Ok {str(member).capitalize()},\n{message}")
                self.logger.info(f"Displaying all profiles; requested by {member}")

            # Command to change the current frequency and mode
            @self.command(name='chfreq',
                          help="Use this command to change the frequency the bot is listening to.\n"
                               "Example GQRX command:\n"
                               "\tTune to 104.7Mhz Wideband FM (Radio) - '@ chfreq wfm 104700000\n"
                               "\tTune to 155.505Mhz Narrowband FM (Radio) - '@ chfreq fm 155505000\n"
                               "Example OP25 command:\n"
                               "\tTune to 155.310Mhz, decode using P25 - '@ chfreq p25 155.310",
                          brief="Changes radio frequency")
            async def chfreq(ctx, mode: str, freq: str, member: discord.Member = None):
                member = member or ctx.author.display_name
                self.logger.info(f"{member} requested change of frequency to Mode: {mode}, Freq: {freq}")

                # Validate both inputs before touching any state so a bad mode
                # cannot leave a new frequency half-applied
                # Check to make sure the frequency input matches the syntax needed
                if len(freq) < 6:
                    await ctx.send(f"{str(member).capitalize()}, {freq} is not valid. "
                                   f"Please refer to the help page '@ help chfreq'")
                    return

                # Check to make sure the selected mode is valid
                if mode not in self.possible_modes:
                    await ctx.send(f"{str(member).capitalize()}, {mode} is not valid."
                                   f" You may only enter {self.possible_modes}")
                    return

                self.freq = freq
                self.mode = mode

                await ctx.send(f"Ok {str(member).capitalize()}, I'm changing the mode to "
                               f"{str(self.mode).upper()} and frequency to {self.freq}")

                # Reset the profile name since we have made a change
                self.profile_name = None

                # If the SDR is started, restart it with the updates
                await self.use_current_radio_config()

            # GQRX Specific commands
            if self.Handler == 'gqrx':
                @self.command(name='chsquelch',
                              help="Use this command to change the squelch for the frequency "
                                   "the bot is listening to\n"
                                   "Example Commands:\n"
                                   "\tNo Squelch\t'@ chsquelch 150'\n"
                                   "\tFully Squelched\t'@ chsquelch 0'",
                              brief="Changes radio squelch")
                async def chsquelch(ctx, squelch: float, member: discord.Member = None):
                    member = member or ctx.author.display_name
                    self.logger.info(f"{member} requested change of squelch to: {squelch}")

                    self.squelch = squelch
                    await ctx.send(f"Ok {str(member).capitalize()}, I'm changing the squelch to {self.squelch}")

                    # Reset the profile name since we have made a change
                    self.profile_name = None

                    # If the SDR is started, restart it with the updates
                    await self.use_current_radio_config()

            # Hidden admin commands
            @self.command(name='saveprofile', hidden=True)
            async def _saveprofile(ctx, profile_name: str, member: discord.Member = None):
                member = member or ctx.author.display_name
                await self.save_radio_config(profile_name)
                await ctx.send(f"Ok {str(member).capitalize()}, I saved the current settings as {profile_name}")

            @self.command(name='loadprofile', hidden=True)
            async def _loadprofile(ctx, profile_name: str, member: discord.Member = None):
                member = member or ctx.author.display_name
                config_loaded = await self.load_radio_config(profile_name)
                if config_loaded:
                    await ctx.send(f"Ok {str(member).capitalize()}, I loaded the settings saved as {profile_name}")
                else:
                    await ctx.send(f"{str(member).capitalize()}, there is no profile with the name '{profile_name}'")

        # Hidden admin commands
        @self.command(name='reload', hidden=True)
        async def _reload(ctx, module: str, member: discord.Member = None):
            """Reloads a module."""
            member = member or ctx.author.display_name
            if self.reload_modules(module):
                await ctx.send(f"Ok {str(member).capitalize()}, I reloaded {module}")
            else:
                await ctx.send(f"{str(member).capitalize()}, something went wrong. Please check the console")

        @self.command(name='startsdr', hidden=True)
        async def _startsdr(*args):
            self.start_sdr()

        @self.command(name='stopsdr', hidden=True)
        async def _stopsdr(*args):
            self.stop_sdr()

    # Add discord events to the bot
    def add_events(self):
        """Register the event listeners on this bot instance."""

        # Run any functions that need to have the bot running to complete
        @self.event
        async def on_ready():
            # Check the ./modules folder for any modules (cog.py)
            await self.check_for_modules()
            self.logger.info("Bot started!")

    # Check to see if other bots are online
    async def check_other_bots_online(self) -> bool:
        """Ping the default channel and report whether any known bot answered.

        Sends ``<prefix>check_modules`` and listens for roughly three seconds.
        Returns True when at least one message came from an author listed in
        ``BotResources.PDB_KNOWN_BOT_IDS``, otherwise False.
        """
        self.logger.info('Checking if other bots are online')
        channel = self.get_channel(self.Default_Channel_ID)
        self.logger.debug(f"Testing in: {channel}")
        bots_online = []

        def verify_bot_msg(msg):
            if msg.author.id in BotResources.PDB_KNOWN_BOT_IDS:
                bots_online.append(BotResources.PDB_KNOWN_BOT_IDS[msg.author.id])
            # Never accept a message: wait_for is only used as a listener, so each
            # call always runs into its timeout and the loop below stays bounded
            return False

        await self.wait_until_ready()

        # Send the ping command with the prefix the current bot is using
        await channel.send(f"{self.Command_Prefix}check_modules")

        seconds_waited = 0
        while seconds_waited < 3:
            try:
                await self.wait_for("message", check=verify_bot_msg, timeout=1)
            except asyncio.TimeoutError:
                seconds_waited += 1

        self.logger.debug(f"Bots Online: {bots_online}")
        return bool(bots_online)

    # Check the handler being used during init
    def check_handler(self):
        """Import and start the SDR handler matching ``self.Handler``.

        Sets ``self.possible_modes`` for 'gqrx' and 'op25'; any other handler
        value leaves the bot without an SDR handler.
        """
        if self.Handler == "gqrx":
            self.logger.info("Starting GQRX handler")
            # Imported lazily so the handler module is only needed when it is selected
            from gqrxHandler import GQRXHandler
            self.GQRXHandler = GQRXHandler()
            self.GQRXHandler.start()
            self.possible_modes = BotResources.PDB_ACCEPTABLE_HANDLERS['gqrx']['Modes']

        elif self.Handler == 'op25':
            self.logger.info("Starting OP25 handler")
            from op25Handler import OP25Handler
            self.OP25Handler = OP25Handler()
            self.OP25Handler.start()
            self.possible_modes = BotResources.PDB_ACCEPTABLE_HANDLERS['op25']['Modes']

    # Load the proper OPUS library for the device being used
    def load_opus(self):
        """Load the bundled opus library that matches ``self.system_os_type``.

        Does nothing when the OS/architecture is not one of the known types; the
        caller is expected to check ``discord.opus.is_loaded()`` afterwards.
        """
        opus_libraries = {
            # Linux ARM AARCH64 running 32bit OS
            'Linux_ARMv7l': './opus/libopus_armv7l.so',
            # Linux ARM AARCH64 running 64bit OS
            # NOTE(review): 'aarcch64' looks like a typo, but it must match the file
            # name shipped in ./opus - confirm before renaming either one
            'Linux_AARCH64': './opus/libopus_aarcch64.so',
            # Windows 64bit
            'Windows_x64': './opus/libopus_amd64.dll',
        }

        library_path = opus_libraries.get(self.system_os_type)
        if library_path is not None:
            self.logger.debug(f"Loaded OPUS library for {self.system_os_type}")
            discord.opus.load_opus(library_path)

    # Check to make sure the selected device is still available and has not changed its index
    def check_device(self, _override) -> bool:
        """Verify the configured input device is still present.

        :param _override: when truthy the check is skipped and True is returned.
        :return: True if a device with the configured name exists. When the name
            is found under a different index, ``self.DEVICE_ID`` is updated to it.
        """
        # If an override has been passed just reply true
        if _override:
            return True

        self.logger.debug(f"Device list {self.Devices_List}")

        # First look for an exact match of both index and name
        for device, index in self.Devices_List['Input']:
            if int(index) == self.DEVICE_ID and str(device) == self.DEVICE_NAME:
                return True

        # Otherwise the device may have moved; look it up by name across ALL devices
        for device, index in self.Devices_List['Input']:
            if str(device) == self.DEVICE_NAME:
                self.DEVICE_ID = int(index)
                return True

        return False

    # Search the ./modules folder for any modules to load
    async def check_for_modules(self):
        """Load every ``modules/<name>/cog.py`` extension unless another bot is online."""
        # Check to see if other bots are online and don't load the modules if they are
        if await self.check_other_bots_online():
            return

        if not os.path.isdir("modules"):
            self.logger.debug("No 'modules' folder found, nothing to load")
            return

        # A valid module must be built as a 'cog', refer to the docs for more information
        for folder_name in os.listdir("modules"):
            # Skip hidden entries
            if str(folder_name).startswith('.'):
                continue

            if not os.path.exists(os.path.join("modules", folder_name, "cog.py")):
                continue

            extension_name = f"modules.{folder_name}.cog"
            # on_ready can fire more than once (e.g. after a reconnect); loading an
            # extension twice would raise, so skip the ones that are already loaded
            if extension_name in self.extensions:
                continue

            self.logger.debug(f"Loaded extension: {folder_name}")
            self.load_extension(extension_name)

    # Reload a selected module for changes
    def reload_modules(self, module) -> bool:
        """Unload and load ``modules.<module>.cog`` again.

        :return: True on success, False if anything went wrong (the error is logged).
        """
        try:
            self.unload_extension(f"modules.{module}.cog")
            self.logger.debug(f"Unloaded {module}")
            self.load_extension(f"modules.{module}.cog")
            self.logger.debug(f"Loaded {module}")
            return True
        except Exception as e:
            # Deliberately broad: this is the boundary of an admin command and any
            # failure is reported back to the caller as False
            self.logger.error(e)
            return False

    # Check and store the OS type of the system for later use
    def check_os_type(self):
        """Detect the OS/architecture and store it in ``self.system_os_type``.

        Unknown combinations leave ``self.system_os_type`` unchanged.
        """
        processor = platform.machine()

        if os.name == 'nt':
            if processor == "AMD64":
                self.system_os_type = 'Windows_x64'
                self.logger.debug(f"OS/Arch is {self.system_os_type}")
        else:
            if processor == "aarch64":
                self.system_os_type = 'Linux_AARCH64'
                self.logger.debug(f"OS/Arch is {self.system_os_type}")
            elif processor == "armv7l":
                self.system_os_type = 'Linux_ARMv7l'
                self.logger.debug(f"OS/Arch is {self.system_os_type}")

    # Check to see if there is only one frequency
    def start_sdr(self):
        """(Re)start the SDR on the current frequency through the active handler.

        Only acts for handlers in ``BotResources.PDB_ACCEPTABLE_HANDLERS`` and when
        ``self.freq`` is a single frequency given as a string.
        """
        if self.Handler not in BotResources.PDB_ACCEPTABLE_HANDLERS:
            return

        # Single freq sent
        if not isinstance(self.freq, str):
            return

        # Stop the SDR if it is running
        self.stop_sdr()

        # Start the radio
        self.logger.debug(f"Starting freq: {self.freq}")

        if self.Handler == 'gqrx':
            # Set the settings in GQRX
            self.GQRXHandler.set_gqrx_parameters(_frequency=self.freq, _squelch=self.squelch,
                                                 _fm_mode=self.mode,
                                                 _output_device_name=self.DEVICE_NAME, _start=True)

        elif self.Handler == 'op25':
            self.OP25Handler.set_op25_parameters(self.freq, _start=True,
                                                 _output_device_name=self.DEVICE_NAME)

        # Set the started variable for later checks
        self.sdr_started = True

    # Check to see if the SDR is running
    def stop_sdr(self):
        """Stop the SDR through the active handler if it is currently running."""
        if not self.sdr_started:
            return

        # Close the GQRX handler
        if self.Handler == 'gqrx':
            self.GQRXHandler.set_gqrx_parameters(_stop=True)

        # Close the OP25 handler
        if self.Handler == 'op25':
            self.OP25Handler.set_op25_parameters(_stop=True)

        self.sdr_started = False

    # Set the activity of the bot
    async def set_activity(self, connected=True):
        """Update the bot presence.

        :param connected: when True show what is being listened to (profile name,
            or frequency and mode, or a generic text for non-radio handlers); when
            False go back to idle with '@ me'.
        """
        if not connected:
            await self.change_presence(activity=discord.Game(name="@ me"), status=discord.Status.idle)
            return

        if self.Handler not in BotResources.PDB_ACCEPTABLE_HANDLERS:
            await self.change_presence(activity=discord.Activity(type=discord.ActivityType.listening,
                                                                 name="the airwaves"),
                                       status=discord.Status.online)
            return

        if self.profile_name is None:
            # No profile active: show the raw frequency (minus its last character) and mode
            activity_name = f"{self.freq[:-1]} {str(self.mode).upper()}"
        elif isinstance(self.profile_name, str):
            activity_name = f"{str(self.profile_name).upper()}"
        else:
            # Unknown profile name type: leave the presence untouched
            return

        await self.change_presence(activity=discord.Activity(type=discord.ActivityType.listening,
                                                             name=activity_name),
                                   status=discord.Status.online)

    # Save the current radio settings as a profile
    async def save_radio_config(self, _profile_name: str):
        """Store the current radio settings in profiles.ini under ``_profile_name``.

        The name is upper-cased, becomes the active profile, and the current
        configuration is (re)applied afterwards.
        """
        self.logger.debug(f"Saving profile {_profile_name}")

        # SafeConfigParser was removed in Python 3.12; ConfigParser is its replacement
        config = configparser.ConfigParser()

        if os.path.exists(self.PROFILES_PATH):
            config.read(self.PROFILES_PATH)

        profile_name = str(_profile_name).upper()

        if not config.has_section(profile_name):
            config.add_section(profile_name)

        # configparser only accepts string values
        section = config[profile_name]
        section['Frequency'] = str(self.freq)
        section['Mode'] = str(self.mode)
        section['Squelch'] = str(self.squelch)
        section['Noisegate_Sensitivity'] = str(self.noisegate_sensitivity)

        with open(self.PROFILES_PATH, 'w') as config_file:
            config.write(config_file)

        self.profile_name = profile_name
        await self.use_current_radio_config()

    async def use_current_radio_config(self):
        """Push the current settings to the running SDR and refresh the presence.

        Does nothing while the SDR is not started.
        """
        if not self.sdr_started:
            return

        # Set the loaded profile settings into GQRX
        if self.Handler == "gqrx":
            self.GQRXHandler.set_gqrx_parameters(_frequency=self.freq, _squelch=self.squelch,
                                                 _fm_mode=self.mode)

        # Restart OP25 to use the loaded profile
        if self.Handler == "op25":
            self.start_sdr()

        # Set the activity to reflect the loaded profile
        await self.set_activity()

    # Load a saved profile into the current settings
    async def load_radio_config(self, profile_name) -> bool:
        """Load the profile ``profile_name`` from profiles.ini into the current settings.

        :return: True if the profile existed and was applied, otherwise False.
        """
        self.logger.debug(f"Loading profile {profile_name}")

        if not os.path.exists(self.PROFILES_PATH):
            return False

        config = configparser.ConfigParser()
        config.read(self.PROFILES_PATH)

        section_name = str(profile_name).upper()
        if not config.has_section(section_name):
            return False

        self.profile_name = section_name
        self.freq = config[self.profile_name]['Frequency']
        self.mode = config[self.profile_name]['Mode']
        self.squelch = float(config[self.profile_name]['Squelch'])

        try:
            self.noisegate_sensitivity = int(config[self.profile_name]['Noisegate_Sensitivity'])
        except KeyError:
            # Profiles written before the noisegate setting existed: fill in the
            # default and write the profile back so the value is present next time
            self.logger.warning(f"Config does not contain a 'noisegate sensitivity' value, "
                                f"creating one now with the default value: "
                                f"{BotResources.DEFAULT_NOISEGATE_THRESHOLD}")
            self.noisegate_sensitivity = BotResources.DEFAULT_NOISEGATE_THRESHOLD
            await self.save_radio_config(self.profile_name)

        await self.use_current_radio_config()
        return True

    def display_current_radio_config(self) -> str:
        """Return a human-readable summary of the current radio settings."""
        message_body = ""

        if self.profile_name:
            message_body += f"Profile Name: {str(self.profile_name).upper()}\n"

        message_body += f"\tMode:\t\t\t\t\t{self.mode}\n" \
                        f"\tFrequency:\t\t\t{self.freq}\n" \
                        f"\tNoisegate Sensitivity:\t{self.noisegate_sensitivity}"

        if self.squelch:
            # Start a new line so the squelch does not run on from the noisegate row
            message_body += f"\n\tSquelch:\t\t\t\t{self.squelch}"

        return message_body

    def display_saved_radio_configs(self) -> str:
        """Return a human-readable listing of every profile saved in profiles.ini."""
        message_body = "Saved configs\n"

        if not os.path.exists(self.PROFILES_PATH):
            return message_body

        config = configparser.ConfigParser()
        config.read(self.PROFILES_PATH)

        for section in config.sections():
            message_body += f"\nProfile Name: {section}:\n" \
                            f"\tMode:\t\t\t\t\t{config[section]['Mode']}\n" \
                            f"\tFrequency:\t\t\t{config[section]['Frequency']}\n"

            try:
                message_body += f"\tNoisegate Sensitivity:\t{config[section]['Noisegate_Sensitivity']}\n"
            except KeyError:
                self.logger.warning("Config does not contain a "
                                    "'noisegate sensitivity' value. Please load the profile")

            message_body += f"\tSquelch:\t\t\t\t{config[section]['Squelch']}\n"

        return message_body


# Set discord intents and return the intent object
def set_server_intents():
    """Build and return the gateway intents used by the bot (library defaults)."""
    bot_intents = discord.Intents.default()
    # Optional intents, left disabled:
    # bot_intents.messages = True
    # bot_intents.message_content = True
    # bot_intents.members = True
    return bot_intents