Implemented staus check and refactored node endpoints

This commit is contained in:
Logan Cusano
2025-05-25 21:41:56 -04:00
parent 5e6ee765d8
commit 5b90ebb8f1
3 changed files with 132 additions and 39 deletions

View File

@@ -11,6 +11,7 @@ class DemodTypes(str, Enum):
class NodeCommands(str, Enum):
JOIN = "join_server"
LEAVE = "leave_server"
STATUS = "status"
class TalkgroupTag:

View File

@@ -7,12 +7,6 @@ from internal.db_wrappers import DiscordIdDbController
bot_bp = Blueprint('bot', __name__)
@bot_bp.route("/", methods=['GET'])
async def get_online_bots_route():
"""API endpoint to list bots (by name) currently online."""
return jsonify(list(current_app.active_clients.keys()))
# ------- Discord Token Functions
@bot_bp.route('/request_token', methods=['POST'])
async def request_token_route():
@@ -73,7 +67,6 @@ async def get_all_discord_tokens():
abort(500, f"An internal error occurred: {e}")
# ------- Util Functions
def find_token_in_active_clients(target_token: str) -> bool:

View File

@@ -1,38 +1,118 @@
import json
import asyncio # Import asyncio
import websockets
from quart import Blueprint, jsonify, request, abort, current_app
from werkzeug.exceptions import HTTPException
from enum import Enum
from internal.types import ActiveClient, NodeCommands
import uuid # Import uuid for generating unique request IDs
nodes_bp = Blueprint('nodes', __name__)
# Dictionary to store pending requests: {request_id: asyncio.Future}
pending_requests = {}
async def register_client(websocket, client_id):
"""Registers a new client connection."""
current_app.active_clients[client_id] = ActiveClient(websocket)
print(f"Client {client_id} connected.")
# Start a task to listen for messages from this client
asyncio.create_task(listen_to_client(websocket, client_id))
async def unregister_client(client_id):
"""Unregisters a disconnected client."""
if client_id in current_app.active_clients:
# Also clean up any pending requests for this client
for req_id, req_obj in list(pending_requests.items()):
if req_obj['client_id'] == client_id:
if not req_obj['future'].done():
req_obj['future'].cancel()
del current_app.active_clients[client_id]
print(f"Client {client_id} disconnected.")
async def send_command_to_client(client_id, command_name, *args):
"""Sends a command to a specific client."""
if client_id in current_app.active_clients:
websocket = current_app.active_clients[client_id].websocket
message = json.dumps({"type": "command", "name": command_name, "args": args})
try:
await websocket.send(message)
print(f"Sent command '{command_name}' to client {client_id}")
except websockets.exceptions.ConnectionClosedError:
print(f"Failed to send to client {client_id}: connection closed.")
await unregister_client(client_id)
else:
async def listen_to_client(websocket, client_id):
"""Listens for messages from a specific client."""
try:
while True:
message = await websocket.recv()
data = json.loads(message)
message_type = data.get("type")
request_id = data.get("request_id")
if message_type == "response" and request_id in pending_requests:
future = pending_requests.pop(request_id)
if not future.done(): # Ensure the future hasn't been cancelled or set by something else
future.set_result(data.get("payload"))
# Add other message types handling here if needed (e.g., unsolicited messages)
except websockets.exceptions.ConnectionClosedError:
print(f"Client {client_id} connection closed while listening.")
except json.JSONDecodeError:
print(f"Received invalid JSON from client {client_id}.")
except Exception as e:
print(f"Error listening to client {client_id}: {e}")
finally:
await unregister_client(client_id)
async def send_command_to_client(client_id, command_name, *args, wait_for_response=False, timeout=10):
"""Sends a command to a specific client and optionally waits for a response."""
if client_id not in current_app.active_clients:
print(f"Client {client_id} not found.")
raise ValueError(f"Client {client_id} not found.")
websocket = current_app.active_clients[client_id].websocket
request_id = str(uuid.uuid4()) if wait_for_response else None
message_payload = {"type": "command", "name": command_name, "args": args}
if request_id:
message_payload["request_id"] = request_id
message = json.dumps(message_payload)
future = None
if wait_for_response:
future = asyncio.Future()
pending_requests[request_id] = {
"future": future,
"client_id": client_id
}
try:
await websocket.send(message)
print(f"Sent command '{command_name}' to client {client_id} (Request ID: {request_id})")
if wait_for_response:
try:
response = await asyncio.wait_for(future, timeout)
print(f"Received response for Request ID {request_id} from client {client_id}")
return response
except asyncio.TimeoutError:
print(f"Timeout waiting for response from client {client_id} for Request ID {request_id}")
raise TimeoutError("Client response timed out.")
except asyncio.CancelledError:
print(f"Waiting for response for Request ID {request_id} was cancelled.")
raise ConnectionClosedError("Client disconnected before response was received.")
finally:
# Clean up the future if it's still there
pending_requests.pop(request_id, None)
except websockets.exceptions.ConnectionClosedError:
print(f"Failed to send to client {client_id}: connection closed.")
# If the connection closed, and we were waiting for a response, mark the future as cancelled
if future and not future.done():
future.cancel()
await unregister_client(client_id)
raise
except Exception as e:
print(f"An error occurred sending command to client {client_id}: {e}")
if future and not future.done():
future.cancel()
raise
async def send_command_to_all_clients(command_name, *args):
@@ -40,9 +120,9 @@ async def send_command_to_all_clients(command_name, *args):
message = json.dumps({"type": "command", "name": command_name, "args": args})
# Use a list of items to avoid issues if clients disconnect during iteration
clients_to_send = list(current_app.active_clients.items())
for client_id, websocket in clients_to_send:
for client_id, active_client in clients_to_send:
try:
await websocket.send(message)
await active_client.websocket.send(message)
print(f"Sent command '{command_name}' to client {client_id}")
except websockets.exceptions.ConnectionClosedError:
print(f"Failed to send to client {client_id}: connection closed.")
@@ -55,14 +135,44 @@ async def get_nodes():
return jsonify(list(current_app.active_clients.keys()))
@nodes_bp.route("/join", methods=['POST'])
async def join():
@nodes_bp.route("/online", methods=['GET'])
async def get_online_bots():
active_bots = []
for client_id, active_client in current_app.active_clients.items():
if active_client.active_token:
active_bots.append({client_id: active_client.active_token.to_dict()})
return jsonify(active_bots)
@nodes_bp.route("/<client_id>/status", methods=["GET"])
async def status(client_id):
"""
Get the status from a given client
"""
# Check to make sure the client is online
if client_id not in current_app.active_clients:
return jsonify({"error": f"Client {client_id} not found, it might be offline"}), 404
try:
# Send the command and wait for a response
status_data = await send_command_to_client(client_id, NodeCommands.STATUS, wait_for_response=True, timeout=5)
return jsonify({"status": "success", "client_id": client_id, "data": status_data}), 200
except TimeoutError:
return jsonify({"error": f"Client {client_id} did not respond within the timeout period."}), 504
except ConnectionClosedError:
return jsonify({"error": f"Client {client_id} disconnected before status could be retrieved."}), 503
except Exception as e:
return jsonify({"error": f"Failed to get status from client: {e}"}), 500
@nodes_bp.route("/<client_id>/join", methods=['POST'])
async def join(client_id):
"""
Send a join command to the specific system specified
"""
data = await request.get_json()
client_id = data.get("client_id")
system_id = data.get("system_id")
guild_id = data.get("guild_id")
channel_id = data.get("channel_id")
@@ -86,14 +196,13 @@ async def join():
return jsonify({"error": f"Failed to send command: {e}"}), 500
@nodes_bp.route("/leave", methods=['POST'])
async def leave():
@nodes_bp.route("/<client_id>/leave", methods=['POST'])
async def leave(client_id):
"""
Send a join command to the specific system specified
Send a leave command to the specific system specified
"""
data = await request.get_json()
client_id = data.get("client_id")
guild_id = data.get("guild_id")
# Check to make sure the client is online
@@ -112,14 +221,4 @@ async def leave():
return jsonify({"status": "command sent", "client_id": client_id, "command": NodeCommands.LEAVE}), 200
except Exception as e:
return jsonify({"error": f"Failed to send command: {e}"}), 500
@nodes_bp.route("/get_online_bots", methods=['GET'])
async def get_online_bots():
active_bots = []
for client in current_app.active_clients:
if current_app.active_clients[client].active_token:
active_bots.append({client: current_app.active_clients[client].active_token.to_dict()})
return jsonify(active_bots)
return jsonify({"error": f"Failed to send command: {e}"}), 500