This commit is contained in:
Logan Cusano
2025-05-24 02:09:35 -04:00
commit f34241acac
7 changed files with 743 additions and 0 deletions

0
.gitignore vendored Normal file
View File

19
Dockerfile Normal file
View File

@@ -0,0 +1,19 @@
# Use an official Python runtime as a parent image
FROM python:3.13-slim
# Set the working directory in the container
WORKDIR /app
# Move the requirements file
COPY ./requirements.txt .
# Install any needed packages specified in requirements.txt
# Make sure you have a requirements.txt file in the same directory as your Dockerfile
RUN pip install --no-cache-dir -r requirements.txt
# Copy the current directory contents into the container at /usr/src/app
COPY ./app/ .
# Run bot.py when the container launches
# Use the exec form to avoid issues with signal handling
CMD ["python", "bot.py"]

32
Makefile Normal file
View File

@@ -0,0 +1,32 @@
# Define the name for your Docker image
IMAGE_NAME = drb_server_discord_bot
# Default target when you just run 'make'
all: build
# Target to build the Docker image
build:
docker build -t $(IMAGE_NAME) .
# Target to run the Docker container
# Requires the DISCORD_BOT_TOKEN environment variable to be set
run: build
docker run -it --rm --name $(IMAGE_NAME)_container --network=host -e DISCORD_BOT_TOKEN="${DISCORD_BOT_TOKEN}" $(IMAGE_NAME)
# Target to stop the running container
stop:
docker stop $(IMAGE_NAME)_container || true
# Target to remove the container
remove:
docker rm $(IMAGE_NAME)_container || true
# Target to clean up the built image
clean:
docker rmi $(IMAGE_NAME) || true
# Target to stop, remove, and clean everything
clean_all: stop remove clean
# Phony targets to avoid conflicts with files of the same name
.PHONY: all build run stop remove clean clean_all

168
app/bot.py Normal file
View File

@@ -0,0 +1,168 @@
import discord
from discord.ext import commands
import os
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Define intents
# You might need to adjust these based on the features your bot uses
intents = discord.Intents.default()
intents.message_content = True # Required to read message content in most cases
intents.members = True # Required for member-related events and fetching members
intents.presences = True
# Initialize the bot
# command_prefix is the character(s) that trigger bot commands (e.g., !command)
bot = commands.Bot(command_prefix='!', intents=intents)
# --- Event Handlers ---
@bot.event
async def on_ready():
"""Logs when the bot is ready and connected to Discord."""
logging.info(f'Logged in as {bot.user.name} ({bot.user.id})')
logging.info('------')
logging.info('Loading Modules')
logging.info('------')
await load_extensions() # Load command modules when the bot is ready
logging.info('Registering slash commands')
logging.info('------')
for server in bot.guilds:
num_synced = await bot.tree.sync(guild=discord.Object(id=server.id))
logging.info(f"Registered {num_synced} commands for server ID: '{server.id}'")
@bot.event
async def on_command_error(ctx, error):
"""Handles errors that occur when running commands."""
if isinstance(error, commands.CommandNotFound):
# Ignore CommandNotFound errors to avoid spamming the console/chat
return
elif isinstance(error, commands.MissingRequiredArgument):
await ctx.send(f'Error: Missing required argument(s). Usage: `{ctx.command.usage or "No usage info provided."}`')
elif isinstance(error, commands.BadArgument):
await ctx.send(f'Error: Invalid argument(s) provided. Usage: `{ctx.command.usage or "No usage info provided."}`')
elif isinstance(error, commands.MissingPermissions):
await ctx.send("Error: You don't have the necessary permissions to run this command.")
elif isinstance(error, commands.BotMissingPermissions):
await ctx.send("Error: I don't have the necessary permissions to run this command.")
else:
# Log other errors for debugging
logging.error(f'Ignoring exception in command {ctx.command}:', exc_info=error)
await ctx.send(f'An unexpected error occurred: {error}')
# --- Extension (Command Module) Loading ---
async def load_extensions():
"""Loads all command modules from the 'commands' directory."""
logging.info("Attempting to load extensions...")
commands_dir = 'commands'
if not os.path.exists(commands_dir):
logging.warning(f"'{commands_dir}' directory not found. No commands will be loaded.")
return
for filename in os.listdir(commands_dir):
# Check if the file is a Python file and not a hidden file
if filename.endswith('.py') and not filename.startswith('_'):
# Construct the module path (e.g., commands.example)
module_name = f'{commands_dir}.{filename[:-3]}'
try:
await bot.load_extension(module_name)
logging.info(f'Successfully loaded extension: {module_name}')
except Exception as e:
logging.error(f'Failed to load extension {module_name}: {e}', exc_info=True)
async def unload_extensions():
"""Unloads all loaded command modules."""
logging.info("Attempting to unload extensions...")
for extension in list(bot.extensions): # Iterate over a copy as unloading modifies the list
try:
await bot.unload_extension(extension)
logging.info(f'Successfully unloaded extension: {extension}')
except Exception as e:
logging.error(f'Failed to unload extension {extension}: {e}', exc_info=True)
async def reload_extensions():
"""Reloads all loaded command modules."""
logging.info("Attempting to reload extensions...")
await unload_extensions()
await load_extensions()
logging.info("Extensions reloaded.")
# --- Admin Commands (Optional, for managing extensions) ---
# You might want to restrict these commands to specific users or roles
@bot.command(name='load', hidden=True)
@commands.is_owner() # Requires the user to be the bot owner (set via application settings)
async def load_command(ctx, extension_name: str):
"""Loads a specific command extension."""
module_name = f'commands.{extension_name}'
try:
await bot.load_extension(module_name)
await ctx.send(f'Successfully loaded extension: `{module_name}`')
logging.info(f'Manual load: Successfully loaded extension: {module_name}')
except commands.ExtensionAlreadyLoaded:
await ctx.send(f'Extension `{module_name}` is already loaded.')
except commands.ExtensionNotFound:
await ctx.send(f'Extension `{module_name}` not found.')
except Exception as e:
await ctx.send(f'Failed to load extension `{module_name}`: {e}')
logging.error(f'Manual load: Failed to load extension {module_name}: {e}', exc_info=True)
@bot.command(name='unload', hidden=True)
@commands.is_owner()
async def unload_command(ctx, extension_name: str):
"""Unloads a specific command extension."""
module_name = f'commands.{extension_name}'
try:
await bot.unload_extension(module_name)
await ctx.send(f'Successfully unloaded extension: `{module_name}`')
logging.info(f'Manual unload: Successfully unloaded extension: {module_name}')
except commands.ExtensionNotFound:
await ctx.send(f'Extension `{module_name}` not found or not loaded.')
except Exception as e:
await ctx.send(f'Failed to unload extension `{module_name}`: {e}')
logging.error(f'Manual unload: Failed to unload extension {module_name}: {e}', exc_info=True)
@bot.command(name='reload', hidden=True)
@commands.is_owner()
async def reload_command(ctx, extension_name: str = None):
"""Reloads a specific command extension or all extensions."""
if extension_name:
module_name = f'commands.{extension_name}'
try:
await bot.reload_extension(module_name)
await ctx.send(f'Successfully reloaded extension: `{module_name}`')
logging.info(f'Manual reload: Successfully reloaded extension: {module_name}')
except commands.ExtensionNotFound:
await ctx.send(f'Extension `{module_name}` not found or not loaded.')
except Exception as e:
await ctx.send(f'Failed to reload extension `{module_name}`: {e}')
logging.error(f'Manual reload: Failed to reload extension {module_name}: {e}', exc_info=True)
else:
# Reload all extensions if no specific name is provided
await reload_extensions()
await ctx.send('All extensions reloaded.')
# --- Running the Bot ---
if __name__ == "__main__":
# Get the bot token from environment variables
# It's recommended to use environment variables for sensitive information
DISCORD_BOT_TOKEN = os.getenv('DISCORD_BOT_TOKEN')
if DISCORD_BOT_TOKEN is None:
logging.error("DISCORD_BOT_TOKEN environment variable not set.")
logging.error("Please set the DISCORD_BOT_TOKEN environment variable with your bot's token.")
else:
try:
# Run the bot
bot.run(DISCORD_BOT_TOKEN)
except discord.errors.LoginFailure:
logging.error("Invalid Discord bot token provided. Please check your token.")
except Exception as e:
logging.error(f"An error occurred while running the bot: {e}", exc_info=True)

97
app/commands/drb.py Normal file
View File

@@ -0,0 +1,97 @@
import discord
from discord.ext import commands
from discord import app_commands # Import the app_commands module
from internal.drb_srv_api import SystemAPIWrapper, NodeAPIWrapper
# This is a standard setup for a command module (often called a "Cog")
# It inherits from commands.Cog
class DrbCommands(commands.Cog):
"""A collection of example slash commands."""
# The __init__ method takes the bot instance as an argument
def __init__(self, bot):
self.bot = bot
self.drb_sys_api = SystemAPIWrapper()
self.drb_node_api = NodeAPIWrapper()
# Connect a bot with the selected system to the channel the requester is in / move the bot to the channel the requester is in
@commands.hybrid_command(name="join", description="Request a bot to join your channel listening to a system.")
@app_commands.describe(system_name="The name of the system to join.")
async def join(self, ctx: commands.Context, system_name:str):
# Get the system details
sys_search_results = await self.drb_sys_api.search_systems(name=system_name)
selected_system = None
# Make sure there is a system found
if len(sys_search_results) == 1:
selected_system = sys_search_results[0]
else:
pass # Replace with code to ask the user which they would like
if not selected_system: return
# Get all the nodes that have this system
avail_on_nodes = selected_system['avail_on_nodes']
selected_node_id = None
if len(avail_on_nodes) == 0:
return
if len(avail_on_nodes) == 1:
selected_node_id = avail_on_nodes[0]
else:
return # TODO - Implement this
# Check to see if there is a preferred node for the system, if not select one
# check to make sure it's available, if not loop back and select the next one
if not selected_node_id: return
# Get the channel the user is currently in
channel_id = ctx.author.voice.channel.id
# Get the guild the user messaged from
guild_id = ctx.guild.id
print(selected_node_id, selected_system['_id'], guild_id, channel_id)
api_response = await self.drb_node_api.join(selected_node_id, selected_system['_id'], guild_id, channel_id)
if isinstance(ctx, commands.Context):
# This was invoked as a prefix command (e.g., !hello)
await ctx.send(f"{api_response}!")
elif isinstance(ctx, discord.Interaction):
# This was invoked as a slash command (/hello)
await ctx.response.send_message(f"{api_response}!")
@join.autocomplete("system_name")
async def system_name_autocomplete(self,
interaction: discord.Interaction,
current: str,
) -> list[app_commands.Choice[str]]:
"""
Autocomplete for system_name, fetches systems.
"""
# Fetch systems from your database (async call)
systems = await self.drb_sys_api.search_systems(name=current)
# Create app_commands.Choice objects
return [
app_commands.Choice(name=system, value=system)
for system in systems[:25] # Discord has a limit of 25 choices
]
@commands.hybrid_command(name="leave", description="Request that a bot leave your server.")
async def leave(self, ctx: commands.Context):
if isinstance(ctx, commands.Context):
# This was invoked as a prefix command (e.g., !hello)
await ctx.send(f"Hello, {ctx.author.display_name}!")
elif isinstance(ctx, discord.Interaction):
# This was invoked as a slash command (/hello)
await ctx.response.send_message(f"Hello, {ctx.user.display_name}!")
# Disconnect the selected bot from the server the requster is in
# --- Setup Function ---
async def setup(bot):
"""Adds the SlashExampleCommands cog to the bot and syncs slash commands."""
await bot.add_cog(DrbCommands(bot))
print("Hybrid commands cog loaded. Remember to sync commands!")

425
app/internal/drb_srv_api.py Normal file
View File

@@ -0,0 +1,425 @@
import requests
import json
import os
class CoreAPI:
def _handle_response(self, response):
"""
Handles the API response, checking for successful status codes and
raising exceptions for errors.
Args:
response (requests.Response): The response object from a requests call.
Returns:
dict or list: The JSON response body if the request was successful.
Raises:
requests.exceptions.HTTPError: If the response status code indicates an error.
requests.exceptions.RequestException: For other request-related errors.
json.JSONDecodeError: If the response body is not valid JSON.
"""
try:
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
if response.content:
return response.json()
return None # Handle cases with no response body (e.g., 204 No Content)
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
raise
except json.JSONDecodeError:
print(f"Failed to decode JSON from response: {response.text}")
raise
class NodeAPIWrapper(CoreAPI):
"""
A wrapper class for interacting with the Node API endpoints.
"""
def __init__(self):
"""
Initializes the API wrapper with the base URL of the Quart application.
Args:
base_url (str): The base URL of your Quart application (e.g., "http://localhost:5000").
"""
self.base_url = f"{os.getenv('API_BASE_URL', 'http://localhost:5000')}/nodes"
async def get_nodes(self) -> requests.Response:
"""
API endpoint to list currently connected client IDs.
Returns:
requests.Response: The response object from the API.
On success (200), response.json() will be a list of client IDs.
"""
url = self.base_url
try:
response = requests.get(url)
return self._handle_response(response)
except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
print(f"Error creating system: {e}")
raise
async def join(self, client_id: str, system_id: str, guild_id: str, channel_id: str) -> requests.Response:
"""
Send a join command to the specific system specified.
Args:
client_id (str): The ID of the client to send the command to.
system_id (str): The system ID for the join command.
guild_id (str): The guild ID for the join command.
channel_id (str): The channel ID for the join command.
Returns:
requests.Response: The response object from the API.
On success (200), response.json() will contain status information.
"""
url = f"{self.base_url}/join"
payload = {
"client_id": client_id,
"system_id": system_id,
"guild_id": guild_id,
"channel_id": channel_id
}
try:
response = requests.post(url, json=payload)
return self._handle_response(response)
except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
print(f"Error creating system: {e}")
raise
async def leave(self, client_id: str, guild_id: str) -> requests.Response:
"""
Send a leave command to the specific client.
Args:
client_id (str): The ID of the client to send the command to.
guild_id (str): The guild ID for the leave command.
Returns:
requests.Response: The response object from the API.
On success (200), response.json() will contain status information.
"""
url = f"{self.base_url}/leave"
payload = {
"client_id": client_id,
"guild_id": guild_id
}
try:
response = requests.post(url, json=payload)
return self._handle_response(response)
except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
print(f"Error creating system: {e}")
raise
class SystemAPIWrapper(CoreAPI):
"""
A Python wrapper class for interacting with the System API endpoints
using the requests library.
"""
def __init__(self):
"""
Initializes the SystemAPIWrapper with the base URL of the API.
Args:
base_url (str): The base URL of the System API (e.g., "http://localhost:5000/systems").
"""
self.base_url = f"{os.getenv('API_BASE_URL', 'http://localhost:5000')}/systems"
async def create_system(self, system_data):
"""
Creates a new system.
Args:
system_data (dict): A dictionary containing the system data.
Returns:
dict: The created system object.
Raises:
requests.exceptions.RequestException: If the request fails.
json.JSONDecodeError: If the response is not valid JSON.
"""
url = f"{self.base_url}/"
try:
response = requests.post(url, json=system_data)
return self._handle_response(response)
except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
print(f"Error creating system: {e}")
raise
async def list_systems(self):
"""
Retrieves a list of all systems.
Returns:
list: A list of system objects.
Raises:
requests.exceptions.RequestException: If the request fails.
json.JSONDecodeError: If the response is not valid JSON.
"""
url = f"{self.base_url}/"
try:
response = requests.get(url)
return self._handle_response(response)
except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
print(f"Error listing systems: {e}")
raise
async def get_system_by_id(self, system_id):
"""
Retrieves details for a specific system by ID.
Args:
system_id (str): The ID of the system.
Returns:
dict: The system object.
Raises:
requests.exceptions.RequestException: If the request fails (e.g., 404 if not found).
json.JSONDecodeError: If the response is not valid JSON.
"""
url = f"{self.base_url}/{system_id}"
try:
response = requests.get(url)
return self._handle_response(response)
except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
print(f"Error getting system by ID {system_id}: {e}")
raise
async def get_systems_by_client_id(self, client_id):
"""
Retrieves a list of systems available on a specific client ID.
Args:
client_id (str): The ID of the client.
Returns:
list: A list of system objects.
Raises:
requests.exceptions.RequestException: If the request fails (e.g., 404 if client not found or no systems).
json.JSONDecodeError: If the response is not valid JSON.
"""
url = f"{self.base_url}/client/{client_id}"
try:
response = requests.get(url)
return self._handle_response(response)
except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
print(f"Error getting systems by client ID {client_id}: {e}")
raise
async def update_system(self, system_id, updated_system_data):
"""
Updates a specific system by ID.
Args:
system_id (str): The ID of the system to update.
updated_system_data (dict): A dictionary containing the updated system data.
Returns:
dict: The updated system object.
Raises:
requests.exceptions.RequestException: If the request fails.
json.JSONDecodeError: If the response is not valid JSON.
"""
url = f"{self.base_url}/{system_id}"
try:
response = requests.put(url, json=updated_system_data)
return self._handle_response(response)
except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
print(f"Error updating system {system_id}: {e}")
raise
async def delete_system(self, system_id):
"""
Deletes a specific system by ID.
Args:
system_id (str): The ID of the system to delete.
Returns:
dict: The deleted system object.
Raises:
requests.exceptions.RequestException: If the request fails.
json.JSONDecodeError: If the response is not valid JSON.
"""
url = f"{self.base_url}/{system_id}"
try:
response = requests.delete(url)
return self._handle_response(response)
except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
print(f"Error deleting system {system_id}: {e}")
raise
async def search_systems(self, **kwargs):
"""
Searches for systems based on provided query parameters.
Args:
**kwargs: Keyword arguments representing the query parameters
(e.g., name="MySystem", frequency_khz=1000).
Returns:
list: A list of system objects matching the criteria.
Raises:
requests.exceptions.RequestException: If the request fails.
json.JSONDecodeError: If the response is not valid JSON.
"""
url = f"{self.base_url}/search"
try:
# requests.get automatically handles the params dictionary
response = requests.get(url, params=kwargs)
return self._handle_response(response)
except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
print(f"Error searching systems with parameters {kwargs}: {e}")
raise
async def assign_client_to_system(self, system_id, client_id):
"""
Assigns a client ID to a system's available_on_nodes list.
Args:
system_id (str): The ID of the system.
client_id (str): The ID of the client to assign.
Returns:
dict: A dictionary containing the status and the updated system object.
Raises:
requests.exceptions.RequestException: If the request fails.
json.JSONDecodeError: If the response is not valid JSON.
"""
url = f"{self.base_url}/{system_id}/assign"
payload = {"client_id": client_id}
try:
response = requests.post(url, json=payload)
return self._handle_response(response)
except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
print(f"Error assigning client {client_id} to system {system_id}: {e}")
raise
async def dismiss_client_from_system(self, system_id, client_id):
"""
Dismisses (removes) a client ID from a system's available_on_nodes list.
Args:
system_id (str): The ID of the system.
client_id (str): The ID of the client to dismiss.
Returns:
dict: A dictionary containing the status and the updated system object.
Raises:
requests.exceptions.RequestException: If the request fails.
json.JSONDecodeError: If the response is not valid JSON.
"""
url = f"{self.base_url}/{system_id}/dismiss"
payload = {"client_id": client_id}
try:
response = requests.post(url, json=payload)
return self._handle_response(response)
except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
print(f"Error dismissing client {client_id} from system {system_id}: {e}")
raise
# Example Usage (assuming your Quart app is running on http://localhost:5000)
if __name__ == '__main__':
api = SystemAPIWrapper("http://localhost:5000/systems")
# Example: Create a new system
new_system_data = {
"name": "Test System",
"frequency_khz": 100,
"avail_on_nodes": []
}
try:
created_system = api.create_system(new_system_data)
print("Created System:", created_system)
except requests.exceptions.HTTPError as e:
print(f"Failed to create system: {e.response.status_code} - {e.response.text}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
# Example: List all systems
try:
all_systems = api.list_systems()
print("\nAll Systems:", all_systems)
except requests.exceptions.RequestException as e:
print(f"Failed to list systems: {e}")
# Example: Get a system by ID (replace with a known system ID)
system_id_to_get = "some_system_id" # Replace with an actual ID
try:
system = api.get_system_by_id(system_id_to_get)
print(f"\nSystem with ID {system_id_to_get}:", system)
except requests.exceptions.HTTPError as e:
print(f"Failed to get system by ID {system_id_to_get}: {e.response.status_code} - {e.response.text}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
# Example: Get systems by client ID (replace with a known client ID)
client_id_to_get = "some_client_id" # Replace with an actual ID
try:
client_systems = api.get_systems_by_client_id(client_id_to_get)
print(f"\nSystems for client ID {client_id_to_get}:", client_systems)
except requests.exceptions.HTTPError as e:
print(f"Failed to get systems by client ID {client_id_to_get}: {e.response.status_code} - {e.response.text}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
# Example: Update a system (replace with a known system ID)
system_id_to_update = "some_system_id" # Replace with an actual ID
updated_data = {"frequency_khz": 120}
try:
updated_system = api.update_system(system_id_to_update, updated_data)
print(f"\nUpdated system {system_id_to_update}:", updated_system)
except requests.exceptions.HTTPError as e:
print(f"Failed to update system {system_id_to_update}: {e.response.status_code} - {e.response.text}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
# Example: Assign a client to a system (replace with known IDs)
system_id_to_assign = "some_system_id" # Replace with an actual ID
client_id_to_assign = "new_client_1" # Replace with an actual ID
try:
assign_result = api.assign_client_to_system(system_id_to_assign, client_id_to_assign)
print(f"\nAssign client {client_id_to_assign} to system {system_id_to_assign} result:", assign_result)
except requests.exceptions.HTTPError as e:
print(f"Failed to assign client to system: {e.response.status_code} - {e.response.text}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
# Example: Dismiss a client from a system (replace with known IDs)
system_id_to_dismiss = "some_system_id" # Replace with an actual ID
client_id_to_dismiss = "new_client_1" # Replace with an actual ID
try:
dismiss_result = api.dismiss_client_from_system(system_id_to_dismiss, client_id_to_dismiss)
print(f"\nDismiss client {client_id_to_dismiss} from system {system_id_to_dismiss} result:", dismiss_result)
except requests.exceptions.HTTPError as e:
print(f"Failed to dismiss client from system: {e.response.status_code} - {e.response.text}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
# Example: Delete a system (replace with a known system ID)
system_id_to_delete = "some_system_id" # Replace with an actual ID
try:
deleted_system = api.delete_system(system_id_to_delete)
print(f"\nDeleted system {system_id_to_delete}:", deleted_system)
except requests.exceptions.HTTPError as e:
print(f"Failed to delete system {system_id_to_delete}: {e.response.status_code} - {e.response.text}")
except Exception as e:
print(f"An unexpected error occurred: {e}")

2
requirements.txt Normal file
View File

@@ -0,0 +1,2 @@
discord.py
requests