267 lines
9.7 KiB
Python
267 lines
9.7 KiB
Python
import asyncio
|
|
import websockets
|
|
import json
|
|
import uuid
|
|
import os
|
|
from drb_cdb_api import DRBCDBAPI
|
|
from drb_cdb_types import ConfigGenerator, TalkgroupTag
|
|
from server_api import RadioAPIClient
|
|
from enum import Enum
|
|
from config import Config
|
|
|
|
# Application configuration store for this node
app_conf = Config()

# --- Client Configuration ---
# Every endpoint may be overridden by an environment variable of the same name;
# otherwise the localhost default is used.
SERVER_WS_URI = os.environ.get("SERVER_WS_URI", "ws://localhost:8765")
SERVER_API_URL = os.environ.get("SERVER_API_URL", "http://localhost:5000")
CLIENT_API_URL = os.environ.get("CLIENT_API_URL", "http://localhost:8001")

# Get/set the ID of this node: when the config has no (truthy) client_id yet,
# store a freshly generated one built from the first 8 hex chars of a UUID4.
if not app_conf.get("client_id"):
    app_conf.set("client_id", "client-" + uuid.uuid4().hex[:8])
CLIENT_ID = app_conf.client_id

# Get the nickname (even if it's empty)
NICKNAME = app_conf.get("nickname")
# ----------------------------
|
|
|
|
# Registry mapping command names (strings) to the local handler functions;
# entries are added by the @command decorator.
command_handlers: dict = {}

# Init the API handlers: the DRB API is reached at CLIENT_API_URL,
# the server's HTTP API at SERVER_API_URL.
drb_api = DRBCDBAPI(CLIENT_API_URL)
srv_api = RadioAPIClient(SERVER_API_URL)
|
|
|
|
# --- Define the client status object ---
|
|
class DiscordStatusValues(str, Enum):
    """States of the Discord client stored under ``client_status['discord_status']``.

    Members also subclass ``str``, so each one compares equal to its string value.
    """

    # The discord client is in at least one voice channel
    INVOICE = "in_voice"
    # The discord client is online
    ONLINE = "online"
    # The discord client is offline
    OFFLINE = "offline"
|
|
|
|
|
|
class OP25StatusValues(str, Enum):
    """States of OP25 stored under ``client_status['op25_status']``.

    Members also subclass ``str``, so each one compares equal to its string value.
    """

    # OP25 is online and listening
    LISTENING = "listening"
    # OP25 is online
    ONLINE = "online"
    # OP25 is offline
    OFFLINE = "offline"
|
|
|
|
|
|
# Module-wide status of this node. Both entries start as OFFLINE and are
# overwritten by the command handlers as OP25 / the Discord bot change state.
client_status = dict(
    op25_status=OP25StatusValues.OFFLINE,
    discord_status=DiscordStatusValues.OFFLINE,
)
|
|
|
|
# --- Define decorator creation function ---
|
|
def command(func):
    """Register ``func`` as a command handler and hand it back unchanged.

    The function is stored in ``command_handlers`` under its own ``__name__``,
    so the name of the decorated function is the command name used for lookup.

    Args:
        func: The handler function to register.

    Returns:
        The same ``func`` object, so this can be used as a plain decorator.
    """
    handler_name = func.__name__
    command_handlers[handler_name] = func
    return func
|
|
|
|
# --- Define Client-Side Command Handlers (The "API" functions) ---
|
|
# Join server
|
|
@command
async def join_server(websocket, system_id, guild_id, channel_id):
    """Put the Discord bot into a voice channel and, if requested, start OP25.

    Args:
        websocket: Connection the command arrived on (not used by this handler).
        system_id: System whose details are used to configure OP25. A falsy
            value skips every OP25 step.
        guild_id: Guild to join; converted with ``int()`` when compared against
            the bot's connected guilds.
        channel_id: Voice channel handed to ``join_voice_channel``.

    Raises:
        Exception: If the bot is not running and no usable token is received.
    """
    bot_status = await drb_api.get_bot_status()
    print("Bot status:", bot_status)

    # Start the bot first when the status does not report it as running
    bot_is_running = 'bot_running' in bot_status and bot_status['bot_running']
    if not bot_is_running:
        bot_token = await srv_api.request_token()
        # NOTE(review): this prints the token response to stdout
        print("Bot token:", bot_token)
        token_usable = bot_token and "token" in bot_token and bot_token['token']
        if not token_usable:
            raise Exception("No bot token received") # TODO - Handle this better
        await drb_api.start_bot(bot_token['token'])

    # Join the voice channel unless the status fetched above already lists the guild
    already_in_guild = (
        'connected_guilds' in bot_status
        and int(guild_id) in bot_status['connected_guilds']
    )
    if not already_in_guild:
        await drb_api.join_voice_channel(guild_id, channel_id)

    # Update status
    client_status['discord_status'] = DiscordStatusValues.INVOICE

    print("Join server completed")

    # If there is no system ID, skip the OP25 starting / setting logic
    if not system_id:
        return

    op25_status = await drb_api.get_op25_status()
    print("OP25 status:", op25_status)

    # OP25 is only (re)configured when it is stopped or reports no status at all;
    # when it is already running it is left untouched.
    op25_active = 'status' in op25_status and op25_status['status'] != "stopped"
    if op25_active:
        return

    sys_details = await srv_api.get_system_details(system_id)
    print("System details:", sys_details)
    if not sys_details:
        # TODO - handle not having channel details
        return

    # Turn each non-empty tag dict into a TalkgroupTag; a missing or None
    # 'tags' entry is passed on as None rather than an empty list.
    raw_tags = sys_details.get('tags')
    if raw_tags is None:
        tags_list = None
    else:
        tags_list = [TalkgroupTag(**tag_dict) for tag_dict in raw_tags if tag_dict]

    # Generate the config for the system requested
    sys_config = ConfigGenerator(
        type=sys_details.get('type'),
        systemName=sys_details['name'],
        channels=sys_details.get('frequencies'),
        tags=tags_list,
        whitelist=sys_details.get('tag_whitelist'),
    )

    # Set the OP25 config, then start OP25
    await drb_api.generate_op25_config(sys_config)
    await drb_api.start_op25()

    client_status['op25_status'] = OP25StatusValues.LISTENING

    print("OP25 Startup Complete")
|
|
|
|
# Leave server
|
|
@command
async def leave_server(websocket, guild_id):
    """Make the Discord bot leave the voice channel of the given guild.

    Does nothing when the bot is not running or is not connected to the guild.

    Args:
        websocket: Connection the command arrived on (not used by this handler).
        guild_id: Guild to leave; converted with ``int()`` when compared against
            the bot's connected guilds.
    """
    bot_status = await drb_api.get_bot_status()
    print("Bot status:", bot_status)

    # Nothing to leave when the bot is not running
    bot_is_running = 'bot_running' in bot_status and bot_status['bot_running']
    if not bot_is_running:
        return

    # Nothing to leave when the bot is not in the guild
    in_guild = (
        "connected_guilds" in bot_status
        and int(guild_id) in bot_status['connected_guilds']
    )
    if not in_guild:
        return

    # Leave the server specified
    await drb_api.leave_voice_channel(guild_id)

    # The bot stays online, it is just no longer in this guild's voice channel
    client_status['discord_status'] = DiscordStatusValues.ONLINE

    print("Leave server completed")
|
|
|
|
# Get the client status
|
|
@command
async def get_status(websocket, request_id):
    """Refresh ``client_status`` from the DRB API and send it back to the server.

    Args:
        websocket: Connection the response is sent on.
        request_id: Identifier echoed back in the response.
    """
    # OP25: stopped (or no status reported) counts as offline, anything else as listening
    op25_info = await drb_api.get_op25_status()
    op25_stopped = 'status' not in op25_info or op25_info['status'] == "stopped"
    client_status['op25_status'] = (
        OP25StatusValues.OFFLINE if op25_stopped else OP25StatusValues.LISTENING
    )

    # Discord: offline when the bot is not running, in-voice when it reports at
    # least one connected guild, otherwise plain online
    bot_info = await drb_api.get_bot_status()
    bot_is_running = 'bot_running' in bot_info and bot_info['bot_running']
    if not bot_is_running:
        discord_state = DiscordStatusValues.OFFLINE
    elif 'connected_guilds' in bot_info and len(bot_info['connected_guilds']) > 0:
        discord_state = DiscordStatusValues.INVOICE
    else:
        discord_state = DiscordStatusValues.ONLINE
    client_status['discord_status'] = discord_state

    # The websocket carries text, so the reply is serialized to a JSON string first
    reply = {
        "type": "response",
        "request_id": request_id,
        "payload": {"status": client_status},
    }
    await websocket.send(json.dumps(reply))
|
|
|
|
# Start OP25
|
|
@command
async def op25_start(websocket):
    """Start OP25 through the DRB API and record it as listening.

    Args:
        websocket: Connection the command arrived on (not used by this handler).
    """
    await drb_api.start_op25()
    client_status.update(op25_status=OP25StatusValues.LISTENING)
|
|
|
|
# Stop OP25
|
|
@command
async def op25_stop(websocket):
    """Stop OP25 through the DRB API and record it as offline.

    Args:
        websocket: Connection the command arrived on (not used by this handler).
    """
    await drb_api.stop_op25()
    client_status.update(op25_status=OP25StatusValues.OFFLINE)
|
|
|
|
# Set OP25 Config
|
|
@command
async def op25_set(websocket, system_id):
    """Generate the OP25 config for a system without starting OP25.

    The config is built the same way ``join_server`` builds it: tag dicts are
    turned into ``TalkgroupTag`` objects and the whitelist is read from
    ``tag_whitelist``. The older ``whitelist`` key is still honoured as a
    fallback so either response shape works.

    Args:
        websocket: Connection the command arrived on (not used by this handler).
        system_id: System whose details are fetched from the server API.
    """
    system_config = await srv_api.get_system_details(system_id)
    if not system_config:
        # Without details there is nothing to build a config from
        print(f"No system details received for system {system_id}; OP25 config not changed")
        return

    # Turn each non-empty tag dict into a TalkgroupTag; a missing or None
    # 'tags' entry is passed on as None.
    raw_tags = system_config.get('tags')
    if raw_tags is None:
        tags_list = None
    else:
        tags_list = [TalkgroupTag(**tag_dict) for tag_dict in raw_tags if tag_dict]

    # NOTE(review): join_server reads 'tag_whitelist' from the same API call,
    # this handler used to read 'whitelist' — confirm which key the server sends.
    whitelist = system_config.get('tag_whitelist', system_config.get('whitelist'))

    temp_config = ConfigGenerator(
        type=system_config.get('type'),
        systemName=system_config['name'],
        channels=system_config.get('frequencies'),
        tags=tags_list,
        whitelist=whitelist,
    )

    await drb_api.generate_op25_config(temp_config)
|
|
|
|
# Example command
|
|
@command
async def run_task(websocket, task_id, duration_seconds):
    """Example command: simulate a task by sleeping for ``duration_seconds``.

    Args:
        websocket: Connection the command arrived on (not used by this handler).
        task_id: Label used in the printed messages.
        duration_seconds: How long to sleep, in seconds.
    """
    header = "\n--- Server Command: run_task ---"
    footer = "------------------------------"

    print(header)
    print(f"Starting task {task_id} for {duration_seconds} seconds...")
    await asyncio.sleep(duration_seconds)
    print(f"Task {task_id} finished.")
    print(footer)
|
|
|
|
# ------------------------------------------------------------------
|
|
async def receive_commands(websocket):
    """Listen for messages from the server and dispatch them until the stream ends.

    Every message is parsed as JSON and handled according to its ``type``:

    * ``command`` - the handler registered under ``name`` in ``command_handlers``
      is awaited with ``websocket`` followed by the entries of ``args``. A truthy
      ``request_id`` is appended as the last positional argument.
    * ``handshake_ack`` - the ``access_token`` field is stored in the app config.
    * anything else is reported and ignored.

    Invalid JSON and errors raised while handling one message are printed and do
    not stop the loop. Handlers are awaited inline, so the next message is not
    read until the current handler has finished.

    Args:
        websocket: Connected websocket; iterated asynchronously for incoming
            messages and passed on to the command handlers.
    """
    async for message in websocket:
        try:
            data = json.loads(message)
            message_type = data.get("type")

            if message_type == "command":
                command_name = data.get("name")
                # Copy the list so the parsed message is not mutated; a missing
                # or null "args" means "no arguments".
                args = list(data.get("args") or [])

                # Check if there is a req ID, if so add it to the args
                req_id = data.get("request_id")
                if req_id:
                    args.append(req_id)

                handler = command_handlers.get(command_name)
                if handler is None:
                    print(f"Received unknown command: {command_name}")
                else:
                    print(f"Executing command: {command_name} with args {args}")
                    # Execute the registered async function
                    await handler(websocket, *args)

            elif message_type == "handshake_ack":
                # Set the session token. Config.set is called as set(key, value)
                # elsewhere in this module; the token used to be passed as the
                # only argument.
                # NOTE(review): the key "access_token" mirrors the message field —
                # confirm it matches what other code reads from the config.
                app_conf.set("access_token", data.get("access_token"))

                print("Server acknowledged handshake.")

            else:
                print(f"Received unknown message type: {message_type}")

        except json.JSONDecodeError:
            print(f"Received invalid JSON: {message}")
        except Exception as e:
            # Boundary handler: one bad message or failing command must not
            # tear down the receive loop.
            print(f"Error processing message: {e}")
|
|
|
|
|
|
async def main_client():
    """Connect to the server, handshake, and process commands, reconnecting forever.

    After each connection ends, for whatever reason, the client waits
    ``retry_delay`` seconds before trying again. This includes the case where
    ``receive_commands`` returns without raising (for example after the server
    closes the connection normally); previously that path reconnected with no
    delay at all, which could turn into a tight reconnect loop.

    This coroutine never returns on its own.
    """
    retry_delay = 5  # seconds

    while True:
        print(f"Client {CLIENT_ID} attempting to connect to {SERVER_WS_URI}...")
        try:
            async with websockets.connect(SERVER_WS_URI) as websocket:
                print("Connection established.")

                # Handshake: Send client ID immediately after connecting
                handshake_message = json.dumps(
                    {"type": "handshake", "id": CLIENT_ID, "nickname": NICKNAME}
                )
                await websocket.send(handshake_message)
                print(f"Sent handshake with ID: {CLIENT_ID}")

                # Start receiving commands and keep the connection alive
                await receive_commands(websocket)

            # Reached only when the receive loop ended without an exception
            print(f"Connection closed. Retrying in {retry_delay} seconds...")
        except (
            ConnectionRefusedError,
            websockets.exceptions.ConnectionClosedOK,
            websockets.exceptions.ConnectionClosedError,
        ) as e:
            print(f"Connection error: {e}. Retrying in {retry_delay} seconds...")
        except Exception as e:
            # Boundary handler: any other failure is reported and retried
            print(f"An unexpected error occurred: {e}. Retrying in {retry_delay} seconds...")

        # Single back-off point shared by every way a connection attempt can end
        await asyncio.sleep(retry_delay)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the reconnecting client on a fresh event loop
    asyncio.run(main_client())