From 107ab049ff83f9e95a0fbcbd5dd161da9795edec Mon Sep 17 00:00:00 2001 From: Logan Cusano Date: Sat, 24 May 2025 18:12:59 -0400 Subject: [PATCH] Implemented bot endpoint with DB requests and other tweaks --- app/internal/db_wrappers.py | 160 +++++++++++++++++++++++++++++++++++- app/internal/types.py | 78 ++++++++++++++++++ app/routers/bot.py | 84 +++++++++++++++++++ app/routers/nodes.py | 10 +-- app/routers/systems.py | 38 ++++----- app/server.py | 15 ++-- 6 files changed, 350 insertions(+), 35 deletions(-) create mode 100644 app/routers/bot.py diff --git a/app/internal/db_wrappers.py b/app/internal/db_wrappers.py index a773e5c..ac42658 100644 --- a/app/internal/db_wrappers.py +++ b/app/internal/db_wrappers.py @@ -4,13 +4,14 @@ from uuid import uuid4 from typing import Optional, List, Dict, Any from enum import Enum from internal.db_handler import MongoHandler -from internal.types import System +from internal.types import System, DiscordId # Init vars DB_NAME = os.getenv("DB_NAME", "default_db") MONGO_URL = os.getenv("MONGO_URL", "mongodb://10.10.202.4:27017/") SYSTEM_DB_COLLECTION_NAME = "radio_systems" +DISCORD_ID_DB_COLLECTION_NAME = "discord_bot_ids" # --- System class --- class SystemDbController(): @@ -189,3 +190,160 @@ class SystemDbController(): except Exception as e: print(f"Delete failed: {e}") return None + + +# --- DiscordIdDbController class --- +class DiscordIdDbController(): + def __init__(self): + # Init the handler for Discord IDs + self.db_h = MongoHandler(DB_NAME, DISCORD_ID_DB_COLLECTION_NAME, MONGO_URL) + + async def create_discord_id(self, discord_id_data: Dict[str, Any]) -> Optional[DiscordId]: + """ + Creates a new Discord ID entry in the database. + + Args: + discord_id_data: A dictionary containing the data for the new Discord ID. + + Returns: + The created DiscordId object if successful, None otherwise. + """ + print("\n--- Creating a Discord ID document ---") + try: + if not discord_id_data.get("_id"): + discord_id_data['_id'] = str(uuid4()) # Ensure _id is a string + + inserted_id = None + async with self.db_h as db: + insert_result = await self.db_h.insert_one(discord_id_data) + inserted_id = insert_result.inserted_id + + if inserted_id: + print(f"Discord ID insert successful with ID: {inserted_id}") + query = {"_id": inserted_id} + inserted_doc = None + async with self.db_h as db: + inserted_doc = await db.find_one(query) + + if inserted_doc: + return DiscordId.from_dict(inserted_doc) + else: + print("Discord ID insert acknowledged but no ID returned.") + return None + + except Exception as e: + print(f"Discord ID create failed: {e}") + return None + + async def find_discord_id(self, query: Dict[str, Any], active_only: bool = False) -> Optional[DiscordId]: + """ + Finds a single Discord ID entry in the database. + + Args: + query: A dictionary representing the query criteria. + active_only: If True, only returns active Discord IDs. + + Returns: + A DiscordId object if found, None otherwise. + """ + print("\n--- Finding one Discord ID document ---") + try: + if active_only: + query["active"] = True + + found_doc = None + async with self.db_h as db: + found_doc = await db.find_one(query) + + if found_doc: + print("Found Discord ID document (raw dict):", found_doc) + return DiscordId.from_dict(found_doc) + else: + print("Discord ID document not found.") + return None + except Exception as e: + print(f"Discord ID find failed: {e}") + return None + + async def find_discord_ids(self, query: Dict[str, Any] = {}, guild_id: Optional[str] = None, active_only: bool = False) -> Optional[List[DiscordId]]: + """ + Finds one or more Discord ID entries in the database. + + Args: + query: A dictionary representing the query criteria. + guild_id: Optional. If provided, filters Discord IDs that belong to this guild. + active_only: If True, only returns active Discord IDs. + + Returns: + A list of DiscordId object(s) if found, None otherwise. + """ + print("\n--- Finding multiple Discord ID documents ---") + try: + # Add active filter if requested + if active_only: + query["active"] = True + + # Add guild_id filter if provided + if guild_id: + query["guild_ids"] = {"$in": [guild_id]} + + found_docs = None + async with self.db_h as db: + found_docs = await db.find(query) + + if found_docs: + print(f"Found {len(found_docs)} Discord ID documents (raw dicts).") + converted_discord_ids = [] + for doc in found_docs: + converted_discord_ids.append(DiscordId.from_dict(doc)) + + return converted_discord_ids if len(converted_discord_ids) > 0 else None + else: + print("Discord ID documents not found.") + return None + except Exception as e: + print(f"Discord ID find failed: {e}") + return None + + async def update_discord_id(self, query: Dict[str, Any], update_data: Dict[str, Any]) -> Optional[int]: + """ + Updates a single Discord ID entry in the database. + + Args: + query: A dictionary representing the query criteria to find the document. + update_data: A dictionary representing the update operations (e.g., using $set). + + Returns: + The number of modified documents if successful, None otherwise. + """ + print("\n--- Updating a Discord ID document ---") + try: + update_result = None + async with self.db_h as db: + update_result = await db.update_one(query, update_data) + + print(f"Discord ID update result: Matched {update_result.matched_count}, Modified {update_result.modified_count}") + return update_result.modified_count + except Exception as e: + print(f"Discord ID update failed: {e}") + return None + + async def delete_discord_id(self, query: Dict[str, Any]) -> Optional[int]: + """ + Deletes a single Discord ID entry from the database. + Args: + query: A dictionary representing the query criteria to find the document to delete. + Returns: + The number of deleted documents if successful, None otherwise. + """ + print("\n--- Deleting a Discord ID document ---") + try: + delete_result = None + async with self.db_h as db: + delete_result = await self.db_h.delete_one(query) + + print(f"Discord ID delete result: Deleted count {delete_result.deleted_count}") + return delete_result.deleted_count + except Exception as e: + print(f"Discord ID delete failed: {e}") + return None \ No newline at end of file diff --git a/app/internal/types.py b/app/internal/types.py index bf7127d..e16e64c 100644 --- a/app/internal/types.py +++ b/app/internal/types.py @@ -1,11 +1,24 @@ from typing import Optional, List, Dict, Any from enum import Enum +class ActiveClient: + """ + The active client model in memory for quicker access + """ + websocket = None + active_token: str = None + class DemodTypes(str, Enum): P25 = "P25" DMR = "DMR" ANALOG = "NBFM" + +class NodeCommands(str, Enum): + JOIN = "join_server" + LEAVE = "leave_server" + + class TalkgroupTag: """Represents a talkgroup tag.""" def __init__(self, talkgroup: str, tagDec: int): @@ -16,6 +29,71 @@ class TalkgroupTag: def to_dict(self) -> Dict[str, Any]: return {"talkgroup": self.talkgroup, "tagDec": self.tagDec} + +class DiscordId: + """ + A data model for a Discord ID entry. + """ + def __init__(self, + _id: str, + discord_id: str, + name: str, + token: str, + active: bool, + guild_ids: List[str]): + """ + Initializes a DiscordId object. + + Args: + _id: A unique identifier for the entry (e.g., MongoDB ObjectId string). + discord_id: The Discord user ID. + name: The name associated with the Discord ID. + token: The authentication token. + active: Boolean indicating if the ID is active. + guild_ids: A list of guild IDs the Discord user is part of. + """ + self._id: str = str(_id) + self.discord_id: str = discord_id + self.name: str = name + self.token: str = token + self.active: bool = active + self.guild_ids: List[str] = guild_ids + + def __repr__(self) -> str: + """ + Provides a developer-friendly string representation of the object. + """ + return (f"DiscordId(_id='{self._id}', discord_id='{self.discord_id}', name='{self.name}', " + f"token='{self.token}', active={self.active}, guild_ids={self.guild_ids})") + + def to_dict(self) -> Dict[str, Any]: + """ + Converts the DiscordId object to a dictionary suitable for MongoDB. + """ + return { + "_id": self._id, + "discord_id": self.discord_id, + "name": self.name, + "token": self.token, + "active": self.active, + "guild_ids": self.guild_ids, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "DiscordId": + """ + Creates a DiscordId object from a dictionary (e.g., from MongoDB). + """ + return cls( + _id=data.get("_id"), + discord_id=data.get("discord_id", ""), + name=data.get("name", ""), + token=data.get("token", ""), + active=data.get("active", False), # Default to False if not present + guild_ids=data.get("guild_ids", []), # Default to empty list if not present + ) + + class System: """ A basic data model for a channel/system entry in a radio system. diff --git a/app/routers/bot.py b/app/routers/bot.py new file mode 100644 index 0000000..8e5d78a --- /dev/null +++ b/app/routers/bot.py @@ -0,0 +1,84 @@ +import json +from quart import Blueprint, jsonify, request, abort, current_app +from werkzeug.exceptions import HTTPException +from internal.types import ActiveClient +from internal.db_wrappers import DiscordIdDbController + +bot_bp = Blueprint('bot', __name__) + + +@bot_bp.route("/", methods=['GET']) +async def get_online_bots_route(): + """API endpoint to list bots (by name) currently online.""" + return jsonify(list(current_app.active_clients.keys())) + + +@bot_bp.route('/request_token', methods=['POST']) +async def request_token_route(): + """ + API endpoint to request a token for a client. + Expects 'client_id' in the JSON request body. + """ + try: + request_data = await request.get_json() + if not request_data or 'client_id' not in request_data: + abort(400, "Missing 'client_id' in request body.") + + client_id = request_data['client_id'] + print(f"Request received for client_id: {client_id}") + + # get the available IDs + active_d_ids = await current_app.d_id_db_h.find_discord_ids(active_only=True) + + # init available IDs list + avail_ids = [] + # Init the selected ID + selected_id = None + + # Check which IDs are currently in use by other bots + for d_id in active_d_ids: + if not find_token_in_active_clients(d_id.token): + avail_ids.append(d_id) + + if not avail_ids: + abort(404, "No available active Discord IDs found.") + + # --- Logic for selecting a preferred ID based on client_id (TODO) --- + + selected_id = avail_ids[0] + + # --- End of logic for selecting a preferred ID --- + + return jsonify(selected_id.to_dict()) + + except Exception as e: + print(f"Error in request_token_route: {e}") + abort(500, f"An internal error occurred: {e}") + + +def find_token_in_active_clients(target_token: str) -> bool: + """ + Checks if a target_token exists in the active_token of any ActiveClient object in a list. + + Args: + clients: A list of ActiveClient objects. + target_token: The token string to search for. + + Returns: + True if the token is found in any ActiveClient, False otherwise. + """ + for client in current_app.active_clients: + if client.active_token == target_token: + return True + return False + + + + + + + + + # query_params = dict(request.args) + + # systems = await current_app.sys_db_h.find_systems(query_params) \ No newline at end of file diff --git a/app/routers/nodes.py b/app/routers/nodes.py index 23b8239..bb85164 100644 --- a/app/routers/nodes.py +++ b/app/routers/nodes.py @@ -2,17 +2,14 @@ import json from quart import Blueprint, jsonify, request, abort, current_app from werkzeug.exceptions import HTTPException from enum import Enum +from internal.types import ActiveClient, NodeCommands nodes_bp = Blueprint('nodes', __name__) -class NodeCommands(str, Enum): - JOIN = "join_server" - LEAVE = "leave_server" - async def register_client(websocket, client_id): """Registers a new client connection.""" - current_app.active_clients[client_id] = websocket + current_app.active_clients[client_id] = ActiveClient(websocket) print(f"Client {client_id} connected.") @@ -26,7 +23,7 @@ async def unregister_client(client_id): async def send_command_to_client(client_id, command_name, *args): """Sends a command to a specific client.""" if client_id in current_app.active_clients: - websocket = current_app.active_clients[client_id] + websocket = current_app.active_clients[client_id].websocket message = json.dumps({"type": "command", "name": command_name, "args": args}) try: await websocket.send(message) @@ -52,7 +49,6 @@ async def send_command_to_all_clients(command_name, *args): await unregister_client(client_id) - @nodes_bp.route("/", methods=['GET']) async def get_nodes(): """API endpoint to list currently connected client IDs.""" diff --git a/app/routers/systems.py b/app/routers/systems.py index 1c7721a..9a3c51b 100644 --- a/app/routers/systems.py +++ b/app/routers/systems.py @@ -1,10 +1,8 @@ -from quart import Blueprint, jsonify, request, abort -from internal.db_wrappers import SystemDbController +from quart import Blueprint, jsonify, request, abort, current_app from werkzeug.exceptions import HTTPException from internal.types import System systems_bp = Blueprint('systems', __name__) -db_h = SystemDbController() @systems_bp.route("/", methods=['POST']) @@ -19,24 +17,24 @@ async def create_system_route(): abort(400, "Request body must be JSON") # Bad Request if '_id' in request_data: - id_search_result = await db_h.find_system({"_id": request_data["_id"]}) + id_search_result = await current_app.sys_db_h.find_system({"_id": request_data["_id"]}) if id_search_result: # If _id is provided and exists, return conflict abort(409, f"System with ID '{request_data['_id']}' already exists") # Check if name exists (optional, depending on requirements) if 'name' in request_data: - name_search_result = await db_h.find_system({"name": request_data["name"]}) + name_search_result = await current_app.sys_db_h.find_system({"name": request_data["name"]}) if name_search_result: abort(409, f"System with name '{request_data['name']}' already exists") # Check if frequency_khz exists (optional, depending on requirements) if 'frequency_khz' in request_data: - freq_search_result = await db_h.find_system({"frequency_khz": request_data["frequency_khz"]}) + freq_search_result = await current_app.sys_db_h.find_system({"frequency_khz": request_data["frequency_khz"]}) if freq_search_result: abort(409, f"System with frequency '{request_data['frequency_khz']}' already exists") - created_system = await db_h.create_system(request_data) + created_system = await current_app.sys_db_h.create_system(request_data) if created_system: print("Created new system:", created_system) @@ -56,7 +54,7 @@ async def list_systems_route(): """API endpoint to get a list of all systems.""" print("\n--- Handling GET /systems ---") try: - all_systems = await db_h.find_all_systems() + all_systems = await current_app.sys_db_h.find_all_systems() return jsonify([system.to_dict() for system in all_systems]), 200 # 200 OK status code except HTTPException: @@ -72,7 +70,7 @@ async def get_system_route(system_id: str): print(f"\n--- Handling GET /systems/{system_id} ---") try: # Fix the query dictionary syntax - system = await db_h.find_system({'_id': system_id}) + system = await current_app.sys_db_h.find_system({'_id': system_id}) if system: # system is a System object, jsonify will convert it @@ -93,7 +91,7 @@ async def get_system_by_client_route(client_id: str): print(f"\n--- Handling GET /systems/client/{client_id} ---") try: # Fix the query dictionary syntax - systems = await db_h.find_systems({'avail_on_nodes': client_id}) + systems = await current_app.sys_db_h.find_systems({'avail_on_nodes': client_id}) if systems: # system is a System object, jsonify will convert it @@ -111,7 +109,7 @@ async def get_system_by_client_route(client_id: str): @systems_bp.route('/', methods=['PUT']) async def update_system_route(system_id: str, updated_system_data): try: - update_system = await db_h.update_system({"_id", system_id}, updated_system_data) + update_system = await current_app.sys_db_h.update_system({"_id", system_id}, updated_system_data) if update_system: print("Updated system:", update_system) @@ -130,7 +128,7 @@ async def update_system_route(system_id: str, updated_system_data): @systems_bp.route('/', methods=['DELETE']) async def delete_system_route(system_id: str): try: - deleted_system = await db_h.delete_system({"_id", system_id}) + deleted_system = await current_app.sys_db_h.delete_system({"_id", system_id}) if deleted_system: print("Deleted system:", deleted_system) @@ -166,7 +164,7 @@ async def assign_client_to_system_route(system_id: str): abort(400, "'client_id' must be a non-empty string") # First, check if the system exists - existing_system = await db_h.find_system({"_id": system_id}) + existing_system = await current_app.sys_db_h.find_system({"_id": system_id}) if existing_system is None: abort(404, f"System with ID '{system_id}' not found") @@ -175,7 +173,7 @@ async def assign_client_to_system_route(system_id: str): update_query = {"_id": system_id} update_data = {"$addToSet": {"avail_on_nodes": client_id}} - update_result = await db_h.update_system(update_query, update_data) + update_result = await current_app.sys_db_h.update_system(update_query, update_data) if update_result > 0: print(f"Client '{client_id}' assigned to system '{system_id}'.") @@ -184,7 +182,7 @@ async def assign_client_to_system_route(system_id: str): print(f"Client '{client_id}' was already assigned to system '{system_id}'.") status = "already_assigned" - updated_system = await db_h.find_system({"_id": system_id}) + updated_system = await current_app.sys_db_h.find_system({"_id": system_id}) if updated_system: return jsonify({ "status": status, @@ -222,7 +220,7 @@ async def dismiss_client_from_system_route(system_id: str): abort(400, "'client_id' must be a non-empty string") # First, check if the system exists - existing_system = await db_h.find_system({"_id": system_id}) + existing_system = await current_app.sys_db_h.find_system({"_id": system_id}) if existing_system is None: abort(404, f"System with ID '{system_id}' not found") @@ -231,7 +229,7 @@ async def dismiss_client_from_system_route(system_id: str): update_query = {"_id": system_id} update_data = {"$pull": {"avail_on_nodes": client_id}} - update_result = await db_h.update_system(update_query, update_data) + update_result = await current_app.sys_db_h.update_system(update_query, update_data) if update_result > 0: print(f"Client '{client_id}' dismissed from system '{system_id}'.") @@ -242,7 +240,7 @@ async def dismiss_client_from_system_route(system_id: str): # Note: update_result.matched_count will be 1 even if modified_count is 0 # Optionally fetch the updated document to return its current state - updated_system = await db_h.find_system({"_id": system_id}) + updated_system = await current_app.sys_db_h.find_system({"_id": system_id}) if updated_system: return jsonify({ "status": status, @@ -271,7 +269,9 @@ async def search_systems_route(): try: query_params = dict(request.args) - systems = await db_h.find_systems(query_params) + systems = await current_app.sys_db_h.find_systems(query_params) + + print("Found systems", systems) if systems: # If systems are found, return them as a list of dictionaries diff --git a/app/server.py b/app/server.py index 02ace77..beea133 100644 --- a/app/server.py +++ b/app/server.py @@ -5,6 +5,8 @@ import uuid from quart import Quart, jsonify, request from routers.systems import systems_bp from routers.nodes import nodes_bp, register_client, unregister_client +from routers.bot import bot_bp +from internal.db_wrappers import SystemDbController, DiscordIdDbController # --- WebSocket Server Components --- # Dictionary to store active clients: {client_id: websocket} @@ -52,6 +54,10 @@ websocket_server_instance = None # Make active_clients accessible via the app instance. app.active_clients = active_clients +# Create and attach the DB wrappers +app.sys_db_h = SystemDbController() +app.d_id_db_h = DiscordIdDbController() + @app.before_serving async def startup_websocket_server(): """Starts the WebSocket server when the Quart app starts.""" @@ -79,19 +85,12 @@ async def shutdown_websocket_server(): app.register_blueprint(systems_bp, url_prefix="/systems") app.register_blueprint(nodes_bp, url_prefix="/nodes") +app.register_blueprint(bot_bp, url_prefix="/bots") @app.route('/') async def index(): return "Welcome to the Radio App Server API!" -@app.route('/request_token', methods=['POST']) -async def request_token(): - """API endpoint to list currently connected client IDs.""" - # TODO - Add DB logic - return jsonify({ - "token": "MTE5NjAwNTM2ODYzNjExMjk3Nw.GuCMXg.24iNNofNNumq46FIj68zMe9RmQgugAgfrvelEA" - }) - # --- Main Execution --- if __name__ == "__main__": # Quart's app.run() will start the asyncio event loop and manage it.