From 52405e9bd1946d17739912bc926ab08b4743c83d Mon Sep 17 00:00:00 2001 From: Logan Cusano Date: Sun, 27 Apr 2025 00:30:23 -0400 Subject: [PATCH] Init push --- .gitignore | 2 + Dockerfile | 22 ++++ Makefile | 15 +++ client.py | 171 +++++++++++++++++++++++++++++ drb_cdb_api.py | 275 +++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 2 + server_api.py | 190 ++++++++++++++++++++++++++++++++ 7 files changed, 677 insertions(+) create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 Makefile create mode 100644 client.py create mode 100644 drb_cdb_api.py create mode 100644 requirements.txt create mode 100644 server_api.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c94b16f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.venv +*__pycache__ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..b41b5c4 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,22 @@ +# Use an official Python runtime as a parent image +FROM python:3.13-slim + +# Set the working directory in the container +WORKDIR /app + +# Copy the requirements file into the container +COPY requirements.txt . + +# Install any needed packages specified in requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +# Copy the client code into the container +COPY client.py . + +# Define environment variable for the server host +# This will be set by the Makefile when running the container +ENV SERVER_HOST=localhost + +# Run client.py when the container launches +# We use a list form of CMD to properly handle signals +CMD ["python", "client.py"] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f27a66c --- /dev/null +++ b/Makefile @@ -0,0 +1,15 @@ +# Define variables for client image name +CLIENT_IMAGE = websocket-client-app + +# Default target: build the server image +all: build + +# Target to build the server Docker image +build: + docker build -t $(CLIENT_IMAGE) . + +# Target to run the server container using the host network +run: build + docker run -it --rm \ + --network=host \ + $(CLIENT_IMAGE) diff --git a/client.py b/client.py new file mode 100644 index 0000000..55eca7b --- /dev/null +++ b/client.py @@ -0,0 +1,171 @@ +import asyncio +import websockets +import json +import uuid # To generate a unique client ID +from drb_cdb_api import DRBCDBAPI, ConfigGenerator +from server_api import RadioAPIClient +from enum import Enum + +# --- Client Configuration --- +SERVER_WS_URI = "ws://localhost:8765" +SERVER_API_URL = "http://localhost:5000" +CLIENT_API_URL = "http://localhost:8001" +# Generate or define a unique ID for THIS client instance +# In a real app, this might come from config or a login process +CLIENT_ID = f"client-{uuid.uuid4().hex[:8]}" # TODO - Implement persistent ID +# ---------------------------- + +# Dictionary mapping command names (strings) to local client functions +command_handlers = {} + +# Init DRB API handler +drb_api = DRBCDBAPI(CLIENT_API_URL) +srv_api = RadioAPIClient(SERVER_API_URL) + +# --- Define the client status object --- +class StatusValues(Enum): + ONLINE = "online" # The client is online + LISTENING = "listening" # The client bot is online and listening to a system + +client_status = StatusValues.ONLINE + +# --- Define decorator creation function --- +def command(func): + """Decorator to register a function as a command handler.""" + command_handlers[func.__name__] = func + return func + +# --- Define Client-Side Command Handlers (The "API" functions) --- + +# Join server +@command +async def join_server(system_id, guild_id, channel_id): +# Takes system ID, guild ID, channel ID + bot_status = drb_api.get_bot_status() + # Check if the bot is running + if 'bot_running' not in bot_status or not bot_status['bot_running']: + # Run the bot if not + drb_api.start_bot() + # Update status + client_status = StatusValues.LISTENING + op25_status = drb_api.get_op25_status() + + # Check if OP25 is stopped, if so set the selected channel, otherwise + if op25_status == "stopped": + chn_details = srv_api.get_channel_details(channel_id) + if not chn_details: + # TODO - handle not having channel details + pass + + # Generate the config for the channel requested + chn_config = ConfigGenerator( + type=chn_details['decode_mode'], + systemName=chn_details['name'], + channels=chn_details['frequency_list_khz'], + tags=chn_details['tags'], + whitelist=chn_details['tag_whitelist']) + + # Set the OP25 config + drb_api.generate_op25_config(chn_config) + + # Start OP25 + drb_api.start_op25() + +# Leave server +@command +async def leave_server(guild_id): +# Takes guild ID + bot_status = drb_api.get_bot_status() + + # Check if the bot is running + if 'bot_running' not in bot_status or not bot_status['bot_running']: + # If not, do nothing + return + + # Check if the bot is in the guild + if not "connected_guilds" in bot_status or guild_id not in bot_status['connected_guilds']: + return + + # Leave the server specified + drb_api.leave_voice_channel(guild_id) + + # Update status + client_status = StatusValues.ONLINE + +@command +async def set_status(status_text): + """Example command: Sets or displays a status.""" + print(f"\n--- Server Command: set_status ---") + print(f"Status updated to: {status_text}") + print("----------------------------------") + +@command +async def run_task(task_id, duration_seconds): + """Example command: Simulates running a task.""" + print(f"\n--- Server Command: run_task ---") + print(f"Starting task {task_id} for {duration_seconds} seconds...") + await asyncio.sleep(duration_seconds) + print(f"Task {task_id} finished.") + print("------------------------------") + +# Add more command handlers as needed... + +# ------------------------------------------------------------------ + + +async def receive_commands(websocket): + """Listens for and processes command messages from the server.""" + async for message in websocket: + try: + data = json.loads(message) + if data.get("type") == "command": + command_name = data.get("name") + args = data.get("args", []) + + if command_name in command_handlers: + print(f"Executing command: {command_name} with args {args}") + # Execute the registered async function + await command_handlers[command_name](*args) + else: + print(f"Received unknown command: {command_name}") + elif data.get("type") == "handshake_ack": + print(f"Server acknowledged handshake.") + else: + print(f"Received unknown message type: {data.get('type')}") + + except json.JSONDecodeError: + print(f"Received invalid JSON: {message}") + except Exception as e: + print(f"Error processing message: {e}") + +async def main_client(): + """Connects to the server and handles communication.""" + print(f"Client {CLIENT_ID} connecting to {SERVER_WS_URI}...") + try: + async with websockets.connect(SERVER_WS_URI) as websocket: + print("Connection established.") + + # Handshake: Send client ID immediately after connecting + handshake_message = json.dumps({"type": "handshake", "id": CLIENT_ID}) + await websocket.send(handshake_message) + print(f"Sent handshake with ID: {CLIENT_ID}") + + # Start receiving commands and keep the connection alive + await receive_commands(websocket) + + except ConnectionRefusedError: + print(f"Connection refused. Is the server running at {SERVER_WS_URI}?") + except websockets.exceptions.ConnectionClosedOK: + print("Connection closed gracefully by server.") + except websockets.exceptions.ConnectionClosedError as e: + print(f"Connection closed unexpectedly: {e}") + except Exception as e: + print(f"An error occurred: {e}") + + print(f"Client {CLIENT_ID} stopped.") + + +if __name__ == "__main__": + # Note: In a real application, you'd want a more robust way + # to manage the event loop and handle potential reconnections. + asyncio.run(main_client()) \ No newline at end of file diff --git a/drb_cdb_api.py b/drb_cdb_api.py new file mode 100644 index 0000000..a1a67f4 --- /dev/null +++ b/drb_cdb_api.py @@ -0,0 +1,275 @@ +import httpx +import json +import asyncio # Import asyncio for running the example usage +from enum import Enum +from typing import Dict, Any, List, Optional + +class DecodeMode(str, Enum): + P25 = "P25" + DMR = "DMR" + ANALOG = "NBFM" # Note: The API code uses "NBFM" for analog + +class TalkgroupTag: + """Represents a talkgroup tag.""" + def __init__(self, talkgroup: str, tagDec: int): + self.talkgroup = talkgroup + self.tagDec = tagDec + + # Add a method to convert to a dictionary, useful for sending as JSON + def to_dict(self) -> Dict[str, Any]: + return {"talkgroup": self.talkgroup, "tagDec": self.tagDec} + +class ConfigGenerator: + """Represents the configuration data structure for the API.""" + def __init__( + self, + type: DecodeMode, + systemName: str, + channels: List[str], + tags: Optional[List[TalkgroupTag]] = None, + whitelist: Optional[List[int]] = None + ): + self.type = type + self.systemName = systemName + self.channels = channels + self.tags = tags + self.whitelist = whitelist + + # Add a method to convert to a dictionary, useful for sending as JSON + def to_dict(self) -> Dict[str, Any]: + data = { + "type": self.type.value, # Use .value for Enum + "systemName": self.systemName, + "channels": self.channels, + } + if self.tags is not None: + # Convert list of TalkgroupTag objects to list of dictionaries + data["tags"] = [tag.to_dict() for tag in self.tags] + if self.whitelist is not None: + data["whitelist"] = self.whitelist + return data + +class DRBCDBAPI: + """ + An asynchronous Python wrapper for interacting with the FastAPI application. + Uses httpx for asynchronous HTTP requests. + """ + def __init__(self, base_url: str): + """ + Initializes the wrapper with the base URL of the FastAPI application. + + Args: + base_url: The base URL of the FastAPI application (e.g., "http://localhost:8000"). + """ + self.base_url = base_url + # Use an AsyncClient for making asynchronous requests + self._client = httpx.AsyncClient() + + async def __aenter__(self): + """Allows using the client with async with.""" + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Ensures the client is closed when exiting async with.""" + await self.close() + + async def close(self): + """Closes the underlying asynchronous HTTP client.""" + await self._client.close() + + async def _post(self, endpoint: str, data: dict = None): + """ + Asynchronous helper method for making POST requests. + + Args: + endpoint: The API endpoint (e.g., "/op25/start"). + data: The data to send in the request body (as a dictionary). + + Returns: + The JSON response from the API. + + Raises: + httpx.RequestError: If the request fails. + httpx.HTTPStatusError: If the API returns an error status code (4xx or 5xx). + """ + url = f"{self.base_url}{endpoint}" + try: + # Use await with the asynchronous httpx client + response = await self._client.post(url, json=data) + response.raise_for_status() # Raise HTTPStatusError for bad responses (4xx or 5xx) + return response.json() + except httpx.HTTPStatusError as e: + print(f"HTTP error occurred: {e}") + print(f"Response body: {e.response.text}") # Access response text from the exception + raise + except httpx.RequestError as e: + print(f"Request failed: {e}") + raise + + async def _get(self, endpoint: str): + """ + Asynchronous helper method for making GET requests. + + Args: + endpoint: The API endpoint (e.g., "/op25/status"). + + Returns: + The JSON response from the API. + + Raises: + httpx.RequestError: If the request fails. + httpx.HTTPStatusError: If the API returns an error status code (4xx or 5xx). + """ + url = f"{self.base_url}{endpoint}" + try: + # Use await with the asynchronous httpx client + response = await self._client.get(url) + response.raise_for_status() # Raise HTTPStatusError for bad responses (4xx or 5xx) + return response.json() + except httpx.HTTPStatusError as e: + print(f"HTTP error occurred: {e}") + print(f"Response body: {e.response.text}") # Access response text from the exception + raise + except httpx.RequestError as e: + print(f"Request failed: {e}") + raise + + # --- OP25 Endpoints --- + + async def start_op25(self): + """Starts the OP25 process asynchronously.""" + return await self._post("/op25/start") + + async def stop_op25(self): + """Stops the OP25 process asynchronously.""" + return await self._post("/op25/stop") + + async def get_op25_status(self): + """Gets the status of the OP25 process asynchronously.""" + return await self._get("/op25/status") + + async def generate_op25_config(self, config_data: ConfigGenerator): + """ + Generates the OP25 configuration file asynchronously. + + Args: + config_data: A ConfigGenerator object representing the configuration data. + """ + # Convert the ConfigGenerator object to a dictionary before sending as JSON + return await self._post("/op25/generate-config", data=config_data.to_dict()) + + # --- Pulse Audio Endpoints --- + + async def get_pulse_status(self): + """Gets the status of the Pulse Audio process asynchronously.""" + return await self._get("/pulse/status") + + # --- Bot Endpoints --- + + async def start_bot(self, token: str): + """ + Starts the Discord bot asynchronously. + + Args: + token: The Discord bot token. + """ + return await self._post("/bot/start_bot", data={"token": token}) + + async def stop_bot(self): + """Stops the Discord bot asynchronously.""" + return await self._post("/bot/stop_bot") + + async def join_voice_channel(self, guild_id: int, channel_id: int): + """ + Joins a voice channel asynchronously. + + Args: + guild_id: The ID of the guild. + channel_id: The ID of the voice channel. + """ + return await self._post("/bot/join_voice", data={"guild_id": guild_id, "channel_id": channel_id}) + + async def leave_voice_channel(self, guild_id: int): + """ + Leaves a voice channel asynchronously. + + Args: + guild_id: The ID of the guild to leave the voice channel from. + """ + return await self._post("/bot/leave_voice", data={"guild_id": guild_id}) + + async def get_bot_status(self): + """Gets the status of the Discord bot asynchronously.""" + return await self._get("/bot/status") + +# Example Usage (assuming your FastAPI app is running on http://localhost:8000) +async def example_usage(): + """Demonstrates asynchronous API interaction using httpx.""" + # Use async with to ensure the client is properly closed + async with DRBCDBAPI("http://localhost:8000") as api: + try: + # Example OP25 calls - remember to await them + print("Starting OP25...") + # Note: This will likely fail if the endpoint expects no body or a different structure + # Based on the original requests code, it sent an empty body for start/stop + # If your FastAPI requires a specific model, you'll need to pass that data=... + # For now, assuming it accepts an empty body or ignores it for these endpoints. + print(await api.start_op25()) + print("OP25 Status:", await api.get_op25_status()) + + # Example of generating a P25 config (replace with your actual data) + p25_config_data = { + "type": "p25", + "systemName": "MyP25System", + "channels": ["851.0125", "851.0250"], + "tags": [{"talkgroup": 12345, "tagDec": "Police Dispatch"}], + "whitelist": [12345, 67890] + } + print("Generating OP25 P25 config...") + print(await api.generate_op25_config(p25_config_data)) + + # Example of generating an Analog config (replace with your actual data) + analog_config_data = { + "type": "nbfm", # Note: The API code uses "nbfm" for analog + "config": { + "systemName": "MyAnalogChannel", + "frequency": 453.250, + "nbfmSquelch": -120 + } + } + print("Generating OP25 Analog config...") + print(await api.generate_op25_config(analog_config_data)) + + + # Example Pulse Audio call + print("Pulse Audio Status:", await api.get_pulse_status()) + + # Example Bot calls (replace with your actual bot token, guild_id, channel_id) + # print("Starting bot...") + # print(await api.start_bot("YOUR_BOT_TOKEN")) + # print("Bot Status:", await api.get_bot_status()) + # print("Joining voice channel...") + # print(await api.join_voice_channel(1234567890, 9876543210)) # Replace with actual IDs + # print("Leaving voice channel...") + # print(await api.leave_voice_channel(1234567890)) # Replace with actual ID + # print("Stopping bot...") + # print(await api.stop_bot()) + + + # print("Stopping OP25...") + # print(await api.stop_op25()) + # print("OP25 Status:", await api.get_op25_status()) + + + except httpx.RequestError as e: + print(f"An httpx request error occurred during API interaction: {e}") + except httpx.HTTPStatusError as e: + print(f"An httpx HTTP status error occurred during API interaction: {e}") + # You can access the response body here via e.response.text or e.response.json() + print(f"Response body: {e.response.text}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + +if __name__ == "__main__": + # To run this example, you need to run it within an asyncio event loop + asyncio.run(example_usage()) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..4d37f77 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +websockets +httpx \ No newline at end of file diff --git a/server_api.py b/server_api.py new file mode 100644 index 0000000..6aa687b --- /dev/null +++ b/server_api.py @@ -0,0 +1,190 @@ +import httpx +import json + +class RadioAPIClient: + """ + A client wrapper for interacting with the Radio App Server API. + Uses httpx for asynchronous HTTP requests. + """ + def __init__(self, base_url="http://localhost:5000"): + """ + Initializes the API client. + + Args: + base_url (str): The base URL of the server's API (default is http://localhost:5000). + """ + self.base_url = base_url + # Use an AsyncClient for making asynchronous requests + self._client = httpx.AsyncClient() + + async def __aenter__(self): + """Allows using the client with async with.""" + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Ensures the client is closed when exiting async with.""" + await self.close() + + async def close(self): + """Closes the underlying HTTP client.""" + await self._client.close() + + async def _request(self, method, endpoint, **kwargs): + """ + Helper method to make an asynchronous HTTP request. + + Args: + method (str): The HTTP method (e.g., 'GET', 'POST'). + endpoint (str): The API endpoint path (e.g., '/channels'). + **kwargs: Additional arguments for httpx.AsyncClient.request. + + Returns: + dict: The JSON response from the API. + + Raises: + httpx.HTTPStatusError: If the request returns a non-2xx status code. + httpx.RequestError: For other request-related errors. + """ + url = f"{self.base_url}{endpoint}" + try: + response = await self._client.request(method, url, **kwargs) + response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx) + return response.json() + except httpx.HTTPStatusError as e: + print(f"HTTP error occurred: {e}") + # You might want to return the error response body or raise the exception + raise + except httpx.RequestError as e: + print(f"An error occurred while requesting {e.request.url!r}: {e}") + raise + + async def get_channels(self): + """ + Retrieves a summary list of all available radio channels. + + Returns: + list: A list of channel summaries (e.g., [{"id": "channel_1", "name": "..."}]). + """ + print(f"Fetching channels from {self.base_url}/channels") + return await self._request("GET", "/channels") + + async def get_channel_details(self, channel_id: str): + """ + Retrieves detailed information for a specific channel. + + Args: + channel_id (str): The unique ID of the channel. + + Returns: + dict: The channel details. + + Raises: + httpx.HTTPStatusError: If the channel is not found (404). + """ + print(f"Fetching details for channel: {channel_id} from {self.base_url}/channels/{channel_id}") + return await self._request("GET", f"/channels/{channel_id}") + + async def list_clients(self): + """ + Retrieves a list of currently connected client IDs. + + Returns: + list: A list of client IDs (strings). + """ + print(f"Fetching connected clients from {self.base_url}/clients") + return await self._request("GET", "/clients") + + async def send_command(self, client_id: str, command_name: str, args: list = None): + """ + Sends a command to a specific client via the server's API. + + Args: + client_id (str): The ID of the target client. + command_name (str): The name of the command to execute on the client. + args (list, optional): A list of arguments for the command. Defaults to None. + + Returns: + dict: The API response indicating the command status. + """ + if args is None: + args = [] + print(f"Sending command '{command_name}' to client {client_id} with args {args} via API") + return await self._request( + "POST", + f"/command/{client_id}/{command_name}", + json={"args": args} # API expects args in a JSON body + ) + +# --- Example Usage --- +async def example_api_usage(): + """Demonstrates how to use the RadioAPIClient.""" + # You can specify a different base_url if your server is elsewhere + async with RadioAPIClient("http://localhost:5000") as api_client: + print("\n--- Getting Channels ---") + try: + channels_summary = await api_client.get_channels() + print("Channels Summary:", channels_summary) + except Exception as e: + print(f"Error getting channels: {e}") + + print("\n--- Getting Channel Details (channel_1) ---") + try: + channel_details = await api_client.get_channel_details("channel_1") + print("Channel 1 Details:", channel_details) + except Exception as e: + print(f"Error getting channel details: {e}") + + print("\n--- Getting Channel Details (non-existent) ---") + try: + non_existent_channel = await api_client.get_channel_details("non_existent_channel") + print("Non-existent Channel Details:", non_existent_channel) # This should not print + except httpx.HTTPStatusError as e: + print(f"Caught expected error for non-existent channel: {e.response.status_code} - {e.response.json()}") + except Exception as e: + print(f"Error getting non-existent channel details: {e}") + + + print("\n--- Listing Connected Clients ---") + try: + connected_clients = await api_client.list_clients() + print("Connected Clients:", connected_clients) + + # --- Sending a Command (Requires a client to be running with a matching ID) --- + if connected_clients: + target_client_id = connected_clients[0] # Send to the first connected client + print(f"\n--- Sending 'print_message' command to {target_client_id} ---") + try: + command_response = await api_client.send_command( + target_client_id, + "print_message", + ["Hello from the API wrapper!"] + ) + print("Command Response:", command_response) + except Exception as e: + print(f"Error sending command: {e}") + + print(f"\n--- Sending 'run_task' command to {target_client_id} ---") + try: + command_response = await api_client.send_command( + target_client_id, + "run_task", + ["api_task_1", 3] # task_id, duration_seconds + ) + print("Command Response:", command_response) + except Exception as e: + print(f"Error sending command: {e}") + + else: + print("No clients connected to send commands to.") + + except Exception as e: + print(f"Error listing clients or sending commands: {e}") + + +if __name__ == "__main__": + # To run this example: + # 1. Ensure the server (server.py) is running. + # 2. Ensure at least one client (client.py) is running. + # 3. Run this script: python api_client.py + asyncio.run(example_api_usage()) +