"""Blueprint and websocket helpers for managing connected node clients.

Connected clients are tracked in ``current_app.active_clients``; commands are
sent to them as JSON messages over their websocket, and HTTP routes on
``nodes_bp`` expose those commands.
"""
import asyncio
import json
import uuid
from enum import Enum

import websockets
from quart import Blueprint, jsonify, request, abort, current_app
from werkzeug.exceptions import HTTPException

from internal.types import ActiveClient, NodeCommands

nodes_bp = Blueprint('nodes', __name__)

# Pending request/response round-trips, keyed by request ID. Each value is a
# dict of the form {"future": asyncio.Future, "client_id": <client asked>}.
pending_requests = {}

# Strong references to the per-client listener tasks. The event loop only keeps
# weak references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_listener_tasks = set()


class ConnectionClosedError(ConnectionError):
    """Raised when a client disconnects before answering a pending request.

    This is deliberately a local class: the identically named exception in
    ``websockets.exceptions`` cannot be constructed from a plain message.
    """


async def register_client(websocket, client_id):
    """Register a new client connection and start listening to it.

    Stores an ``ActiveClient`` wrapping ``websocket`` under ``client_id`` in
    ``current_app.active_clients`` and spawns a background task running
    ``listen_to_client`` for that connection.
    """
    current_app.active_clients[client_id] = ActiveClient(websocket)
    print(f"Client {client_id} connected.")

    # Keep a reference to the task until it completes (see _listener_tasks).
    listener = asyncio.create_task(listen_to_client(websocket, client_id))
    _listener_tasks.add(listener)
    listener.add_done_callback(_listener_tasks.discard)


async def unregister_client(client_id):
    """Remove a client from the registry and cancel its unanswered requests.

    Does nothing when ``client_id`` is not currently registered.
    """
    if client_id not in current_app.active_clients:
        return

    # Cancel the futures of requests still waiting on this client so their
    # waiters wake up; the waiters remove their own pending_requests entries.
    for pending in list(pending_requests.values()):
        if pending['client_id'] == client_id and not pending['future'].done():
            pending['future'].cancel()

    del current_app.active_clients[client_id]
    print(f"Client {client_id} disconnected.")


async def listen_to_client(websocket, client_id):
    """Receive messages from one client until its connection ends.

    Messages of type ``"response"`` whose ``request_id`` matches an entry in
    ``pending_requests`` resolve that entry's future with the message's
    ``payload``. Any exit from the loop unregisters the client.
    """
    try:
        while True:
            message = await websocket.recv()
            data = json.loads(message)

            message_type = data.get("type")
            request_id = data.get("request_id")

            if message_type == "response" and request_id in pending_requests:
                # The entry holds both the future and the owning client ID.
                request_info = pending_requests.pop(request_id)
                future = request_info["future"]
                if not future.done():
                    future.set_result(data.get("payload"))
            # Other message types (e.g. unsolicited messages) can be handled here.
    except websockets.exceptions.ConnectionClosed:
        # Base class: covers both clean and abnormal closures.
        print(f"Client {client_id} connection closed while listening.")
    except json.JSONDecodeError:
        print(f"Received invalid JSON from client {client_id}.")
    except Exception as e:
        print(f"Error listening to client {client_id}: {e}")
    finally:
        await unregister_client(client_id)


async def send_command_to_client(client_id, command_name, *args, wait_for_response=False, timeout=10):
    """Send a command to one client and optionally wait for its response.

    Args:
        client_id: Key of the target client in ``current_app.active_clients``.
        command_name: Value placed in the message's ``"name"`` field.
        *args: Positional arguments placed in the message's ``"args"`` field.
        wait_for_response: When true, attach a fresh request ID and wait for
            the matching response.
        timeout: Seconds to wait for the response.

    Returns:
        The response payload when ``wait_for_response`` is true, else ``None``.

    Raises:
        ValueError: The client is not registered.
        TimeoutError: No response arrived within ``timeout`` seconds.
        ConnectionClosedError: The wait was cancelled (the client was
            unregistered) before a response arrived.
        websockets.exceptions.ConnectionClosed: The connection was closed
            while sending; the client is unregistered before re-raising.
    """
    if client_id not in current_app.active_clients:
        print(f"Client {client_id} not found.")
        raise ValueError(f"Client {client_id} not found.")

    websocket = current_app.active_clients[client_id].websocket
    request_id = str(uuid.uuid4()) if wait_for_response else None

    message_payload = {"type": "command", "name": command_name, "args": args}
    if request_id:
        message_payload["request_id"] = request_id
    message = json.dumps(message_payload)

    future = None
    if wait_for_response:
        future = asyncio.get_running_loop().create_future()
        pending_requests[request_id] = {
            "future": future,
            "client_id": client_id
        }

    try:
        try:
            await websocket.send(message)
        except websockets.exceptions.ConnectionClosed:
            print(f"Failed to send to client {client_id}: connection closed.")
            if future and not future.done():
                future.cancel()
            await unregister_client(client_id)
            raise
        except Exception as e:
            print(f"An error occurred sending command to client {client_id}: {e}")
            if future and not future.done():
                future.cancel()
            raise

        print(f"Sent command '{command_name}' to client {client_id} (Request ID: {request_id})")

        if future is None:
            return None

        try:
            response = await asyncio.wait_for(future, timeout)
        except asyncio.TimeoutError:
            print(f"Timeout waiting for response from client {client_id} for Request ID {request_id}")
            raise TimeoutError("Client response timed out.") from None
        except asyncio.CancelledError:
            print(f"Waiting for response for Request ID {request_id} was cancelled.")
            raise ConnectionClosedError("Client disconnected before response was received.") from None

        print(f"Received response for Request ID {request_id} from client {client_id}")
        return response
    finally:
        # Always drop the bookkeeping entry, including when the send itself
        # failed; popping a missing (or None) key is a no-op.
        pending_requests.pop(request_id, None)


async def send_command_to_all_clients(command_name, *args):
    """Send a command to every connected client without waiting for replies.

    Clients whose connection turns out to be closed are unregistered and the
    broadcast continues with the remaining clients.
    """
    message = json.dumps({"type": "command", "name": command_name, "args": args})

    # Snapshot the registry: unregister_client mutates it during the loop.
    for client_id, active_client in list(current_app.active_clients.items()):
        try:
            await active_client.websocket.send(message)
            print(f"Sent command '{command_name}' to client {client_id}")
        except websockets.exceptions.ConnectionClosed:
            print(f"Failed to send to client {client_id}: connection closed.")
            await unregister_client(client_id)


def _client_offline_response(client_id):
    """Build the 404 response used when ``client_id`` is not connected."""
    return jsonify({"error": f"Client {client_id} not found, it might be offline"}), 404


async def _request_json_object():
    """Return the request's JSON body, or ``{}`` when it is absent or not an object."""
    data = await request.get_json()
    return data if isinstance(data, dict) else {}


async def _send_and_acknowledge(client_id, command, *args):
    """Send ``command`` to the client and build the HTTP acknowledgement.

    Returns a 200 "command sent" response, or a 500 response describing the
    failure if sending (or building the acknowledgement) raises.
    """
    try:
        await send_command_to_client(client_id, command, *args)
        return jsonify({"status": "command sent", "client_id": client_id, "command": command}), 200
    except Exception as e:
        return jsonify({"error": f"Failed to send command: {e}"}), 500


@nodes_bp.route("/", methods=['GET'])
async def get_nodes():
    """API endpoint to list currently connected client IDs."""
    return jsonify(list(current_app.active_clients.keys()))


@nodes_bp.route("/online", methods=['GET'])
async def get_online_bots():
    """List connected clients that have a truthy ``active_token``.

    Each entry maps the client ID to ``active_token.to_dict()``.
    """
    active_bots = [
        {client_id: active_client.active_token.to_dict()}
        for client_id, active_client in current_app.active_clients.items()
        if active_client.active_token
    ]
    return jsonify(active_bots)


@nodes_bp.route("/<client_id>/status", methods=["GET"])
async def status(client_id):
    """Get the status from a given client.

    Responds 404 if the client is not connected, 504 on timeout, 503 if the
    client disconnects first, and 500 on any other failure.
    """
    if client_id not in current_app.active_clients:
        return _client_offline_response(client_id)

    try:
        # Send the command and wait for a response.
        status_data = await send_command_to_client(
            client_id, NodeCommands.STATUS, wait_for_response=True, timeout=5
        )
        return jsonify(status_data), 200
    except TimeoutError:
        return jsonify({"error": f"Client {client_id} did not respond within the timeout period."}), 504
    except (ConnectionClosedError, websockets.exceptions.ConnectionClosed):
        return jsonify({"error": f"Client {client_id} disconnected before status could be retrieved."}), 503
    except Exception as e:
        return jsonify({"error": f"Failed to get status from client: {e}"}), 500


@nodes_bp.route("/<client_id>/join", methods=['POST'])
async def join(client_id):
    """Send a join command to the given client.

    Reads ``system_id``, ``guild_id`` and ``channel_id`` from the JSON body
    and forwards them, in that order, as the command arguments.
    """
    data = await _request_json_object()
    system_id = data.get("system_id", None)
    guild_id = data.get("guild_id")
    channel_id = data.get("channel_id")

    if client_id not in current_app.active_clients:
        return _client_offline_response(client_id)

    return await _send_and_acknowledge(client_id, NodeCommands.JOIN, system_id, guild_id, channel_id)


@nodes_bp.route("/<client_id>/leave", methods=['POST'])
async def leave(client_id):
    """Send a leave command to the given client.

    Reads ``guild_id`` from the JSON body and forwards it as the argument.
    """
    data = await _request_json_object()
    guild_id = data.get("guild_id")

    if client_id not in current_app.active_clients:
        return _client_offline_response(client_id)

    return await _send_and_acknowledge(client_id, NodeCommands.LEAVE, guild_id)


@nodes_bp.route("/<client_id>/op25_start", methods=['POST'])
async def op25_start(client_id):
    """Send an OP25 start command to the given client."""
    if client_id not in current_app.active_clients:
        return _client_offline_response(client_id)

    return await _send_and_acknowledge(client_id, NodeCommands.OP25_START)


@nodes_bp.route("/<client_id>/op25_stop", methods=['POST'])
async def op25_stop(client_id):
    """Send an OP25 stop command to the given client."""
    if client_id not in current_app.active_clients:
        return _client_offline_response(client_id)

    return await _send_and_acknowledge(client_id, NodeCommands.OP25_STOP)


@nodes_bp.route("/<client_id>/op25_set", methods=['POST'])
async def op25_set(client_id):
    """Send an OP25 set config command to the given client.

    Requires a truthy ``system_id`` in the JSON body; responds 400 otherwise.
    """
    data = await _request_json_object()
    system_id = data.get("system_id")

    if client_id not in current_app.active_clients:
        return _client_offline_response(client_id)

    if not system_id:
        return jsonify({"error":"No System ID supplied"}), 400

    return await _send_and_acknowledge(client_id, NodeCommands.OP25_SET, system_id)