Files
Discord-Radio-Bot/bot.py
Logan Cusano eac3fc4e39 Update: Working on SDR RX
- Switched to using GQRX
- Created a handler to connect over Telnet to the local GQRX instance

TODO
- Need to find a way to close GQRX without closing the application; Can't start the app with the radio listening, so it must be opened manually
2022-01-03 22:54:18 -05:00

322 lines
13 KiB
Python

import os
import re
import time
import discord
import sound
import configparser
from discord.ext import commands
from subprocess import Popen, PIPE
from gqrxHandler import GQRXHandler
from BotResources import check_negative
# Init class for bot
class Bot(commands.Bot):
    """Discord bot that plays a local audio device into a voice channel.

    The audio device is wrapped by ``sound.PCMStream`` and the radio feeding it
    is tuned through ``GQRXHandler`` (mode, squelch and frequency).  Radio
    settings can be saved to and restored from ``./profiles.ini``.
    """

    def __init__(self, **kwargs):
        """Create the bot, register its commands and load any modules.

        Required keyword arguments (a missing one raises ``KeyError``):
        ``Device_ID``, ``Device_Name``, ``Token``, ``Channel_ID`` and
        ``Mention_Group``.

        Optional keyword argument: ``command_prefix`` (defaults to ``'>!'``).
        The bot always answers to @ mentions in addition to the prefix.
        """
        # If there is no custom command prefix (!help, ?help, etc.), use '>!' but also accept @ mentions
        command_prefix = kwargs.get('command_prefix', '>!')
        super().__init__(command_prefix=commands.when_mentioned_or(command_prefix),
                         activity=discord.Game(name="@ me"), status=discord.Status.idle)

        # Init the core bot variables
        self.DEVICE_ID = kwargs['Device_ID']
        self.DEVICE_NAME = kwargs['Device_Name']
        self.BOT_TOKEN = kwargs['Token']
        self.Default_Channel_ID = kwargs['Channel_ID']
        self.Default_Mention_Group = kwargs['Mention_Group']

        # Init the audio devices list; iterated as (device name, index) pairs by check_device
        self.Devices_List = sound.query_devices().items()

        # Init radio parameters
        self.profile_name = None
        self.freq = "104700000"
        self.mode = "wbfm"
        self.squelch = 0

        # Init SDR Variables
        self.system_os_type = None
        self.sdr_started = False
        self.GQRXHandler = GQRXHandler()

        # Set linux or windows
        self.check_os_type()

        # Add discord commands to the bot
        self.add_commands()

        # Check the ./modules folder for any modules (cog.py)
        self.check_for_modules()

    def start_bot(self):
        """Start the bot using the stored token (delegates to ``self.run``)."""
        self.run(self.BOT_TOKEN)

    def add_commands(self):
        """Register every discord command the bot exposes."""

        # Test command to see if the bot is on (help command can also be used)
        @self.command(help="Use this to test if the bot is alive", brief="Sends a 'pong' in response")
        async def ping(ctx):
            await ctx.send('pong')

        # Command to join the bot the voice channel the user who called the command is in
        @self.command(help="Use this command to join the bot to your channel",
                      brief="Joins the voice channel that the caller is in")
        async def join(ctx, *, member: discord.Member = None):
            member = member or ctx.author.display_name

            # The caller has to be in a voice channel, otherwise there is nowhere to join.
            # getattr covers authors that carry no voice state at all (e.g. a DM user).
            voice_state = getattr(ctx.author, 'voice', None)
            if voice_state is None or voice_state.channel is None:
                await ctx.send(f"{str(member).capitalize()}, you need to be in a voice channel first")
                return

            # Wait for the bot to be ready to connect
            await self.wait_until_ready()

            # Load respective opus library
            self.load_opus()

            if not discord.opus.is_loaded():
                # Return that the opus library would not load
                await ctx.send("Opus won't load")
                return

            # Create an audio stream from selected device
            stream = sound.PCMStream()
            channel = voice_state.channel
            await ctx.send(f"Ok {str(member).capitalize()}, I'm joining {channel}")

            # Point the audio stream at the configured device
            stream.change_device(self.DEVICE_ID)

            # Join the voice channel with the audio stream
            voice_connection = await channel.connect()
            voice_connection.play(discord.PCMAudio(stream))

            # Start the SDR and begin playing to the audio stream
            self.start_sdr()

            # Change the activity to the channel and band-type being used
            await self.set_activity()

        @self.command(help="Use this command to have the bot leave your channel",
                      brief="Leaves the current voice channel")
        async def leave(ctx):
            # Disconnect the client from the voice channel (nothing to do if it is not connected)
            if ctx.voice_client is not None:
                await ctx.voice_client.disconnect()

            # Change the presence to away and '@ me'
            await self.set_activity(False)

            # Stop the SDR so it can cool off
            self.stop_sdr()

        @self.command(name='chfreq',
                      help="Use this command to change the frequency the bot is listening to. Note: 'M' "
                           "is required\nExample command: '@ chfreq wbfm 104.7M'\n"
                           "Example command: '@ chfreq fm 154.785M'",
                      brief="Changes radio frequency")
        async def chfreq(ctx, mode: str, freq: str, member: discord.Member = None):
            # Possible band-types that can be used
            possible_modes = ['wbfm', 'fm']
            member = member or ctx.author.display_name

            # Validate both arguments before storing either, so a rejected
            # command never leaves the radio settings half updated.
            if len(freq) < 6:
                await ctx.send(f"{str(member).capitalize()}, {freq} is not valid. please refer to the help page '@ help chfreq'")
                return
            if mode not in possible_modes:
                await ctx.send(f"{str(member).capitalize()}, {mode} is not valid. You may only enter 'fm' or 'wbfm'")
                return

            self.freq = freq
            self.mode = mode
            await ctx.send(f"Ok {str(member).capitalize()}, I'm changing the mode to {str(self.mode).upper()} and frequency to"
                           f" {self.freq}")

            # Reset the profile name since we have made a change to the freq
            self.profile_name = None

            # If the SDR is started, restart it with the updates
            if self.sdr_started:
                self.start_sdr()
                await self.set_activity()

        @self.command(name='chsquelch',
                      help="Use this command to change the squelch for the frequency "
                           "the bot is listening to",
                      brief="Changes radio squelch")
        async def chsquelch(ctx, squelch: float, member: discord.Member = None):
            member = member or ctx.author.display_name
            print(f"Squelch = {squelch}")

            # The squelch is stored as a negative value; check_negative is a project
            # helper that presumably reports whether the value is already negative.
            if not check_negative(squelch):
                squelch = float(-abs(squelch))
                print(f"Squelch = {squelch}")

            self.squelch = squelch
            await ctx.send(f"Ok {str(member).capitalize()}, I'm changing the squelch to {self.squelch}")

            # If the SDR is started, restart it with the updates
            if self.sdr_started:
                self.start_sdr()

        # Hidden admin commands
        @self.command(name='saveprofile', hidden=True)
        async def _saveprofile(ctx, profile_name: str, member: discord.Member = None):
            member = member or ctx.author.display_name
            await self.save_radio_config(profile_name)
            await ctx.send(f"Ok {str(member).capitalize()}, I saved the current settings as {profile_name}")

        @self.command(name='loadprofile', hidden=True)
        async def _loadprofile(ctx, profile_name: str, member: discord.Member = None):
            member = member or ctx.author.display_name
            config_loaded = await self.load_radio_config(profile_name)
            if config_loaded:
                await ctx.send(f"Ok {str(member).capitalize()}, I loaded the settings saved as {profile_name}")
            else:
                await ctx.send(f"{str(member).capitalize()}, there is no profile with the name '{profile_name}'")

        @self.command(name='reload', hidden=True)
        async def _reload(ctx, module: str, member: discord.Member = None):
            """Reloads a module."""
            member = member or ctx.author.display_name
            if self.reload_modules(module):
                await ctx.send(f"Ok {str(member).capitalize()}, I reloaded {module}")
            else:
                await ctx.send(f"{str(member).capitalize()}, something went wrong. Please check the console")

        @self.command(name='startsdr', hidden=True)
        async def _startsdr(ctx, member: discord.Member = None):
            self.start_sdr()

        @self.command(name='stopsdr', hidden=True)
        async def _stopsdr(ctx, member: discord.Member = None):
            self.stop_sdr()

    def load_opus(self):
        """Load the bundled opus library that matches the detected OS.

        Does nothing when opus is already loaded, so repeated ``join``
        commands do not reload the library.
        """
        if discord.opus.is_loaded():
            return

        # Check the system type and load the correct library
        if self.system_os_type == 'Linux':
            discord.opus.load_opus('./opus/libopus.so')
        elif self.system_os_type == 'Windows':
            discord.opus.load_opus('./opus/libopus.dll')

    def check_device(self):
        """Check that the selected device is still available.

        Returns ``True`` when a device with the stored name exists.  If the
        device kept its name but changed its index, ``self.DEVICE_ID`` is
        updated to the new index.  Returns ``False`` when no device carries
        the stored name (including when the device list is empty).
        """
        # First pass: the device is exactly where we expect it
        for device, index in self.Devices_List:
            if int(index) == self.DEVICE_ID and str(device) == self.DEVICE_NAME:
                return True

        # Second pass: same name at a different index, so adopt the new index.
        # Every device is inspected before giving up.
        for device, index in self.Devices_List:
            if str(device) == self.DEVICE_NAME:
                self.DEVICE_ID = int(index)
                return True

        return False

    def check_for_modules(self):
        """Load every ``modules/<name>/cog.py`` found as a bot extension.

        Hidden folders (names starting with '.') are skipped.  A missing
        ``modules`` folder simply means there is nothing to load.
        """
        if not os.path.isdir("modules"):
            return

        # A valid module must be built as a 'cog', refer to the docs for more information
        for folder_name in os.listdir("modules"):
            if folder_name.startswith('.'):
                continue
            if os.path.exists(os.path.join("modules", folder_name, "cog.py")):
                self.load_extension(f"modules.{folder_name}.cog")
                print(f"Loaded extension: {folder_name}")

    def reload_modules(self, module):
        """Unload and load ``modules.<module>.cog`` again.

        Returns ``True`` on success.  Any failure is printed to the console
        and reported as ``False`` so the calling command can tell the user.
        """
        extension = f"modules.{module}.cog"
        try:
            self.unload_extension(extension)
            print(f"Unloaded {module}")
            self.load_extension(extension)
            print(f"Loaded {module}")
            return True
        except Exception as e:
            print(e)
            return False

    def check_os_type(self):
        """Store the OS type ('Windows' or 'Linux') in ``self.system_os_type``.

        Every non-'nt' platform is treated as 'Linux'.
        """
        self.system_os_type = 'Windows' if os.name == 'nt' else 'Linux'

    def start_sdr(self):
        """(Re)start the radio with the current mode, squelch and frequency.

        Only a single frequency held as a string is handled; any other type
        of ``self.freq`` is ignored.
        """
        if not isinstance(self.freq, str):
            return

        # Stop the SDR if it is running
        self.stop_sdr()

        # Start the radio
        print(f"Starting freq: {self.freq}")

        # Set the settings in GQRX
        self.GQRXHandler.set_all_settings(str(self.mode), str(self.squelch), str(self.freq))

        # Set the started variable for later checks
        self.sdr_started = True

    def stop_sdr(self):
        """Mark the SDR as stopped.

        Only the ``sdr_started`` flag is cleared.
        TODO: Need a way to 'close' GQRX.
        """
        if self.sdr_started:
            self.sdr_started = False

    async def set_activity(self, connected=True):
        """Update the bot's presence.

        When ``connected`` is truthy the bot shows as online and "listening
        to" the loaded profile name, or the frequency and mode when no
        profile is loaded.  Otherwise it goes idle with the '@ me' activity.
        """
        if not connected:
            await self.change_presence(activity=discord.Game(name="@ me"), status=discord.Status.idle)
            return

        if self.profile_name is None:
            # Frequencies typed by users end in 'M' (e.g. '104.7M'); drop only that
            # suffix so a plain numeric frequency keeps all of its digits.
            shown_freq = str(self.freq)
            if shown_freq.endswith(('M', 'm')):
                shown_freq = shown_freq[:-1]
            activity_name = f"{shown_freq} {str(self.mode).upper()}"
        elif isinstance(self.profile_name, str):
            activity_name = str(self.profile_name).upper()
        else:
            # A profile name that is neither None nor a string leaves the presence untouched
            return

        await self.change_presence(activity=discord.Activity(type=discord.ActivityType.listening,
                                                             name=activity_name),
                                   status=discord.Status.online)

    async def save_radio_config(self, profile_name):
        """Save the current frequency, mode and squelch as ``profile_name``.

        The profile is written to ``./profiles.ini``; other profiles already
        in the file are kept.  If the SDR is running it is restarted and the
        presence refreshed afterwards.
        """
        section = str(profile_name)

        # ConfigParser replaces SafeConfigParser, which was removed in Python 3.12
        config = configparser.ConfigParser()
        if os.path.exists('./profiles.ini'):
            config.read('./profiles.ini')

        if not config.has_section(section):
            config.add_section(section)

        # configparser only accepts string values
        config[section]['Frequency'] = str(self.freq)
        config[section]['Mode'] = str(self.mode)
        config[section]['Squelch'] = str(self.squelch)

        with open('./profiles.ini', 'w') as config_file:
            config.write(config_file)

        if self.sdr_started:
            self.start_sdr()
            await self.set_activity()

    async def load_radio_config(self, profile_name):
        """Load the profile ``profile_name`` from ``./profiles.ini``.

        Returns ``True`` when the profile exists and was applied, ``False``
        when the file or the profile is missing.  If the SDR is running it is
        restarted with the loaded settings and the presence refreshed.
        """
        section = str(profile_name)

        config = configparser.ConfigParser()
        # read() silently skips a missing file, leaving the parser empty
        config.read('./profiles.ini')

        if not config.has_section(section):
            return False

        self.profile_name = profile_name
        self.freq = config[section]['Frequency']
        self.mode = config[section]['Mode']
        # The squelch may have been saved from a float (e.g. '-30.0'), which int() rejects
        self.squelch = float(config[section]['Squelch'])

        if self.sdr_started:
            self.start_sdr()
            await self.set_activity()

        return True