277 lines
8.6 KiB
Python
277 lines
8.6 KiB
Python
import configparser
|
|
import logging
|
|
import os
|
|
from datetime import date
|
|
from os.path import exists
|
|
from NoiseGatev2 import AudioStream
|
|
|
|
# Handler configs: each acceptable handler name maps to the 'Modes' it lists
PDB_ACCEPTABLE_HANDLERS = {
    'gqrx': {'Modes': ['wfm', 'fm']},
    'op25': {'Modes': ['d', 'p25']},
}

# Known bot IDs, each keyed to a name
PDB_KNOWN_BOT_IDS = {
    756327271597473863: "Greada",
    915064996994633729: "Jorn",
    943742040255115304: "Brent",
}

# Default value to set noisegate on new or unknown profiles
DEFAULT_NOISEGATE_THRESHOLD = 50

# Initialize the logger for this file (a child of the 'Discord_Radio_Bot' logger)
LOGGER = logging.getLogger('Discord_Radio_Bot.Bot_Resources')

# Location of the gqrx binary: the containing directory, then the full path
GQRX_BIN_LOCATION = "/usr/bin/"
GQRX_BIN = "/usr/bin/gqrx"

# Default radio settings; the noisegate entry reuses the default threshold above
DEFAULT_RADIO_SETTINGS = dict(
    profile_name=None,
    freq="104700000",
    mode="wfm",
    squelch=0,
    noisegate_sensitivity=DEFAULT_NOISEGATE_THRESHOLD,
)
|
|
|
|
|
|
def check_if_config_exists():
    """Report whether ``./config.ini`` exists and holds every required section.

    Returns:
        bool: True only when the file exists and contains the ``Bot_Info``,
        ``Device`` and ``Config`` sections; False otherwise.
    """
    if not exists('./config.ini'):
        return False

    # ConfigParser is used instead of SafeConfigParser: the latter was only a
    # deprecated alias and no longer exists in Python 3.12+.
    config = configparser.ConfigParser()
    config.read('./config.ini')

    required_sections = ('Bot_Info', 'Device', 'Config')
    return all(config.has_section(section) for section in required_sections)
|
|
|
|
|
|
def read_config_file():
    """Load the bot settings stored in ``./config.ini``.

    Returns:
        dict | None: A dict with the keys ``'Bot Token'``, ``'Device ID'``,
        ``'Device Name'``, ``'Mention Group'``, ``'Channel ID'`` and
        ``'Handler'``. If any value is missing or cannot be converted, the
        error is logged as a warning and None is returned.
    """
    parser = configparser.ConfigParser()
    parser.read('./config.ini')

    try:
        # Entries are read in a fixed order so the first problem found is the one logged.
        settings = {}
        settings['Bot Token'] = parser['Bot_Info']['Token']
        settings['Device ID'] = int(parser['Device']['ID'])
        settings['Device Name'] = str(parser['Device']['Name'])
        settings['Mention Group'] = str(parser['Bot_Info']['Mention_Group'])
        settings['Channel ID'] = int(parser['Bot_Info']['Channel_ID'])
        settings['Handler'] = str(parser['Config']['Handler'])

        LOGGER.debug("Found config options:")
        for option, value in settings.items():
            LOGGER.debug(f"\t{option} : {value}")

        return settings
    except Exception as err:
        LOGGER.warning(err)
        return None
|
|
|
|
|
|
def write_config_file(**kwargs):
    """Create or update ``./config.ini``.

    Values passed as keyword arguments are written directly. When ``init`` is
    true, every value that was not passed is collected from the user through
    the interactive ``get_*`` prompts. When ``init`` is false, values that
    were not passed are left as they are in the existing file.

    Keyword Args:
        init (bool): Start from an empty config and prompt for missing values.
            Treated as False when omitted.
        handler: Stored as ``Config/Handler``.
        token: Stored as ``Bot_Info/Token``.
        device_id: Stored as ``Device/ID``.
        device_name: Stored as ``Device/Name``.
        mention_group: Stored as ``Bot_Info/Mention_Group``.
        channel_id: Stored as ``Bot_Info/Channel_ID``.

    Returns:
        bool: True once the file has been written.
    """
    # A missing 'init' is treated as False instead of raising KeyError.
    init = kwargs.get('init', False)

    # ConfigParser replaces SafeConfigParser, which no longer exists in Python 3.12+.
    config = configparser.ConfigParser()

    # Only merge into the existing file when this is not a fresh initialisation.
    if not init and exists('./config.ini'):
        config.read('./config.ini')

    for section in ('Bot_Info', 'Config', 'Device'):
        if not config.has_section(section):
            config.add_section(section)

    if 'handler' in kwargs:
        config['Config']['Handler'] = str(kwargs['handler'])
    elif init:
        config['Config']['Handler'] = str(get_handler())

    if 'token' in kwargs:
        config['Bot_Info']['Token'] = str(kwargs['token'])
    elif init:
        config['Bot_Info']['Token'] = str(get_user_token())

    # The two device values may be supplied independently; only write the
    # ones actually given so a lone device_id/device_name cannot raise KeyError.
    has_device_id = 'device_id' in kwargs
    has_device_name = 'device_name' in kwargs
    if has_device_id or has_device_name:
        if has_device_id:
            config['Device']['ID'] = str(kwargs['device_id'])
        if has_device_name:
            config['Device']['Name'] = str(kwargs['device_name'])
    elif init:
        device_id, device_name = get_user_device_selection()
        config['Device']['ID'] = str(device_id)
        config['Device']['Name'] = str(device_name)

    if 'mention_group' in kwargs:
        config['Bot_Info']['Mention_Group'] = str(kwargs['mention_group'])
    elif init:
        config['Bot_Info']['Mention_Group'] = str(get_user_mention_group())

    if 'channel_id' in kwargs:
        config['Bot_Info']['Channel_ID'] = str(kwargs['channel_id'])
    elif init:
        config['Bot_Info']['Channel_ID'] = str(get_user_mention_channel_id())

    with open('./config.ini', 'w') as config_file:
        config.write(config_file)

    LOGGER.info("Config Saved")

    return True
|
|
|
|
|
|
def get_device_list():
    """Return whatever ``AudioStream().list_devices()`` reports, logging it at debug level."""
    devices = AudioStream().list_devices()
    LOGGER.debug(devices)
    return devices
|
|
|
|
|
|
def get_user_device_selection():
    """Prompt until the user picks one of the available devices.

    The device list is obtained from ``get_device_list()``; each entry is
    unpacked as a ``(device, dev_id)`` pair. The user answers with a 1-based
    position in that list.

    Returns:
        tuple | None: ``(dev_id, device)`` for the chosen entry, or None when
        the device list is empty.
    """
    devices = list(get_device_list())

    # Without this guard no answer could ever be valid and the prompt would loop forever.
    if not devices:
        LOGGER.error("No devices available to select from")
        return None

    # NOTE(review): the number logged here is dev_id + 1, while the answer is
    # interpreted as a position in the list - confirm the two always agree.
    for device, dev_id in devices:
        LOGGER.debug(f"{dev_id + 1}\t-\t{device}")

    while True:
        LOGGER.debug(devices)
        try:
            choice = int(input("Please select the input device from above:\t"))
        except ValueError as err:
            # Only bad numbers are retried; EOF/interrupts propagate instead of spinning.
            LOGGER.warning(err)
            continue

        # Valid answers are 1..len(devices) inclusive, so the last entry is
        # selectable and zero/negative numbers are rejected.
        if 1 <= choice <= len(devices):
            LOGGER.debug("Selected ID within range")
            break

        LOGGER.debug("Out of range, try again...")

    device, dev_id = devices[choice - 1]
    selected_device = (dev_id, device)
    LOGGER.debug(selected_device)

    return selected_device
|
|
|
|
|
|
def get_user_token():
    """Prompt until the user enters a token of the expected length.

    Returns:
        str: The first entry that is exactly 59 characters long.
    """
    while True:
        candidate = str(input("Please enter your Discord bot API token now:\t"))

        # NOTE(review): any other length is rejected - confirm 59 still
        # matches the token format in use.
        if len(candidate) == 59:
            return candidate

        LOGGER.error('Length error in token, please try again...')
|
|
|
|
|
|
def get_user_mention_group():
    """Prompt until the user enters a non-empty group name.

    Returns:
        str: The first non-empty line entered.
    """
    while True:
        group_name = str(input("Please enter the name of the group you would like to mention:\t"))
        if group_name:
            return group_name
|
|
|
|
|
|
def get_user_mention_channel_id():
    """Prompt until the user enters a plausible channel ID.

    An entry is accepted when it is a positive whole number of 17 to 20
    digits. The previous fixed 18-digit check rejected valid shorter (older)
    and longer (newer) IDs.

    Returns:
        int: The accepted channel ID.
    """
    min_digits, max_digits = 17, 20

    while True:
        raw_value = input("Please enter the channel ID of the the default channel you would like messages to be sent in:\t")

        # Non-numeric input is reported and retried instead of crashing the setup.
        try:
            channel_id = int(raw_value)
        except ValueError:
            LOGGER.error("Channel ID must be a number, please try again")
            continue

        if channel_id > 0 and min_digits <= len(str(channel_id)) <= max_digits:
            return channel_id

        LOGGER.error("Length error in ID, please try again")
|
|
|
|
|
|
def get_handler():
    """Prompt until the user names an acceptable handler or declines one.

    Returns:
        str: A key of ``PDB_ACCEPTABLE_HANDLERS``, or the string ``"None"``
        when the user enters nothing or "none" (any letter case).
    """
    while True:
        handler = str(input("Please enter the name of the handler you would like to use:\t"))

        if handler in PDB_ACCEPTABLE_HANDLERS:
            return handler

        # Membership test: the old `handler == ['', "none"]` compared a string
        # with a list and could never be true.
        if handler.lower() in ('', 'none'):
            return "None"

        # Unknown names are retried rather than silently falling out of the
        # loop and returning None.
        LOGGER.error(f"Unknown handler '{handler}', acceptable handlers are: {', '.join(PDB_ACCEPTABLE_HANDLERS)}")
|
|
|
|
|
|
def check_negative(s):
    """Tell whether ``s`` converts to a float below zero.

    Args:
        s: Value handed to ``float()``.

    Returns:
        bool: True when the converted value is less than zero; False when it
        is not, or when the conversion raises ValueError.
    """
    try:
        return float(s) < 0
    except ValueError:
        # Not a number, so it cannot be a negative number.
        return False
|
|
|
|
|
|
# Check if message is a ping request and respond even if it is a bot
|
|
async def check_and_reply_to_ping(bot, message):
    """Route a message depending on whether it contains ``check_modules``.

    Args:
        bot: Object providing ``get_context``, ``invoke`` and ``process_commands``.
        message: Object whose ``content`` is searched for ``"check_modules"``.

    Returns:
        bool: True when the message contained ``"check_modules"`` and was
        invoked through its context; False when it was passed to
        ``bot.process_commands`` instead.
    """
    # Everything that is not a ping request goes through normal command processing.
    if "check_modules" not in message.content:
        await bot.process_commands(message)
        return False

    context = await bot.get_context(message)
    await bot.invoke(context)
    return True
|
|
|
|
|
|
# Create the logger
|
|
def init_global_logger(_verbose_level: str = "WARNING"):
    """Configure the ``Discord_Radio_Bot`` logger with two file handlers and a terminal handler.

    The logger itself is set to DEBUG. Three handlers are attached:
    ``./logs/DRB-<date>.log`` at INFO, ``./logs/DRB-<date>.DEBUG.log`` at
    DEBUG, and a stream handler at the requested level. Calling the function
    again replaces the handlers it installed earlier instead of stacking
    duplicates.

    Args:
        _verbose_level: Name of a ``logging`` level (any letter case) used for
            the terminal handler.

    Raises:
        ValueError: If ``_verbose_level`` does not name an integer logging level.
    """
    numeric_log_level = getattr(logging, _verbose_level.upper(), None)
    if not isinstance(numeric_log_level, int):
        raise ValueError('Invalid log level: %s' % _verbose_level)

    # create logger
    init_logger = logging.getLogger('Discord_Radio_Bot')
    init_logger.setLevel(logging.DEBUG)

    # Remove handlers left by a previous call so records are not emitted twice.
    marker = '_drb_global_handler'
    for stale_handler in [h for h in init_logger.handlers if getattr(h, marker, False)]:
        init_logger.removeHandler(stale_handler)
        stale_handler.close()

    # Create the log directory without a check-then-create race.
    os.makedirs("./logs/", exist_ok=True)

    # Read the date once so both files share the same stamp.
    today = date.today()

    # create file handler which logs even debug messages
    fh = logging.FileHandler(f'./logs/DRB-{today}.log')
    fh.setLevel(logging.INFO)

    fh_debug = logging.FileHandler(f'./logs/DRB-{today}.DEBUG.log')
    fh_debug.setLevel(logging.DEBUG)

    # create terminal handler with a higher log level
    th = logging.StreamHandler()
    th.setLevel(numeric_log_level)

    # create formatter and add it to the handlers
    fh_debug_formatter = logging.Formatter('[%(asctime)s %(name)s->%(funcName)s():%(lineno)s] - %(levelname)s - %(message)s')
    fh_formatter = logging.Formatter('[%(asctime)s %(name)s->%(funcName)s()] - %(levelname)s - %(message)s')
    th_formatter = logging.Formatter('[%(asctime)s %(name)s->%(funcName)s()] - %(levelname)s - %(message)s')
    fh_debug.setFormatter(fh_debug_formatter)
    fh.setFormatter(fh_formatter)
    th.setFormatter(th_formatter)

    # add the handlers to the logger, tagging each so a later call can replace it
    for handler in (fh, th, fh_debug):
        setattr(handler, marker, True)
        init_logger.addHandler(handler)
|
|
|
|
|
|
#if __name__ is not 'main':
|
|
# init_global_logger()
|
|
# LOGGER = logging.getLogger('Discord_Radio_Bot')
|