Implemented bot endpoint with DB requests and other tweaks

This commit is contained in:
Logan Cusano
2025-05-24 18:12:59 -04:00
parent 15b12ecd5f
commit 107ab049ff
6 changed files with 350 additions and 35 deletions

View File

@@ -4,13 +4,14 @@ from uuid import uuid4
from typing import Optional, List, Dict, Any
from enum import Enum
from internal.db_handler import MongoHandler
from internal.types import System
from internal.types import System, DiscordId
# Init vars
DB_NAME = os.getenv("DB_NAME", "default_db")
MONGO_URL = os.getenv("MONGO_URL", "mongodb://10.10.202.4:27017/")
SYSTEM_DB_COLLECTION_NAME = "radio_systems"
DISCORD_ID_DB_COLLECTION_NAME = "discord_bot_ids"
# --- System class ---
class SystemDbController():
@@ -189,3 +190,160 @@ class SystemDbController():
except Exception as e:
print(f"Delete failed: {e}")
return None
# --- DiscordIdDbController class ---
class DiscordIdDbController():
def __init__(self):
# Init the handler for Discord IDs
self.db_h = MongoHandler(DB_NAME, DISCORD_ID_DB_COLLECTION_NAME, MONGO_URL)
async def create_discord_id(self, discord_id_data: Dict[str, Any]) -> Optional[DiscordId]:
"""
Creates a new Discord ID entry in the database.
Args:
discord_id_data: A dictionary containing the data for the new Discord ID.
Returns:
The created DiscordId object if successful, None otherwise.
"""
print("\n--- Creating a Discord ID document ---")
try:
if not discord_id_data.get("_id"):
discord_id_data['_id'] = str(uuid4()) # Ensure _id is a string
inserted_id = None
async with self.db_h as db:
insert_result = await self.db_h.insert_one(discord_id_data)
inserted_id = insert_result.inserted_id
if inserted_id:
print(f"Discord ID insert successful with ID: {inserted_id}")
query = {"_id": inserted_id}
inserted_doc = None
async with self.db_h as db:
inserted_doc = await db.find_one(query)
if inserted_doc:
return DiscordId.from_dict(inserted_doc)
else:
print("Discord ID insert acknowledged but no ID returned.")
return None
except Exception as e:
print(f"Discord ID create failed: {e}")
return None
async def find_discord_id(self, query: Dict[str, Any], active_only: bool = False) -> Optional[DiscordId]:
"""
Finds a single Discord ID entry in the database.
Args:
query: A dictionary representing the query criteria.
active_only: If True, only returns active Discord IDs.
Returns:
A DiscordId object if found, None otherwise.
"""
print("\n--- Finding one Discord ID document ---")
try:
if active_only:
query["active"] = True
found_doc = None
async with self.db_h as db:
found_doc = await db.find_one(query)
if found_doc:
print("Found Discord ID document (raw dict):", found_doc)
return DiscordId.from_dict(found_doc)
else:
print("Discord ID document not found.")
return None
except Exception as e:
print(f"Discord ID find failed: {e}")
return None
async def find_discord_ids(self, query: Dict[str, Any] = {}, guild_id: Optional[str] = None, active_only: bool = False) -> Optional[List[DiscordId]]:
"""
Finds one or more Discord ID entries in the database.
Args:
query: A dictionary representing the query criteria.
guild_id: Optional. If provided, filters Discord IDs that belong to this guild.
active_only: If True, only returns active Discord IDs.
Returns:
A list of DiscordId object(s) if found, None otherwise.
"""
print("\n--- Finding multiple Discord ID documents ---")
try:
# Add active filter if requested
if active_only:
query["active"] = True
# Add guild_id filter if provided
if guild_id:
query["guild_ids"] = {"$in": [guild_id]}
found_docs = None
async with self.db_h as db:
found_docs = await db.find(query)
if found_docs:
print(f"Found {len(found_docs)} Discord ID documents (raw dicts).")
converted_discord_ids = []
for doc in found_docs:
converted_discord_ids.append(DiscordId.from_dict(doc))
return converted_discord_ids if len(converted_discord_ids) > 0 else None
else:
print("Discord ID documents not found.")
return None
except Exception as e:
print(f"Discord ID find failed: {e}")
return None
async def update_discord_id(self, query: Dict[str, Any], update_data: Dict[str, Any]) -> Optional[int]:
"""
Updates a single Discord ID entry in the database.
Args:
query: A dictionary representing the query criteria to find the document.
update_data: A dictionary representing the update operations (e.g., using $set).
Returns:
The number of modified documents if successful, None otherwise.
"""
print("\n--- Updating a Discord ID document ---")
try:
update_result = None
async with self.db_h as db:
update_result = await db.update_one(query, update_data)
print(f"Discord ID update result: Matched {update_result.matched_count}, Modified {update_result.modified_count}")
return update_result.modified_count
except Exception as e:
print(f"Discord ID update failed: {e}")
return None
async def delete_discord_id(self, query: Dict[str, Any]) -> Optional[int]:
"""
Deletes a single Discord ID entry from the database.
Args:
query: A dictionary representing the query criteria to find the document to delete.
Returns:
The number of deleted documents if successful, None otherwise.
"""
print("\n--- Deleting a Discord ID document ---")
try:
delete_result = None
async with self.db_h as db:
delete_result = await self.db_h.delete_one(query)
print(f"Discord ID delete result: Deleted count {delete_result.deleted_count}")
return delete_result.deleted_count
except Exception as e:
print(f"Discord ID delete failed: {e}")
return None

View File

@@ -1,11 +1,24 @@
from typing import Optional, List, Dict, Any
from enum import Enum
class ActiveClient:
"""
The active client model in memory for quicker access
"""
websocket = None
active_token: str = None
class DemodTypes(str, Enum):
P25 = "P25"
DMR = "DMR"
ANALOG = "NBFM"
class NodeCommands(str, Enum):
JOIN = "join_server"
LEAVE = "leave_server"
class TalkgroupTag:
"""Represents a talkgroup tag."""
def __init__(self, talkgroup: str, tagDec: int):
@@ -16,6 +29,71 @@ class TalkgroupTag:
def to_dict(self) -> Dict[str, Any]:
return {"talkgroup": self.talkgroup, "tagDec": self.tagDec}
class DiscordId:
"""
A data model for a Discord ID entry.
"""
def __init__(self,
_id: str,
discord_id: str,
name: str,
token: str,
active: bool,
guild_ids: List[str]):
"""
Initializes a DiscordId object.
Args:
_id: A unique identifier for the entry (e.g., MongoDB ObjectId string).
discord_id: The Discord user ID.
name: The name associated with the Discord ID.
token: The authentication token.
active: Boolean indicating if the ID is active.
guild_ids: A list of guild IDs the Discord user is part of.
"""
self._id: str = str(_id)
self.discord_id: str = discord_id
self.name: str = name
self.token: str = token
self.active: bool = active
self.guild_ids: List[str] = guild_ids
def __repr__(self) -> str:
"""
Provides a developer-friendly string representation of the object.
"""
return (f"DiscordId(_id='{self._id}', discord_id='{self.discord_id}', name='{self.name}', "
f"token='{self.token}', active={self.active}, guild_ids={self.guild_ids})")
def to_dict(self) -> Dict[str, Any]:
"""
Converts the DiscordId object to a dictionary suitable for MongoDB.
"""
return {
"_id": self._id,
"discord_id": self.discord_id,
"name": self.name,
"token": self.token,
"active": self.active,
"guild_ids": self.guild_ids,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "DiscordId":
"""
Creates a DiscordId object from a dictionary (e.g., from MongoDB).
"""
return cls(
_id=data.get("_id"),
discord_id=data.get("discord_id", ""),
name=data.get("name", ""),
token=data.get("token", ""),
active=data.get("active", False), # Default to False if not present
guild_ids=data.get("guild_ids", []), # Default to empty list if not present
)
class System:
"""
A basic data model for a channel/system entry in a radio system.

84
app/routers/bot.py Normal file
View File

@@ -0,0 +1,84 @@
import json
from quart import Blueprint, jsonify, request, abort, current_app
from werkzeug.exceptions import HTTPException
from internal.types import ActiveClient
from internal.db_wrappers import DiscordIdDbController
bot_bp = Blueprint('bot', __name__)
@bot_bp.route("/", methods=['GET'])
async def get_online_bots_route():
"""API endpoint to list bots (by name) currently online."""
return jsonify(list(current_app.active_clients.keys()))
@bot_bp.route('/request_token', methods=['POST'])
async def request_token_route():
"""
API endpoint to request a token for a client.
Expects 'client_id' in the JSON request body.
"""
try:
request_data = await request.get_json()
if not request_data or 'client_id' not in request_data:
abort(400, "Missing 'client_id' in request body.")
client_id = request_data['client_id']
print(f"Request received for client_id: {client_id}")
# get the available IDs
active_d_ids = await current_app.d_id_db_h.find_discord_ids(active_only=True)
# init available IDs list
avail_ids = []
# Init the selected ID
selected_id = None
# Check which IDs are currently in use by other bots
for d_id in active_d_ids:
if not find_token_in_active_clients(d_id.token):
avail_ids.append(d_id)
if not avail_ids:
abort(404, "No available active Discord IDs found.")
# --- Logic for selecting a preferred ID based on client_id (TODO) ---
selected_id = avail_ids[0]
# --- End of logic for selecting a preferred ID ---
return jsonify(selected_id.to_dict())
except Exception as e:
print(f"Error in request_token_route: {e}")
abort(500, f"An internal error occurred: {e}")
def find_token_in_active_clients(target_token: str) -> bool:
"""
Checks if a target_token exists in the active_token of any ActiveClient object in a list.
Args:
clients: A list of ActiveClient objects.
target_token: The token string to search for.
Returns:
True if the token is found in any ActiveClient, False otherwise.
"""
for client in current_app.active_clients:
if client.active_token == target_token:
return True
return False
# query_params = dict(request.args)
# systems = await current_app.sys_db_h.find_systems(query_params)

View File

@@ -2,17 +2,14 @@ import json
from quart import Blueprint, jsonify, request, abort, current_app
from werkzeug.exceptions import HTTPException
from enum import Enum
from internal.types import ActiveClient, NodeCommands
nodes_bp = Blueprint('nodes', __name__)
class NodeCommands(str, Enum):
JOIN = "join_server"
LEAVE = "leave_server"
async def register_client(websocket, client_id):
"""Registers a new client connection."""
current_app.active_clients[client_id] = websocket
current_app.active_clients[client_id] = ActiveClient(websocket)
print(f"Client {client_id} connected.")
@@ -26,7 +23,7 @@ async def unregister_client(client_id):
async def send_command_to_client(client_id, command_name, *args):
"""Sends a command to a specific client."""
if client_id in current_app.active_clients:
websocket = current_app.active_clients[client_id]
websocket = current_app.active_clients[client_id].websocket
message = json.dumps({"type": "command", "name": command_name, "args": args})
try:
await websocket.send(message)
@@ -52,7 +49,6 @@ async def send_command_to_all_clients(command_name, *args):
await unregister_client(client_id)
@nodes_bp.route("/", methods=['GET'])
async def get_nodes():
"""API endpoint to list currently connected client IDs."""

View File

@@ -1,10 +1,8 @@
from quart import Blueprint, jsonify, request, abort
from internal.db_wrappers import SystemDbController
from quart import Blueprint, jsonify, request, abort, current_app
from werkzeug.exceptions import HTTPException
from internal.types import System
systems_bp = Blueprint('systems', __name__)
db_h = SystemDbController()
@systems_bp.route("/", methods=['POST'])
@@ -19,24 +17,24 @@ async def create_system_route():
abort(400, "Request body must be JSON") # Bad Request
if '_id' in request_data:
id_search_result = await db_h.find_system({"_id": request_data["_id"]})
id_search_result = await current_app.sys_db_h.find_system({"_id": request_data["_id"]})
if id_search_result:
# If _id is provided and exists, return conflict
abort(409, f"System with ID '{request_data['_id']}' already exists")
# Check if name exists (optional, depending on requirements)
if 'name' in request_data:
name_search_result = await db_h.find_system({"name": request_data["name"]})
name_search_result = await current_app.sys_db_h.find_system({"name": request_data["name"]})
if name_search_result:
abort(409, f"System with name '{request_data['name']}' already exists")
# Check if frequency_khz exists (optional, depending on requirements)
if 'frequency_khz' in request_data:
freq_search_result = await db_h.find_system({"frequency_khz": request_data["frequency_khz"]})
freq_search_result = await current_app.sys_db_h.find_system({"frequency_khz": request_data["frequency_khz"]})
if freq_search_result:
abort(409, f"System with frequency '{request_data['frequency_khz']}' already exists")
created_system = await db_h.create_system(request_data)
created_system = await current_app.sys_db_h.create_system(request_data)
if created_system:
print("Created new system:", created_system)
@@ -56,7 +54,7 @@ async def list_systems_route():
"""API endpoint to get a list of all systems."""
print("\n--- Handling GET /systems ---")
try:
all_systems = await db_h.find_all_systems()
all_systems = await current_app.sys_db_h.find_all_systems()
return jsonify([system.to_dict() for system in all_systems]), 200 # 200 OK status code
except HTTPException:
@@ -72,7 +70,7 @@ async def get_system_route(system_id: str):
print(f"\n--- Handling GET /systems/{system_id} ---")
try:
# Fix the query dictionary syntax
system = await db_h.find_system({'_id': system_id})
system = await current_app.sys_db_h.find_system({'_id': system_id})
if system:
# system is a System object, jsonify will convert it
@@ -93,7 +91,7 @@ async def get_system_by_client_route(client_id: str):
print(f"\n--- Handling GET /systems/client/{client_id} ---")
try:
# Fix the query dictionary syntax
systems = await db_h.find_systems({'avail_on_nodes': client_id})
systems = await current_app.sys_db_h.find_systems({'avail_on_nodes': client_id})
if systems:
# system is a System object, jsonify will convert it
@@ -111,7 +109,7 @@ async def get_system_by_client_route(client_id: str):
@systems_bp.route('/<string:system_id>', methods=['PUT'])
async def update_system_route(system_id: str, updated_system_data):
try:
update_system = await db_h.update_system({"_id", system_id}, updated_system_data)
update_system = await current_app.sys_db_h.update_system({"_id", system_id}, updated_system_data)
if update_system:
print("Updated system:", update_system)
@@ -130,7 +128,7 @@ async def update_system_route(system_id: str, updated_system_data):
@systems_bp.route('/<string:system_id>', methods=['DELETE'])
async def delete_system_route(system_id: str):
try:
deleted_system = await db_h.delete_system({"_id", system_id})
deleted_system = await current_app.sys_db_h.delete_system({"_id", system_id})
if deleted_system:
print("Deleted system:", deleted_system)
@@ -166,7 +164,7 @@ async def assign_client_to_system_route(system_id: str):
abort(400, "'client_id' must be a non-empty string")
# First, check if the system exists
existing_system = await db_h.find_system({"_id": system_id})
existing_system = await current_app.sys_db_h.find_system({"_id": system_id})
if existing_system is None:
abort(404, f"System with ID '{system_id}' not found")
@@ -175,7 +173,7 @@ async def assign_client_to_system_route(system_id: str):
update_query = {"_id": system_id}
update_data = {"$addToSet": {"avail_on_nodes": client_id}}
update_result = await db_h.update_system(update_query, update_data)
update_result = await current_app.sys_db_h.update_system(update_query, update_data)
if update_result > 0:
print(f"Client '{client_id}' assigned to system '{system_id}'.")
@@ -184,7 +182,7 @@ async def assign_client_to_system_route(system_id: str):
print(f"Client '{client_id}' was already assigned to system '{system_id}'.")
status = "already_assigned"
updated_system = await db_h.find_system({"_id": system_id})
updated_system = await current_app.sys_db_h.find_system({"_id": system_id})
if updated_system:
return jsonify({
"status": status,
@@ -222,7 +220,7 @@ async def dismiss_client_from_system_route(system_id: str):
abort(400, "'client_id' must be a non-empty string")
# First, check if the system exists
existing_system = await db_h.find_system({"_id": system_id})
existing_system = await current_app.sys_db_h.find_system({"_id": system_id})
if existing_system is None:
abort(404, f"System with ID '{system_id}' not found")
@@ -231,7 +229,7 @@ async def dismiss_client_from_system_route(system_id: str):
update_query = {"_id": system_id}
update_data = {"$pull": {"avail_on_nodes": client_id}}
update_result = await db_h.update_system(update_query, update_data)
update_result = await current_app.sys_db_h.update_system(update_query, update_data)
if update_result > 0:
print(f"Client '{client_id}' dismissed from system '{system_id}'.")
@@ -242,7 +240,7 @@ async def dismiss_client_from_system_route(system_id: str):
# Note: update_result.matched_count will be 1 even if modified_count is 0
# Optionally fetch the updated document to return its current state
updated_system = await db_h.find_system({"_id": system_id})
updated_system = await current_app.sys_db_h.find_system({"_id": system_id})
if updated_system:
return jsonify({
"status": status,
@@ -271,7 +269,9 @@ async def search_systems_route():
try:
query_params = dict(request.args)
systems = await db_h.find_systems(query_params)
systems = await current_app.sys_db_h.find_systems(query_params)
print("Found systems", systems)
if systems:
# If systems are found, return them as a list of dictionaries

View File

@@ -5,6 +5,8 @@ import uuid
from quart import Quart, jsonify, request
from routers.systems import systems_bp
from routers.nodes import nodes_bp, register_client, unregister_client
from routers.bot import bot_bp
from internal.db_wrappers import SystemDbController, DiscordIdDbController
# --- WebSocket Server Components ---
# Dictionary to store active clients: {client_id: websocket}
@@ -52,6 +54,10 @@ websocket_server_instance = None
# Make active_clients accessible via the app instance.
app.active_clients = active_clients
# Create and attach the DB wrappers
app.sys_db_h = SystemDbController()
app.d_id_db_h = DiscordIdDbController()
@app.before_serving
async def startup_websocket_server():
"""Starts the WebSocket server when the Quart app starts."""
@@ -79,19 +85,12 @@ async def shutdown_websocket_server():
app.register_blueprint(systems_bp, url_prefix="/systems")
app.register_blueprint(nodes_bp, url_prefix="/nodes")
app.register_blueprint(bot_bp, url_prefix="/bots")
@app.route('/')
async def index():
return "Welcome to the Radio App Server API!"
@app.route('/request_token', methods=['POST'])
async def request_token():
"""API endpoint to list currently connected client IDs."""
# TODO - Add DB logic
return jsonify({
"token": "MTE5NjAwNTM2ODYzNjExMjk3Nw.GuCMXg.24iNNofNNumq46FIj68zMe9RmQgugAgfrvelEA"
})
# --- Main Execution ---
if __name__ == "__main__":
# Quart's app.run() will start the asyncio event loop and manage it.