import httpx import json import asyncio # Import asyncio for running the example usage from enum import Enum from typing import Dict, Any, List, Optional class DecodeMode(str, Enum): P25 = "P25" DMR = "DMR" ANALOG = "NBFM" # Note: The API code uses "NBFM" for analog class TalkgroupTag: """Represents a talkgroup tag.""" def __init__(self, talkgroup: str, tagDec: int): self.talkgroup = talkgroup self.tagDec = tagDec # Add a method to convert to a dictionary, useful for sending as JSON def to_dict(self) -> Dict[str, Any]: return {"talkgroup": self.talkgroup, "tagDec": self.tagDec} class ConfigGenerator: """Represents the configuration data structure for the API.""" def __init__( self, type: DecodeMode, systemName: str, channels: List[str], tags: Optional[List[TalkgroupTag]] = None, whitelist: Optional[List[int]] = None ): self.type = type self.systemName = systemName self.channels = channels self.tags = tags self.whitelist = whitelist # Add a method to convert to a dictionary, useful for sending as JSON def to_dict(self) -> Dict[str, Any]: data = { "type": self.type.value, # Use .value for Enum "systemName": self.systemName, "channels": self.channels, } if self.tags is not None: # Convert list of TalkgroupTag objects to list of dictionaries data["tags"] = [tag.to_dict() for tag in self.tags] if self.whitelist is not None: data["whitelist"] = self.whitelist return data class DRBCDBAPI: """ An asynchronous Python wrapper for interacting with the FastAPI application. Uses httpx for asynchronous HTTP requests. """ def __init__(self, base_url: str): """ Initializes the wrapper with the base URL of the FastAPI application. Args: base_url: The base URL of the FastAPI application (e.g., "http://localhost:8000"). """ self.base_url = base_url # Use an AsyncClient for making asynchronous requests self._client = httpx.AsyncClient() async def __aenter__(self): """Allows using the client with async with.""" return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Ensures the client is closed when exiting async with.""" await self.close() async def close(self): """Closes the underlying asynchronous HTTP client.""" await self._client.close() async def _post(self, endpoint: str, data: dict = None): """ Asynchronous helper method for making POST requests. Args: endpoint: The API endpoint (e.g., "/op25/start"). data: The data to send in the request body (as a dictionary). Returns: The JSON response from the API. Raises: httpx.RequestError: If the request fails. httpx.HTTPStatusError: If the API returns an error status code (4xx or 5xx). """ url = f"{self.base_url}{endpoint}" try: # Use await with the asynchronous httpx client response = await self._client.post(url, json=data) response.raise_for_status() # Raise HTTPStatusError for bad responses (4xx or 5xx) return response.json() except httpx.HTTPStatusError as e: print(f"HTTP error occurred: {e}") print(f"Response body: {e.response.text}") # Access response text from the exception raise except httpx.RequestError as e: print(f"Request failed: {e}") raise async def _get(self, endpoint: str): """ Asynchronous helper method for making GET requests. Args: endpoint: The API endpoint (e.g., "/op25/status"). Returns: The JSON response from the API. Raises: httpx.RequestError: If the request fails. httpx.HTTPStatusError: If the API returns an error status code (4xx or 5xx). """ url = f"{self.base_url}{endpoint}" try: # Use await with the asynchronous httpx client response = await self._client.get(url) response.raise_for_status() # Raise HTTPStatusError for bad responses (4xx or 5xx) return response.json() except httpx.HTTPStatusError as e: print(f"HTTP error occurred: {e}") print(f"Response body: {e.response.text}") # Access response text from the exception raise except httpx.RequestError as e: print(f"Request failed: {e}") raise # --- OP25 Endpoints --- async def start_op25(self): """Starts the OP25 process asynchronously.""" return await self._post("/op25/start") async def stop_op25(self): """Stops the OP25 process asynchronously.""" return await self._post("/op25/stop") async def get_op25_status(self): """Gets the status of the OP25 process asynchronously.""" return await self._get("/op25/status") async def generate_op25_config(self, config_data: ConfigGenerator): """ Generates the OP25 configuration file asynchronously. Args: config_data: A ConfigGenerator object representing the configuration data. """ # Convert the ConfigGenerator object to a dictionary before sending as JSON return await self._post("/op25/generate-config", data=config_data.to_dict()) # --- Pulse Audio Endpoints --- async def get_pulse_status(self): """Gets the status of the Pulse Audio process asynchronously.""" return await self._get("/pulse/status") # --- Bot Endpoints --- async def start_bot(self, token: str): """ Starts the Discord bot asynchronously. Args: token: The Discord bot token. """ return await self._post("/bot/start_bot", data={"token": token}) async def stop_bot(self): """Stops the Discord bot asynchronously.""" return await self._post("/bot/stop_bot") async def join_voice_channel(self, guild_id: int, channel_id: int): """ Joins a voice channel asynchronously. Args: guild_id: The ID of the guild. channel_id: The ID of the voice channel. """ return await self._post("/bot/join_voice", data={"guild_id": guild_id, "channel_id": channel_id}) async def leave_voice_channel(self, guild_id: int): """ Leaves a voice channel asynchronously. Args: guild_id: The ID of the guild to leave the voice channel from. """ return await self._post("/bot/leave_voice", data={"guild_id": guild_id}) async def get_bot_status(self): """Gets the status of the Discord bot asynchronously.""" return await self._get("/bot/status") # Example Usage (assuming your FastAPI app is running on http://localhost:8000) async def example_usage(): """Demonstrates asynchronous API interaction using httpx.""" # Use async with to ensure the client is properly closed async with DRBCDBAPI("http://localhost:8000") as api: try: # Example OP25 calls - remember to await them print("Starting OP25...") # Note: This will likely fail if the endpoint expects no body or a different structure # Based on the original requests code, it sent an empty body for start/stop # If your FastAPI requires a specific model, you'll need to pass that data=... # For now, assuming it accepts an empty body or ignores it for these endpoints. print(await api.start_op25()) print("OP25 Status:", await api.get_op25_status()) # Example of generating a P25 config (replace with your actual data) p25_config_data = { "type": "p25", "systemName": "MyP25System", "channels": ["851.0125", "851.0250"], "tags": [{"talkgroup": 12345, "tagDec": "Police Dispatch"}], "whitelist": [12345, 67890] } print("Generating OP25 P25 config...") print(await api.generate_op25_config(p25_config_data)) # Example of generating an Analog config (replace with your actual data) analog_config_data = { "type": "nbfm", # Note: The API code uses "nbfm" for analog "config": { "systemName": "MyAnalogChannel", "frequency": 453.250, "nbfmSquelch": -120 } } print("Generating OP25 Analog config...") print(await api.generate_op25_config(analog_config_data)) # Example Pulse Audio call print("Pulse Audio Status:", await api.get_pulse_status()) # Example Bot calls (replace with your actual bot token, guild_id, channel_id) # print("Starting bot...") # print(await api.start_bot("YOUR_BOT_TOKEN")) # print("Bot Status:", await api.get_bot_status()) # print("Joining voice channel...") # print(await api.join_voice_channel(1234567890, 9876543210)) # Replace with actual IDs # print("Leaving voice channel...") # print(await api.leave_voice_channel(1234567890)) # Replace with actual ID # print("Stopping bot...") # print(await api.stop_bot()) # print("Stopping OP25...") # print(await api.stop_op25()) # print("OP25 Status:", await api.get_op25_status()) except httpx.RequestError as e: print(f"An httpx request error occurred during API interaction: {e}") except httpx.HTTPStatusError as e: print(f"An httpx HTTP status error occurred during API interaction: {e}") # You can access the response body here via e.response.text or e.response.json() print(f"Response body: {e.response.text}") except Exception as e: print(f"An unexpected error occurred: {e}") if __name__ == "__main__": # To run this example, you need to run it within an asyncio event loop asyncio.run(example_usage())