import asyncio
import json
import os
import uuid
from enum import Enum

import websockets

from config import Config
from drb_cdb_api import DRBCDBAPI
from drb_cdb_types import ConfigGenerator, TalkgroupTag
from server_api import RadioAPIClient
from utils import generate_node_nickname

app_conf = Config()

# --- Client Configuration ---
SERVER_WS_URI = app_conf.get("SERVER_WS_URI", "ws://localhost:8765")
SERVER_API_URL = app_conf.get("SERVER_API_URL", "http://localhost:5000")
CLIENT_API_URL = app_conf.get("CLIENT_API_URL", "http://localhost:8001")

# Get the ID of this node, generating and storing a random one on first run
if not app_conf.get("client_id"):
    app_conf.set("client_id", f"client-{uuid.uuid4().hex[:8]}")
# Read it back through the same accessor used for every other setting
CLIENT_ID = app_conf.get("client_id")

# Get the nickname of this node, generating and storing one if none is set
if not app_conf.get("nickname"):
    generated_nickname = generate_node_nickname()
    print("Generated nickname: ", generated_nickname)
    app_conf.set("nickname", generated_nickname)
NICKNAME = app_conf.get("nickname")

# NOTE(review): this prints the whole config object, which may include stored
# tokens (e.g. "access_token") - confirm that is acceptable for the logs.
print(app_conf)
# ----------------------------

# Dictionary mapping command names (strings) to local client functions
command_handlers = {}

# Init the API handlers for the local DRB API and the remote server API
drb_api = DRBCDBAPI(CLIENT_API_URL)
srv_api = RadioAPIClient(SERVER_API_URL)

# Hold the active token
bot_token = None


# --- Define the client status object ---
class DiscordStatusValues(str, Enum):
    """Possible states reported for the Discord bot."""

    INVOICE = "in_voice"  # The discord client is in at least one voice channel
    ONLINE = "online"  # The discord client is online
    OFFLINE = "offline"  # The discord client is offline


class OP25StatusValues(str, Enum):
    """Possible states reported for OP25."""

    LISTENING = "listening"  # OP25 is online and listening
    ONLINE = "online"  # OP25 is online
    OFFLINE = "offline"  # OP25 is offline


# Last known status of this node; updated by the command handlers
client_status = {
    "op25_status": OP25StatusValues.OFFLINE,
    "discord_status": DiscordStatusValues.OFFLINE,
}


# --- Define decorator creation function ---
def command(func):
    """Decorator to register a function as a command handler.

    The function is stored in ``command_handlers`` under its own ``__name__``
    and returned unchanged.
    """
    command_handlers[func.__name__] = func
    return func


# --- Define Client-Side Command Handlers (The "API" functions) ---
# Join server
@command
async def join_server(websocket, system_id, guild_id, channel_id):
    """Start the Discord bot if needed, join a voice channel, and optionally start OP25.

    Args:
        websocket: Connection the command arrived on (not used by this handler).
        system_id: ID of the system to configure OP25 for. If falsy, the OP25
            logic is skipped entirely.
        guild_id: ID of the Discord guild to join.
        channel_id: ID of the voice channel to join.

    Raises:
        RuntimeError: If the bot is not running and no usable token is received.
    """
    global bot_token

    bot_status = await drb_api.get_bot_status()
    print("Bot status:", bot_status)

    # Check if the bot is running
    if not bot_status.get('bot_running'):
        # Get a token
        # NOTE(review): this stores the whole token response in bot_token, while
        # get_status stores discord_status['active_token'] - confirm both have
        # the same shape, since the handshake sends bot_token as-is.
        bot_token = await srv_api.request_token()
        print("Bot token:", bot_token)
        if not bot_token or "token" not in bot_token or not bot_token['token']:
            raise RuntimeError("No bot token received")  # TODO - Handle this better

        # Run the bot if not
        await drb_api.start_bot(bot_token['token'])

    # Check if the bot is connected to the guild, if not join
    connected_guilds = bot_status.get('connected_guilds')
    if not connected_guilds or int(guild_id) not in connected_guilds:
        # Join the server
        await drb_api.join_voice_channel(guild_id, channel_id)

    # Update status
    client_status['discord_status'] = DiscordStatusValues.INVOICE
    print("Join server completed")

    # If there is no system ID, skip the OP25 starting / setting logic
    if not system_id:
        return

    op25_status = await drb_api.get_op25_status()
    print("OP25 status:", op25_status)

    # Only configure and start OP25 when it is stopped, otherwise do nothing
    if op25_status.get('status', "stopped") != "stopped":
        return

    sys_details = await srv_api.get_system_details(system_id)
    print("System details:", sys_details)

    if not sys_details:
        # TODO - handle not having channel details
        return

    # Generate the config for the channel requested
    raw_tags = sys_details.get('tags')
    if raw_tags is None:
        tags_list = None
    else:
        # Empty tag entries are dropped
        tags_list = [TalkgroupTag(**tag_dict) for tag_dict in raw_tags if tag_dict]

    sys_config = ConfigGenerator(
        type=sys_details.get('type'),
        systemName=sys_details['name'],
        channels=sys_details.get('frequencies'),
        tags=tags_list,
        whitelist=sys_details.get('tag_whitelist'),
    )

    # Set the OP25 config
    await drb_api.generate_op25_config(sys_config)

    # Start OP25
    await drb_api.start_op25()
    client_status['op25_status'] = OP25StatusValues.LISTENING
    print("OP25 Startup Complete")


# Leave server
@command
async def leave_server(websocket, guild_id):
    """Leave the voice channel of a guild, if the bot is running and connected to it.

    Args:
        websocket: Connection the command arrived on (not used by this handler).
        guild_id: ID of the Discord guild to leave.
    """
    bot_status = await drb_api.get_bot_status()
    print("Bot status:", bot_status)

    # Check if the bot is running; if not, do nothing
    if not bot_status.get('bot_running'):
        return

    # Check if the bot is in the guild
    target_guild = int(guild_id)
    connected_guilds = bot_status.get('connected_guilds')
    if not connected_guilds or target_guild not in connected_guilds:
        return

    # Leave the server specified
    await drb_api.leave_voice_channel(guild_id)

    # Update status: stay "in voice" while any other guild is still connected,
    # matching how get_status derives the value from connected_guilds
    still_in_voice = any(guild != target_guild for guild in connected_guilds)
    client_status['discord_status'] = (
        DiscordStatusValues.INVOICE if still_in_voice else DiscordStatusValues.ONLINE
    )
    print("Leave server completed")


# Get the client status
@command
async def get_status(websocket, request_id):
    """Refresh ``client_status`` from the local API and send it back to the server.

    Args:
        websocket: Connection used to send the response.
        request_id: ID echoed back in the response message.
    """
    global bot_token

    # Get the OP25 status; a missing status is treated the same as "stopped"
    op25_status = await drb_api.get_op25_status()
    if op25_status.get('status', "stopped") == "stopped":
        client_status['op25_status'] = OP25StatusValues.OFFLINE
    else:
        client_status['op25_status'] = OP25StatusValues.LISTENING

    # Get the discord status
    discord_status = await drb_api.get_bot_status()
    if not discord_status.get('bot_running'):
        client_status['discord_status'] = DiscordStatusValues.OFFLINE
    elif discord_status.get('connected_guilds'):
        client_status['discord_status'] = DiscordStatusValues.INVOICE
    else:
        client_status['discord_status'] = DiscordStatusValues.ONLINE

    # Check if the active token was passed and update the global
    if discord_status.get('active_token'):
        bot_token = discord_status['active_token']

    # Return the status object, serialized to a JSON string before sending
    response_payload = {"status": client_status}
    await websocket.send(json.dumps({"type": "response", "request_id": request_id, "payload": response_payload}))
# Start OP25
@command
async def op25_start(websocket):
    """Start OP25 through the local API and mark it as listening."""
    await drb_api.start_op25()
    client_status['op25_status'] = OP25StatusValues.LISTENING


# Stop OP25
@command
async def op25_stop(websocket):
    """Stop OP25 through the local API and mark it as offline."""
    await drb_api.stop_op25()
    client_status['op25_status'] = OP25StatusValues.OFFLINE


# Set OP25 Config
@command
async def op25_set(websocket, system_id):
    """Fetch the details of a system and generate the OP25 config for it.

    The config is built the same way join_server builds it: tag entries are
    converted to ``TalkgroupTag`` objects and optional fields may be absent.

    Args:
        websocket: Connection the command arrived on (not used by this handler).
        system_id: ID of the system whose details are requested from the server.
    """
    system_config = await srv_api.get_system_details(system_id)
    if not system_config:
        print(f"No system details received for system {system_id}")
        return

    raw_tags = system_config.get('tags')
    if raw_tags is None:
        tags_list = None
    else:
        tags_list = [TalkgroupTag(**tag_dict) for tag_dict in raw_tags if tag_dict]

    temp_config = ConfigGenerator(
        type=system_config.get('type'),
        systemName=system_config['name'],
        channels=system_config.get('frequencies'),
        tags=tags_list,
        # Accept the key this handler used before as well as the one join_server reads
        whitelist=system_config.get('whitelist', system_config.get('tag_whitelist')),
    )
    await drb_api.generate_op25_config(temp_config)


# Example command
@command
async def run_task(websocket, task_id, duration_seconds):
    """Example command: Simulates running a task."""
    print("\n--- Server Command: run_task ---")
    print(f"Starting task {task_id} for {duration_seconds} seconds...")
    await asyncio.sleep(duration_seconds)
    print(f"Task {task_id} finished.")
    print("------------------------------")


# ------------------------------------------------------------------
async def receive_commands(websocket):
    """Listens for and processes command messages from the server.

    Each message is parsed as JSON. "command" messages are dispatched to the
    handler registered under their name; "handshake_ack" messages store the
    received access token in the config. Errors raised while processing a
    message are printed and do not stop the loop.
    """
    async for message in websocket:
        try:
            data = json.loads(message)
            message_type = data.get("type")

            if message_type == "command":
                command_name = data.get("name")
                # Treat a missing or null args field as "no arguments"
                args = data.get("args") or []

                # Check if there is a req ID, if so add it to the args
                req_id = data.get("request_id", None)
                if req_id:
                    args.append(req_id)

                if command_name in command_handlers:
                    print(f"Executing command: {command_name} with args {args}")
                    # Execute the registered async function
                    await command_handlers[command_name](websocket, *args)
                else:
                    print(f"Received unknown command: {command_name}")

            elif message_type == "handshake_ack":
                # Set the session token
                app_conf.set("access_token", data.get("access_token"))
                print("Server acknowledged handshake.")

            else:
                print(f"Received unknown message type: {message_type}")

        except json.JSONDecodeError:
            print(f"Received invalid JSON: {message}")
        except Exception as e:
            print(f"Error processing message: {e}")


async def main_client():
    """Connects to the server and handles communication with a robust retry system.

    Runs forever: after any disconnect, clean or not, it waits ``retry_delay``
    seconds and connects again.
    """
    global bot_token

    retry_delay = 5  # seconds

    # Get initial status from the discord bot
    discord_status = await drb_api.get_bot_status()
    if "active_token" in discord_status and discord_status['active_token']:
        # If there's a status and the active token is set, update the global var
        bot_token = discord_status['active_token']

    while True:
        print(f"Client {CLIENT_ID} attempting to connect to {SERVER_WS_URI}...")
        try:
            async with websockets.connect(SERVER_WS_URI) as websocket:
                print("Connection established.")

                # Handshake: Send client ID immediately after connecting
                handshake_message = json.dumps({"type": "handshake", "id": CLIENT_ID, "nickname": NICKNAME, "active_token": bot_token})
                await websocket.send(handshake_message)
                print(f"Sent handshake with ID: {CLIENT_ID}")

                # Start receiving commands and keep the connection alive
                await receive_commands(websocket)

            # Reaching this point means the receive loop ended without raising,
            # i.e. the connection was closed; fall through to the delay below
            # instead of reconnecting immediately.
            print(f"Connection closed. Retrying in {retry_delay} seconds...")

        except (ConnectionRefusedError, websockets.exceptions.ConnectionClosedOK, websockets.exceptions.ConnectionClosedError) as e:
            print(f"Connection error: {e}. Retrying in {retry_delay} seconds...")
        except Exception as e:
            print(f"An unexpected error occurred: {e}. Retrying in {retry_delay} seconds...")

        # Wait before every reconnect attempt, whatever ended the last one
        await asyncio.sleep(retry_delay)


if __name__ == "__main__":
    asyncio.run(main_client())