329 lines
12 KiB
Python
329 lines
12 KiB
Python
import json
|
|
import asyncio # Import asyncio
|
|
import websockets
|
|
import uuid
|
|
from quart import Blueprint, jsonify, request, abort, current_app
|
|
from werkzeug.exceptions import HTTPException
|
|
from enum import Enum
|
|
from internal.types import ActiveClient, NodeCommands, UserRoles
|
|
from quart_jwt_extended import create_access_token
|
|
from quart_jwt_extended import jwt_required
|
|
from routers.auth import role_required
|
|
|
|
nodes_bp = Blueprint('nodes', __name__)
|
|
|
|
# Dictionary to store pending requests: {request_id: asyncio.Future}
|
|
pending_requests = {}
|
|
|
|
|
|
async def register_client(websocket, client_id, client_nickname, active_token):
|
|
"""Registers a new client connection."""
|
|
current_app.active_clients[client_id] = ActiveClient()
|
|
current_app.active_clients[client_id].websocket = websocket
|
|
current_app.active_clients[client_id].nickname = client_nickname
|
|
current_app.active_clients[client_id].active_token = active_token
|
|
current_app.active_clients[client_id].client_id = client_id
|
|
print(f"Client {client_id} connected.")
|
|
|
|
# Create a JWT for the client
|
|
current_app.active_clients[client_id].access_token = create_access_token(identity={"id": client_id, "username": client_nickname, "type": "node"}, expires_delta=False)
|
|
|
|
print(current_app.active_clients[client_id])
|
|
|
|
# Start a task to listen for messages from this client
|
|
asyncio.create_task(listen_to_client(websocket, client_id))
|
|
|
|
|
|
async def unregister_client(client_id):
|
|
"""Unregisters a disconnected client."""
|
|
if client_id in current_app.active_clients:
|
|
# Also clean up any pending requests for this client
|
|
for req_id, req_obj in list(pending_requests.items()):
|
|
if req_obj['client_id'] == client_id:
|
|
if not req_obj['future'].done():
|
|
req_obj['future'].cancel()
|
|
del current_app.active_clients[client_id]
|
|
print(f"Client {client_id} disconnected.")
|
|
|
|
|
|
async def listen_to_client(websocket, client_id):
|
|
"""Listens for messages from a specific client."""
|
|
try:
|
|
while True:
|
|
message = await websocket.recv()
|
|
data = json.loads(message)
|
|
message_type = data.get("type")
|
|
request_id = data.get("request_id")
|
|
|
|
if message_type == "response" and request_id in pending_requests:
|
|
# Retrieve the dictionary containing the future and client_id
|
|
request_info = pending_requests.pop(request_id)
|
|
|
|
# Extract the actual asyncio.Future object
|
|
future = request_info["future"]
|
|
|
|
if not future.done(): # This will now correctly call .done() on the Future
|
|
future.set_result(data.get("payload"))
|
|
# Add other message types handling here if needed (e.g., unsolicited messages)
|
|
|
|
except websockets.exceptions.ConnectionClosedError:
|
|
print(f"Client {client_id} connection closed while listening.")
|
|
except json.JSONDecodeError:
|
|
print(f"Received invalid JSON from client {client_id}.")
|
|
except Exception as e:
|
|
print(f"Error listening to client {client_id}: {e}")
|
|
finally:
|
|
await unregister_client(client_id)
|
|
|
|
|
|
async def send_command_to_client(client_id, command_name, *args, wait_for_response=False, timeout=10):
|
|
"""Sends a command to a specific client and optionally waits for a response."""
|
|
if client_id not in current_app.active_clients:
|
|
print(f"Client {client_id} not found.")
|
|
raise ValueError(f"Client {client_id} not found.")
|
|
|
|
websocket = current_app.active_clients[client_id].websocket
|
|
request_id = str(uuid.uuid4()) if wait_for_response else None
|
|
|
|
message_payload = {"type": "command", "name": command_name, "args": args}
|
|
if request_id:
|
|
message_payload["request_id"] = request_id
|
|
|
|
message = json.dumps(message_payload)
|
|
|
|
future = None
|
|
if wait_for_response:
|
|
future = asyncio.Future()
|
|
pending_requests[request_id] = {
|
|
"future": future,
|
|
"client_id": client_id
|
|
}
|
|
|
|
try:
|
|
await websocket.send(message)
|
|
print(f"Sent command '{command_name}' to client {client_id} (Request ID: {request_id})")
|
|
|
|
if wait_for_response:
|
|
try:
|
|
response = await asyncio.wait_for(future, timeout)
|
|
print(f"Received response for Request ID {request_id} from client {client_id}")
|
|
return response
|
|
except asyncio.TimeoutError:
|
|
print(f"Timeout waiting for response from client {client_id} for Request ID {request_id}")
|
|
raise TimeoutError("Client response timed out.")
|
|
except asyncio.CancelledError:
|
|
print(f"Waiting for response for Request ID {request_id} was cancelled.")
|
|
raise ConnectionClosedError("Client disconnected before response was received.")
|
|
finally:
|
|
# Clean up the future if it's still there
|
|
pending_requests.pop(request_id, None)
|
|
|
|
except websockets.exceptions.ConnectionClosedError:
|
|
print(f"Failed to send to client {client_id}: connection closed.")
|
|
# If the connection closed, and we were waiting for a response, mark the future as cancelled
|
|
if future and not future.done():
|
|
future.cancel()
|
|
await unregister_client(client_id)
|
|
raise
|
|
except Exception as e:
|
|
print(f"An error occurred sending command to client {client_id}: {e}")
|
|
if future and not future.done():
|
|
future.cancel()
|
|
raise
|
|
|
|
|
|
async def send_command_to_all_clients(command_name, *args):
|
|
"""Sends a command to all connected clients."""
|
|
message = json.dumps({"type": "command", "name": command_name, "args": args})
|
|
# Use a list of items to avoid issues if clients disconnect during iteration
|
|
clients_to_send = list(current_app.active_clients.items())
|
|
for client_id, active_client in clients_to_send:
|
|
try:
|
|
await active_client.websocket.send(message)
|
|
print(f"Sent command '{command_name}' to client {client_id}")
|
|
except websockets.exceptions.ConnectionClosedError:
|
|
print(f"Failed to send to client {client_id}: connection closed.")
|
|
await unregister_client(client_id)
|
|
|
|
|
|
@nodes_bp.route("/", methods=['GET'])
|
|
@jwt_required
|
|
@role_required(UserRoles.USER)
|
|
async def get_nodes():
|
|
"""API endpoint to list currently connected client IDs."""
|
|
return jsonify([current_app.active_clients[client_id].to_dict() for client_id in current_app.active_clients])
|
|
|
|
|
|
@nodes_bp.route("/online", methods=['GET'])
|
|
@jwt_required
|
|
@role_required(UserRoles.USER)
|
|
async def get_online_bots():
|
|
active_bots = []
|
|
for client_id, active_client in current_app.active_clients.items():
|
|
if active_client.active_token:
|
|
active_bots.append({client_id: active_client.active_token.to_dict()})
|
|
return jsonify(active_bots)
|
|
|
|
|
|
@nodes_bp.route("/<client_id>/status", methods=["GET"])
|
|
@jwt_required
|
|
@role_required(UserRoles.USER)
|
|
async def status(client_id):
|
|
"""
|
|
Get the status from a given client
|
|
"""
|
|
# Check to make sure the client is online
|
|
if client_id not in current_app.active_clients:
|
|
return jsonify({"error": f"Client {client_id} not found, it might be offline"}), 404
|
|
|
|
try:
|
|
# Send the command and wait for a response
|
|
status_data = await send_command_to_client(client_id, NodeCommands.STATUS, wait_for_response=True, timeout=5)
|
|
return jsonify({
|
|
"active_client": current_app.active_clients[client_id].to_dict(),
|
|
"status": status_data['status']
|
|
}), 200
|
|
|
|
except TimeoutError:
|
|
return jsonify({"error": f"Client {client_id} did not respond within the timeout period."}), 504
|
|
except ConnectionClosedError:
|
|
return jsonify({"error": f"Client {client_id} disconnected before status could be retrieved."}), 503
|
|
except Exception as e:
|
|
return jsonify({"error": f"Failed to get status from client: {e}"}), 500
|
|
|
|
|
|
@nodes_bp.route("/<client_id>/join", methods=['POST'])
|
|
@jwt_required
|
|
@role_required(UserRoles.MOD)
|
|
async def join(client_id):
|
|
"""
|
|
Send a join command to the specific system specified
|
|
"""
|
|
data = await request.get_json()
|
|
|
|
system_id = data.get("system_id", None)
|
|
guild_id = data.get("guild_id")
|
|
channel_id = data.get("channel_id")
|
|
|
|
# Check to make sure the client is online
|
|
if client_id not in current_app.active_clients:
|
|
return jsonify({"error": f"Client {client_id} not found, it might be offline"}), 404
|
|
|
|
try:
|
|
args = [system_id, guild_id, channel_id]
|
|
|
|
if not isinstance(args, list):
|
|
return jsonify({"error": "'args' must be a list"}), 400
|
|
|
|
# Send the command asynchronously
|
|
await send_command_to_client(client_id, NodeCommands.JOIN, *args)
|
|
|
|
return jsonify({"status": "command sent", "client_id": client_id, "command": NodeCommands.JOIN}), 200
|
|
|
|
except Exception as e:
|
|
return jsonify({"error": f"Failed to send command: {e}"}), 500
|
|
|
|
|
|
@nodes_bp.route("/<client_id>/leave", methods=['POST'])
|
|
@jwt_required
|
|
@role_required(UserRoles.MOD)
|
|
async def leave(client_id):
|
|
"""
|
|
Send a leave command to the specific node
|
|
"""
|
|
data = await request.get_json()
|
|
|
|
guild_id = data.get("guild_id")
|
|
|
|
# Check to make sure the client is online
|
|
if client_id not in current_app.active_clients:
|
|
return jsonify({"error": f"Client {client_id} not found, it might be offline"}), 404
|
|
|
|
try:
|
|
args = [guild_id]
|
|
|
|
if not isinstance(args, list):
|
|
return jsonify({"error": "'args' must be a list"}), 400
|
|
|
|
# Send the command asynchronously
|
|
await send_command_to_client(client_id, NodeCommands.LEAVE, *args)
|
|
|
|
return jsonify({"status": "command sent", "client_id": client_id, "command": NodeCommands.LEAVE}), 200
|
|
|
|
except Exception as e:
|
|
return jsonify({"error": f"Failed to send command: {e}"}), 500
|
|
|
|
|
|
@nodes_bp.route("/<client_id>/op25_start", methods=['POST'])
|
|
@jwt_required
|
|
@role_required(UserRoles.MOD)
|
|
async def op25_start(client_id):
|
|
"""
|
|
Send an OP25 start command to the specific node
|
|
"""
|
|
# Check to make sure the client is online
|
|
if client_id not in current_app.active_clients:
|
|
return jsonify({"error": f"Client {client_id} not found, it might be offline"}), 404
|
|
|
|
try:
|
|
# Send the command asynchronously
|
|
await send_command_to_client(client_id, NodeCommands.OP25_START)
|
|
|
|
return jsonify({"status": "command sent", "client_id": client_id, "command": NodeCommands.OP25_START}), 200
|
|
|
|
except Exception as e:
|
|
return jsonify({"error": f"Failed to send command: {e}"}), 500
|
|
|
|
|
|
@nodes_bp.route("/<client_id>/op25_stop", methods=['POST'])
|
|
@jwt_required
|
|
@role_required(UserRoles.MOD)
|
|
async def op25_stop(client_id):
|
|
"""
|
|
Send an OP25 stop command to the specific node
|
|
"""
|
|
# Check to make sure the client is online
|
|
if client_id not in current_app.active_clients:
|
|
return jsonify({"error": f"Client {client_id} not found, it might be offline"}), 404
|
|
|
|
try:
|
|
# Send the command asynchronously
|
|
await send_command_to_client(client_id, NodeCommands.OP25_STOP)
|
|
|
|
return jsonify({"status": "command sent", "client_id": client_id, "command": NodeCommands.OP25_STOP}), 200
|
|
|
|
except Exception as e:
|
|
return jsonify({"error": f"Failed to send command: {e}"}), 500
|
|
|
|
|
|
@nodes_bp.route("/<client_id>/op25_set", methods=['POST'])
|
|
@jwt_required
|
|
@role_required(UserRoles.MOD)
|
|
async def op25_set(client_id):
|
|
"""
|
|
Send an OP25 set config command to the specific node
|
|
"""
|
|
data = await request.get_json()
|
|
|
|
system_id = data.get("system_id")
|
|
|
|
# Check to make sure the client is online
|
|
if client_id not in current_app.active_clients:
|
|
return jsonify({"error": f"Client {client_id} not found, it might be offline"}), 404
|
|
|
|
if not system_id:
|
|
return jsonify({"error":"No System ID supplied"}), 400
|
|
|
|
try:
|
|
args = [system_id]
|
|
|
|
if not isinstance(args, list):
|
|
return jsonify({"error": "'args' must be a list"}), 400
|
|
|
|
# Send the command asynchronously
|
|
await send_command_to_client(client_id, NodeCommands.OP25_SET, *args)
|
|
|
|
return jsonify({"status": "command sent", "client_id": client_id, "command": NodeCommands.OP25_SET}), 200
|
|
|
|
except Exception as e:
|
|
return jsonify({"error": f"Failed to send command: {e}"}), 500 |