From 5b90ebb8f199a0d912990061942e3f15d50a326e Mon Sep 17 00:00:00 2001 From: Logan Cusano Date: Sun, 25 May 2025 21:41:56 -0400 Subject: [PATCH] Implemented staus check and refactored node endpoints --- app/internal/types.py | 1 + app/routers/bot.py | 7 -- app/routers/nodes.py | 163 +++++++++++++++++++++++++++++++++--------- 3 files changed, 132 insertions(+), 39 deletions(-) diff --git a/app/internal/types.py b/app/internal/types.py index 16d76e9..d77d4c2 100644 --- a/app/internal/types.py +++ b/app/internal/types.py @@ -11,6 +11,7 @@ class DemodTypes(str, Enum): class NodeCommands(str, Enum): JOIN = "join_server" LEAVE = "leave_server" + STATUS = "status" class TalkgroupTag: diff --git a/app/routers/bot.py b/app/routers/bot.py index ef3b160..2a5acc0 100644 --- a/app/routers/bot.py +++ b/app/routers/bot.py @@ -7,12 +7,6 @@ from internal.db_wrappers import DiscordIdDbController bot_bp = Blueprint('bot', __name__) -@bot_bp.route("/", methods=['GET']) -async def get_online_bots_route(): - """API endpoint to list bots (by name) currently online.""" - return jsonify(list(current_app.active_clients.keys())) - - # ------- Discord Token Functions @bot_bp.route('/request_token', methods=['POST']) async def request_token_route(): @@ -73,7 +67,6 @@ async def get_all_discord_tokens(): abort(500, f"An internal error occurred: {e}") - # ------- Util Functions def find_token_in_active_clients(target_token: str) -> bool: diff --git a/app/routers/nodes.py b/app/routers/nodes.py index f2bd0f8..090b00d 100644 --- a/app/routers/nodes.py +++ b/app/routers/nodes.py @@ -1,38 +1,118 @@ import json +import asyncio # Import asyncio +import websockets from quart import Blueprint, jsonify, request, abort, current_app from werkzeug.exceptions import HTTPException from enum import Enum from internal.types import ActiveClient, NodeCommands +import uuid # Import uuid for generating unique request IDs nodes_bp = Blueprint('nodes', __name__) +# Dictionary to store pending requests: {request_id: asyncio.Future} +pending_requests = {} + async def register_client(websocket, client_id): """Registers a new client connection.""" current_app.active_clients[client_id] = ActiveClient(websocket) print(f"Client {client_id} connected.") + # Start a task to listen for messages from this client + asyncio.create_task(listen_to_client(websocket, client_id)) + async def unregister_client(client_id): """Unregisters a disconnected client.""" if client_id in current_app.active_clients: + # Also clean up any pending requests for this client + for req_id, req_obj in list(pending_requests.items()): + if req_obj['client_id'] == client_id: + if not req_obj['future'].done(): + req_obj['future'].cancel() del current_app.active_clients[client_id] print(f"Client {client_id} disconnected.") -async def send_command_to_client(client_id, command_name, *args): - """Sends a command to a specific client.""" - if client_id in current_app.active_clients: - websocket = current_app.active_clients[client_id].websocket - message = json.dumps({"type": "command", "name": command_name, "args": args}) - try: - await websocket.send(message) - print(f"Sent command '{command_name}' to client {client_id}") - except websockets.exceptions.ConnectionClosedError: - print(f"Failed to send to client {client_id}: connection closed.") - await unregister_client(client_id) - else: +async def listen_to_client(websocket, client_id): + """Listens for messages from a specific client.""" + try: + while True: + message = await websocket.recv() + data = json.loads(message) + message_type = data.get("type") + request_id = data.get("request_id") + + if message_type == "response" and request_id in pending_requests: + future = pending_requests.pop(request_id) + if not future.done(): # Ensure the future hasn't been cancelled or set by something else + future.set_result(data.get("payload")) + # Add other message types handling here if needed (e.g., unsolicited messages) + + except websockets.exceptions.ConnectionClosedError: + print(f"Client {client_id} connection closed while listening.") + except json.JSONDecodeError: + print(f"Received invalid JSON from client {client_id}.") + except Exception as e: + print(f"Error listening to client {client_id}: {e}") + finally: + await unregister_client(client_id) + + +async def send_command_to_client(client_id, command_name, *args, wait_for_response=False, timeout=10): + """Sends a command to a specific client and optionally waits for a response.""" + if client_id not in current_app.active_clients: print(f"Client {client_id} not found.") + raise ValueError(f"Client {client_id} not found.") + + websocket = current_app.active_clients[client_id].websocket + request_id = str(uuid.uuid4()) if wait_for_response else None + + message_payload = {"type": "command", "name": command_name, "args": args} + if request_id: + message_payload["request_id"] = request_id + + message = json.dumps(message_payload) + + future = None + if wait_for_response: + future = asyncio.Future() + pending_requests[request_id] = { + "future": future, + "client_id": client_id + } + + try: + await websocket.send(message) + print(f"Sent command '{command_name}' to client {client_id} (Request ID: {request_id})") + + if wait_for_response: + try: + response = await asyncio.wait_for(future, timeout) + print(f"Received response for Request ID {request_id} from client {client_id}") + return response + except asyncio.TimeoutError: + print(f"Timeout waiting for response from client {client_id} for Request ID {request_id}") + raise TimeoutError("Client response timed out.") + except asyncio.CancelledError: + print(f"Waiting for response for Request ID {request_id} was cancelled.") + raise ConnectionClosedError("Client disconnected before response was received.") + finally: + # Clean up the future if it's still there + pending_requests.pop(request_id, None) + + except websockets.exceptions.ConnectionClosedError: + print(f"Failed to send to client {client_id}: connection closed.") + # If the connection closed, and we were waiting for a response, mark the future as cancelled + if future and not future.done(): + future.cancel() + await unregister_client(client_id) + raise + except Exception as e: + print(f"An error occurred sending command to client {client_id}: {e}") + if future and not future.done(): + future.cancel() + raise async def send_command_to_all_clients(command_name, *args): @@ -40,9 +120,9 @@ async def send_command_to_all_clients(command_name, *args): message = json.dumps({"type": "command", "name": command_name, "args": args}) # Use a list of items to avoid issues if clients disconnect during iteration clients_to_send = list(current_app.active_clients.items()) - for client_id, websocket in clients_to_send: + for client_id, active_client in clients_to_send: try: - await websocket.send(message) + await active_client.websocket.send(message) print(f"Sent command '{command_name}' to client {client_id}") except websockets.exceptions.ConnectionClosedError: print(f"Failed to send to client {client_id}: connection closed.") @@ -55,14 +135,44 @@ async def get_nodes(): return jsonify(list(current_app.active_clients.keys())) -@nodes_bp.route("/join", methods=['POST']) -async def join(): +@nodes_bp.route("/online", methods=['GET']) +async def get_online_bots(): + active_bots = [] + for client_id, active_client in current_app.active_clients.items(): + if active_client.active_token: + active_bots.append({client_id: active_client.active_token.to_dict()}) + return jsonify(active_bots) + + +@nodes_bp.route("//status", methods=["GET"]) +async def status(client_id): + """ + Get the status from a given client + """ + # Check to make sure the client is online + if client_id not in current_app.active_clients: + return jsonify({"error": f"Client {client_id} not found, it might be offline"}), 404 + + try: + # Send the command and wait for a response + status_data = await send_command_to_client(client_id, NodeCommands.STATUS, wait_for_response=True, timeout=5) + return jsonify({"status": "success", "client_id": client_id, "data": status_data}), 200 + + except TimeoutError: + return jsonify({"error": f"Client {client_id} did not respond within the timeout period."}), 504 + except ConnectionClosedError: + return jsonify({"error": f"Client {client_id} disconnected before status could be retrieved."}), 503 + except Exception as e: + return jsonify({"error": f"Failed to get status from client: {e}"}), 500 + + +@nodes_bp.route("//join", methods=['POST']) +async def join(client_id): """ Send a join command to the specific system specified """ data = await request.get_json() - client_id = data.get("client_id") system_id = data.get("system_id") guild_id = data.get("guild_id") channel_id = data.get("channel_id") @@ -86,14 +196,13 @@ async def join(): return jsonify({"error": f"Failed to send command: {e}"}), 500 -@nodes_bp.route("/leave", methods=['POST']) -async def leave(): +@nodes_bp.route("//leave", methods=['POST']) +async def leave(client_id): """ - Send a join command to the specific system specified + Send a leave command to the specific system specified """ data = await request.get_json() - client_id = data.get("client_id") guild_id = data.get("guild_id") # Check to make sure the client is online @@ -112,14 +221,4 @@ async def leave(): return jsonify({"status": "command sent", "client_id": client_id, "command": NodeCommands.LEAVE}), 200 except Exception as e: - return jsonify({"error": f"Failed to send command: {e}"}), 500 - - -@nodes_bp.route("/get_online_bots", methods=['GET']) -async def get_online_bots(): - active_bots = [] - for client in current_app.active_clients: - if current_app.active_clients[client].active_token: - active_bots.append({client: current_app.active_clients[client].active_token.to_dict()}) - - return jsonify(active_bots) \ No newline at end of file + return jsonify({"error": f"Failed to send command: {e}"}), 500 \ No newline at end of file