Files
drb-core-client/app/client.py
2025-05-11 20:36:31 -04:00

186 lines
6.8 KiB
Python

import asyncio
import websockets
import json
import uuid
import os
from drb_cdb_api import DRBCDBAPI
from drb_cdb_types import ConfigGenerator
from server_api import RadioAPIClient
from enum import Enum
from config import Config
# Config instance used below to look up (and, if absent, store) this client's ID.
app_conf = Config()

# --- Client Configuration ---
# Each endpoint may be overridden through an environment variable of the same
# name; the literal given here is the fallback when the variable is unset.
SERVER_WS_URI = os.environ.get("SERVER_WS_URI", "ws://localhost:8765")
SERVER_API_URL = os.environ.get("SERVER_API_URL", "http://localhost:5000")
CLIENT_API_URL = os.environ.get("CLIENT_API_URL", "http://localhost:8001")

# Node identity: keep the ID already held by the config, otherwise generate
# "client-" followed by 8 hex characters of a random UUID and store it.
if not app_conf.get("client_id"):
    app_conf.set("client_id", f"client-{uuid.uuid4().hex[:8]}")
CLIENT_ID = app_conf.client_id
# ----------------------------

# Registry of command name (string) -> local handler function.
# Entries are added by the @command decorator.
command_handlers = {}

# API handlers: DRB API at CLIENT_API_URL, server API at SERVER_API_URL.
drb_api = DRBCDBAPI(CLIENT_API_URL)
srv_api = RadioAPIClient(SERVER_API_URL)
# --- Define the client status object ---
class StatusValues(Enum):
    """States this client can be in."""

    # The client is online
    ONLINE = "online"
    # The client bot is online and listening to a system
    LISTENING = "listening"


# Current state of this client; it starts out as ONLINE.
client_status = StatusValues.ONLINE
# --- Define decorator creation function ---
def command(func):
    """Decorator: record ``func`` in ``command_handlers`` under its own name.

    The function itself is returned unchanged, so the decorated name still
    refers to the original callable.
    """
    handler_name = func.__name__
    command_handlers[handler_name] = func
    return func
# --- Define Client-Side Command Handlers (The "API" functions) ---
# Join server
@command
async def join_server(system_id, guild_id, channel_id):
    """Start the bot if needed, join the voice channel, and start OP25 if it is stopped.

    Steps, in order:
      1. If the bot is not running, request a token from the server API and
         start the bot with it.
      2. If the bot is not connected to ``guild_id``, join ``channel_id`` there.
      3. Mark this client as LISTENING.
      4. If OP25 reports no status or "stopped", fetch the details of
         ``system_id``, generate the OP25 config from them and start OP25.
         A running OP25 is left untouched.

    Args:
        system_id: ID of the system whose details are requested from the server API.
        guild_id: ID of the guild the bot should be connected to.
        channel_id: ID of the voice channel to join in that guild.

    Raises:
        RuntimeError: If the server returns no bot token, or returns no details
            for ``system_id`` when OP25 has to be configured.
    """
    # Without this declaration the status assignment below only creates a
    # function-local name and the module-level client_status never changes.
    global client_status

    bot_status = await drb_api.get_bot_status()
    print("Bot status:", bot_status)

    # Check if the bot is running; start it if not
    if 'bot_running' not in bot_status or not bot_status['bot_running']:
        bot_token = await srv_api.request_token()
        # NOTE(review): this writes the bot token to stdout; consider redacting it.
        print("Bot token:", bot_token)
        if not bot_token or "token" not in bot_token or not bot_token['token']:
            raise RuntimeError("No bot token received")  # TODO - Handle this better
        await drb_api.start_bot(bot_token['token'])

    # Check if the bot is connected to the guild, if not join.
    # This uses the status snapshot taken at the top of the function.
    if 'connected_guilds' not in bot_status or guild_id not in bot_status['connected_guilds']:
        await drb_api.join_voice_channel(guild_id, channel_id)

    # Update status
    client_status = StatusValues.LISTENING

    op25_status = await drb_api.get_op25_status()
    print("OP25 status:", op25_status)

    # Check if OP25 is stopped, if so set the selected system, otherwise do nothing
    if 'status' not in op25_status or op25_status['status'] == "stopped":
        sys_details = await srv_api.get_system_details(system_id)
        print("System details:", sys_details)
        if not sys_details:
            # Fail with a clear message instead of falling through to an
            # AttributeError/KeyError on the lookups below.
            raise RuntimeError(f"No system details received for system {system_id}")

        # Generate the config for the channel requested.
        # A missing/None 'tags' entry is passed on as None; falsy entries are skipped.
        raw_tags = sys_details.get('tags')
        if raw_tags is None:
            tags_list = None
        else:
            # NOTE(review): TalkgroupTag is not imported at module level; it is
            # assumed to live in drb_cdb_types next to ConfigGenerator - confirm.
            from drb_cdb_types import TalkgroupTag
            tags_list = [TalkgroupTag(**tag_dict) for tag_dict in raw_tags if tag_dict]

        sys_config = ConfigGenerator(
            type=sys_details.get('decode_mode'),
            systemName=sys_details['name'],
            channels=sys_details.get('frequency_list_khz'),  # Assuming 'channels' is the correct field name
            tags=tags_list,
            whitelist=sys_details.get('tag_whitelist'),  # Use .get for optional fields
        )

        # Set the OP25 config
        await drb_api.generate_op25_config(sys_config)
        # Start OP25
        await drb_api.start_op25()

    print("Join server completed")
# Leave server
@command
async def leave_server(guild_id):
    """Leave the voice channel of ``guild_id`` and mark this client as ONLINE.

    Does nothing (and leaves the status untouched) when the bot is not running
    or is not connected to ``guild_id``.

    Args:
        guild_id: ID of the guild whose voice channel should be left.
    """
    # Without this declaration the status assignment below only creates a
    # function-local name and the module-level client_status never changes.
    global client_status

    bot_status = await drb_api.get_bot_status()
    print("Bot status:", bot_status)

    # Check if the bot is running; if not, do nothing
    if 'bot_running' not in bot_status or not bot_status['bot_running']:
        return

    # Check if the bot is in the guild
    if "connected_guilds" not in bot_status or guild_id not in bot_status['connected_guilds']:
        return

    # Leave the server specified
    await drb_api.leave_voice_channel(guild_id)

    # Update status
    client_status = StatusValues.ONLINE
    print("Leave server completed")
@command
async def run_task(task_id, duration_seconds):
    """Example command: pretend to run a task by sleeping for ``duration_seconds``.

    Prints a banner, a start line, a finish line and a closing rule around the sleep.
    """
    banner = "\n--- Server Command: run_task ---"
    closing_rule = "------------------------------"

    print(banner)
    print(f"Starting task {task_id} for {duration_seconds} seconds...")
    # The sleep stands in for the actual work.
    await asyncio.sleep(duration_seconds)
    print(f"Task {task_id} finished.")
    print(closing_rule)
# ------------------------------------------------------------------
async def receive_commands(websocket):
    """Consume messages from ``websocket`` and dispatch the commands they carry.

    Each message is parsed as JSON and handled by its "type":
      * "command"       - look up "name" in ``command_handlers`` and await the
                          handler with the positional "args" (default: none).
      * "handshake_ack" - just reported.
      * anything else   - reported as unknown.

    Invalid JSON and any exception raised while handling one message are
    printed and do not stop the loop.
    """
    async for raw in websocket:
        try:
            payload = json.loads(raw)
            kind = payload.get("type")

            if kind == "handshake_ack":
                print("Server acknowledged handshake.")
                continue

            if kind != "command":
                print(f"Received unknown message type: {kind}")
                continue

            name = payload.get("name")
            call_args = payload.get("args", [])
            if name not in command_handlers:
                print(f"Received unknown command: {name}")
                continue

            print(f"Executing command: {name} with args {call_args}")
            # Handlers are async functions; the next message is not read
            # until this one has finished.
            handler = command_handlers[name]
            await handler(*call_args)
        except json.JSONDecodeError:
            print(f"Received invalid JSON: {raw}")
        except Exception as err:
            print(f"Error processing message: {err}")
async def main_client():
    """Open the WebSocket connection, identify this client, then process server messages.

    Connection failures and closures are reported on stdout rather than raised;
    a final "stopped" line is printed once the session is over.
    """
    print(f"Client {CLIENT_ID} connecting to {SERVER_WS_URI}...")
    try:
        async with websockets.connect(SERVER_WS_URI) as ws:
            print("Connection established.")

            # The handshake carrying our client ID is the first thing sent.
            hello = {"type": "handshake", "id": CLIENT_ID}
            await ws.send(json.dumps(hello))
            print(f"Sent handshake with ID: {CLIENT_ID}")

            # Stay here for as long as the server keeps sending messages.
            await receive_commands(ws)
    except ConnectionRefusedError:
        print(f"Connection refused. Is the server running at {SERVER_WS_URI}?")
    except websockets.exceptions.ConnectionClosedOK:
        print("Connection closed gracefully by server.")
    except websockets.exceptions.ConnectionClosedError as closed_err:
        print(f"Connection closed unexpectedly: {closed_err}")
    except Exception as err:
        print(f"An error occurred: {err}")

    print(f"Client {CLIENT_ID} stopped.")
if __name__ == "__main__":
    # Script entry point: runs one client session. main_client() returns once
    # the connection ends or fails, and nothing here reconnects afterwards.
    # Note: In a real application, you'd want a more robust way
    # to manage the event loop and handle potential reconnections.
    asyncio.run(main_client())