Files
drb-core-server/app/routers/nodes.py
2025-05-25 23:36:21 -04:00

298 lines
11 KiB
Python

import json
import asyncio # Import asyncio
import websockets
from quart import Blueprint, jsonify, request, abort, current_app
from werkzeug.exceptions import HTTPException
from enum import Enum
from internal.types import ActiveClient, NodeCommands
import uuid # Import uuid for generating unique request IDs
# Blueprint that the route handlers in this module are attached to.
nodes_bp = Blueprint('nodes', __name__)
# Registry of commands that are still awaiting a client response, keyed by
# request ID. Each value is a dict of the form
# {"future": asyncio.Future, "client_id": <ID of the client that was asked>}.
pending_requests = {}
async def register_client(websocket, client_id):
    """Record a newly connected client and start its listener.

    The websocket is wrapped in an ``ActiveClient`` and stored in
    ``current_app.active_clients`` under ``client_id``. A background task
    running ``listen_to_client`` is then scheduled for the same connection.
    """
    active_client = ActiveClient(websocket)
    current_app.active_clients[client_id] = active_client
    print(f"Client {client_id} connected.")

    # NOTE(review): the handle returned by create_task() is not stored. The
    # asyncio docs advise keeping a reference to such tasks so they are not
    # garbage-collected mid-execution -- consider retaining it.
    listener = listen_to_client(websocket, client_id)
    asyncio.create_task(listener)
async def unregister_client(client_id):
    """Remove a client from the registry and abort its outstanding requests.

    Every entry in ``pending_requests`` that targets ``client_id`` is removed
    and its future cancelled (unless it already completed), so callers waiting
    on a response are woken up and no stale entries are left behind. Does
    nothing when the client is not currently registered.
    """
    if client_id not in current_app.active_clients:
        return

    # Collect the IDs first: the registry is mutated while it is cleaned up.
    stale_request_ids = [
        request_id
        for request_id, request_info in pending_requests.items()
        if request_info['client_id'] == client_id
    ]
    for request_id in stale_request_ids:
        # Pop (rather than only cancel) so an entry cannot leak when nobody
        # is awaiting the future, e.g. because the send itself failed.
        future = pending_requests.pop(request_id)['future']
        if not future.done():
            future.cancel()

    current_app.active_clients.pop(client_id, None)
    print(f"Client {client_id} disconnected.")
async def listen_to_client(websocket, client_id):
    """Receive messages from one client and resolve matching pending requests.

    Messages of type ``"response"`` whose ``request_id`` is present in
    ``pending_requests`` complete the associated future with the message's
    ``payload``. Malformed messages (invalid JSON, or JSON that is not an
    object) are logged and skipped so a single bad frame does not drop an
    otherwise healthy connection.

    The loop ends when the connection closes or an unexpected error occurs;
    in every case the client is unregistered on the way out.
    """
    try:
        while True:
            message = await websocket.recv()

            try:
                data = json.loads(message)
            except (json.JSONDecodeError, UnicodeDecodeError):
                print(f"Received invalid JSON from client {client_id}.")
                continue
            if not isinstance(data, dict):
                print(f"Received invalid JSON from client {client_id}.")
                continue

            message_type = data.get("type")
            request_id = data.get("request_id")
            # Request IDs are generated as strings; anything else (including
            # unhashable JSON values) cannot match a pending request.
            if (
                message_type == "response"
                and isinstance(request_id, str)
                and request_id in pending_requests
            ):
                future = pending_requests.pop(request_id)["future"]
                if not future.done():
                    future.set_result(data.get("payload"))
            # Add other message types handling here if needed (e.g., unsolicited messages)
    except websockets.exceptions.ConnectionClosed:
        # Base class: covers both normal and abnormal closure.
        print(f"Client {client_id} connection closed while listening.")
    except Exception as e:
        print(f"Error listening to client {client_id}: {e}")
    finally:
        await unregister_client(client_id)
async def _await_client_response(future, client_id, request_id, timeout):
    """Wait for the listener to resolve *future*, translating failures."""
    try:
        # shield() keeps a cancellation of *this task* from cancelling the
        # request future, so a cancelled future below can only mean that the
        # client was unregistered while we were waiting.
        response = await asyncio.wait_for(asyncio.shield(future), timeout)
    except asyncio.TimeoutError:
        print(f"Timeout waiting for response from client {client_id} for Request ID {request_id}")
        raise TimeoutError("Client response timed out.") from None
    except asyncio.CancelledError:
        if not future.cancelled():
            # The calling task itself is being cancelled: propagate.
            raise
        print(f"Waiting for response for Request ID {request_id} was cancelled.")
        raise ConnectionError("Client disconnected before response was received.") from None
    print(f"Received response for Request ID {request_id} from client {client_id}")
    return response


async def send_command_to_client(client_id, command_name, *args, wait_for_response=False, timeout=10):
    """Send a command to one client and optionally wait for its response.

    Args:
        client_id: Key of the target client in ``current_app.active_clients``.
        command_name: Value placed in the message's ``"name"`` field.
        *args: Positional arguments placed in the message's ``"args"`` field.
        wait_for_response: When true, a request ID is attached and the call
            blocks until the client answers.
        timeout: Seconds to wait for the response.

    Returns:
        The ``payload`` of the client's response when ``wait_for_response`` is
        true, otherwise ``None``.

    Raises:
        ValueError: The client is not registered.
        TimeoutError: No response arrived within ``timeout`` seconds.
        ConnectionError: The client was unregistered while a response was
            pending.
        websockets.exceptions.ConnectionClosed: The connection was already
            closed when sending; the client is unregistered before re-raising.
    """
    if client_id not in current_app.active_clients:
        print(f"Client {client_id} not found.")
        raise ValueError(f"Client {client_id} not found.")

    websocket = current_app.active_clients[client_id].websocket
    request_id = str(uuid.uuid4()) if wait_for_response else None
    message_payload = {"type": "command", "name": command_name, "args": args}
    if request_id:
        message_payload["request_id"] = request_id
    message = json.dumps(message_payload)

    future = None
    if wait_for_response:
        # Register before sending so a fast reply cannot race the bookkeeping.
        future = asyncio.get_running_loop().create_future()
        pending_requests[request_id] = {
            "future": future,
            "client_id": client_id
        }

    try:
        try:
            await websocket.send(message)
        except websockets.exceptions.ConnectionClosed:
            print(f"Failed to send to client {client_id}: connection closed.")
            await unregister_client(client_id)
            raise
        except Exception as e:
            print(f"An error occurred sending command to client {client_id}: {e}")
            raise
        print(f"Sent command '{command_name}' to client {client_id} (Request ID: {request_id})")

        if future is None:
            return None
        return await _await_client_response(future, client_id, request_id, timeout)
    finally:
        # Single cleanup point for every exit path (success, timeout, send
        # failure, cancellation) so no registry entry or future is left over.
        if future is not None:
            pending_requests.pop(request_id, None)
            if not future.done():
                future.cancel()
async def send_command_to_all_clients(command_name, *args):
    """Send the same command to every currently connected client.

    Clients whose connection turns out to be closed (normally or abnormally)
    are unregistered and the broadcast continues with the remaining clients.
    No response is awaited.
    """
    message = json.dumps({"type": "command", "name": command_name, "args": args})
    # Iterate over a snapshot: clients may be unregistered during the loop.
    for client_id, active_client in list(current_app.active_clients.items()):
        try:
            await active_client.websocket.send(message)
        except websockets.exceptions.ConnectionClosed:
            # Base class, so a clean close does not abort the whole broadcast.
            print(f"Failed to send to client {client_id}: connection closed.")
            await unregister_client(client_id)
        else:
            print(f"Sent command '{command_name}' to client {client_id}")
@nodes_bp.route("/", methods=['GET'])
async def get_nodes():
    """Return the IDs of all currently connected clients as a JSON list."""
    connected_ids = [client_id for client_id in current_app.active_clients.keys()]
    return jsonify(connected_ids)
@nodes_bp.route("/online", methods=['GET'])
async def get_online_bots():
    """Return a JSON list with one ``{client_id: token}`` entry per client
    that has a truthy ``active_token``; the token is rendered via ``to_dict()``.
    """
    bots_with_token = [
        {client_id: client.active_token.to_dict()}
        for client_id, client in current_app.active_clients.items()
        if client.active_token
    ]
    return jsonify(bots_with_token)
@nodes_bp.route("/<client_id>/status", methods=["GET"])
async def status(client_id):
    """
    Get the status from a given client.

    Sends the STATUS command and waits up to 5 seconds for the reply.

    Responses:
        200: The payload returned by the client.
        404: The client is not connected.
        503: The connection was lost before a reply arrived.
        504: The client did not reply within the timeout.
        500: Any other failure.
    """
    # Check to make sure the client is online
    if client_id not in current_app.active_clients:
        return jsonify({"error": f"Client {client_id} not found, it might be offline"}), 404

    try:
        # Send the command and wait for a response
        status_data = await send_command_to_client(client_id, NodeCommands.STATUS, wait_for_response=True, timeout=5)
        return jsonify(status_data), 200
    except TimeoutError:
        return jsonify({"error": f"Client {client_id} did not respond within the timeout period."}), 504
    except (ConnectionError, websockets.exceptions.ConnectionClosed):
        # Fully qualified: a bare ``ConnectionClosedError`` is not defined in
        # this module and would itself raise NameError here.
        return jsonify({"error": f"Client {client_id} disconnected before status could be retrieved."}), 503
    except Exception as e:
        return jsonify({"error": f"Failed to get status from client: {e}"}), 500
@nodes_bp.route("/<client_id>/join", methods=['POST'])
async def join(client_id):
    """
    Send a join command to the specified client.

    Expects a JSON object body with ``guild_id`` and ``channel_id`` and an
    optional ``system_id``; the three values are forwarded, in the order
    ``system_id, guild_id, channel_id``, as the command's arguments.

    Responses:
        200: The command was sent (no client response is awaited).
        400: The request body is missing or is not a JSON object.
        404: The client is not connected.
        500: Sending the command failed.
    """
    data = await request.get_json()
    # get_json() yields None for a non-JSON request; guard before using .get().
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    system_id = data.get("system_id")
    guild_id = data.get("guild_id")
    channel_id = data.get("channel_id")

    # Check to make sure the client is online
    if client_id not in current_app.active_clients:
        return jsonify({"error": f"Client {client_id} not found, it might be offline"}), 404

    try:
        # Send the command asynchronously
        await send_command_to_client(client_id, NodeCommands.JOIN, system_id, guild_id, channel_id)
        return jsonify({"status": "command sent", "client_id": client_id, "command": NodeCommands.JOIN}), 200
    except Exception as e:
        return jsonify({"error": f"Failed to send command: {e}"}), 500
@nodes_bp.route("/<client_id>/leave", methods=['POST'])
async def leave(client_id):
    """
    Send a leave command to the specified client.

    Expects a JSON object body with ``guild_id``, which is forwarded as the
    command's single argument.

    Responses:
        200: The command was sent (no client response is awaited).
        400: The request body is missing or is not a JSON object.
        404: The client is not connected.
        500: Sending the command failed.
    """
    data = await request.get_json()
    # get_json() yields None for a non-JSON request; guard before using .get().
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    guild_id = data.get("guild_id")

    # Check to make sure the client is online
    if client_id not in current_app.active_clients:
        return jsonify({"error": f"Client {client_id} not found, it might be offline"}), 404

    try:
        # Send the command asynchronously
        await send_command_to_client(client_id, NodeCommands.LEAVE, guild_id)
        return jsonify({"status": "command sent", "client_id": client_id, "command": NodeCommands.LEAVE}), 200
    except Exception as e:
        return jsonify({"error": f"Failed to send command: {e}"}), 500
@nodes_bp.route("/<client_id>/op25_start", methods=['POST'])
async def op25_start(client_id):
    """
    Forward an OP25 start command to the given node.

    Replies 404 when the node is not connected, 200 once the command has been
    sent (no reply from the node is awaited) and 500 if sending fails.
    """
    is_online = client_id in current_app.active_clients
    if not is_online:
        return jsonify({"error": f"Client {client_id} not found, it might be offline"}), 404

    try:
        # Fire-and-forget: the command is sent without waiting for a response.
        await send_command_to_client(client_id, NodeCommands.OP25_START)
        confirmation = {
            "status": "command sent",
            "client_id": client_id,
            "command": NodeCommands.OP25_START,
        }
        return jsonify(confirmation), 200
    except Exception as exc:
        return jsonify({"error": f"Failed to send command: {exc}"}), 500
@nodes_bp.route("/<client_id>/op25_stop", methods=['POST'])
async def op25_stop(client_id):
    """
    Forward an OP25 stop command to the given node.

    Replies 404 when the node is not connected, 200 once the command has been
    sent (no reply from the node is awaited) and 500 if sending fails.
    """
    is_online = client_id in current_app.active_clients
    if not is_online:
        return jsonify({"error": f"Client {client_id} not found, it might be offline"}), 404

    try:
        # Fire-and-forget: the command is sent without waiting for a response.
        await send_command_to_client(client_id, NodeCommands.OP25_STOP)
        confirmation = {
            "status": "command sent",
            "client_id": client_id,
            "command": NodeCommands.OP25_STOP,
        }
        return jsonify(confirmation), 200
    except Exception as exc:
        return jsonify({"error": f"Failed to send command: {exc}"}), 500
@nodes_bp.route("/<client_id>/op25_set", methods=['POST'])
async def op25_set(client_id):
    """
    Send an OP25 set config command to the specified client.

    Expects a JSON object body with a non-empty ``system_id``, which is
    forwarded as the command's single argument.

    Responses:
        200: The command was sent (no client response is awaited).
        400: The body is missing / not a JSON object, or has no ``system_id``.
        404: The client is not connected.
        500: Sending the command failed.
    """
    data = await request.get_json()
    # get_json() yields None for a non-JSON request; guard before using .get().
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    system_id = data.get("system_id")

    # Check to make sure the client is online
    if client_id not in current_app.active_clients:
        return jsonify({"error": f"Client {client_id} not found, it might be offline"}), 404

    if not system_id:
        return jsonify({"error":"No System ID supplied"}), 400

    try:
        # Send the command asynchronously
        await send_command_to_client(client_id, NodeCommands.OP25_SET, system_id)
        return jsonify({"status": "command sent", "client_id": client_id, "command": NodeCommands.OP25_SET}), 200
    except Exception as e:
        return jsonify({"error": f"Failed to send command: {e}"}), 500