Files
drb-core-client/app/drb_cdb_api.py
2025-06-22 23:21:02 -04:00

174 lines
6.8 KiB
Python

import httpx
import json
import asyncio # Import asyncio for running the example usage
from drb_cdb_types import ConfigGenerator
from base_api import BaseAPI
class DRBCDBAPI(BaseAPI):
    """
    An asynchronous Python wrapper for interacting with the FastAPI application.

    Uses httpx for asynchronous HTTP requests. Every endpoint method delegates
    to the ``_get`` / ``_post`` helpers, which are not defined in this class
    (presumably inherited from ``BaseAPI`` -- confirm there how ``base_url``
    and ``_client`` are consumed) and returns whatever that helper returns.
    """

    def __init__(self, base_url: str):
        """
        Initializes the wrapper with the base URL of the FastAPI application.

        Args:
            base_url: The base URL of the FastAPI application
                (e.g., "http://localhost:8000").
        """
        super().__init__()
        self.base_url = base_url
        # Use an AsyncClient for making asynchronous requests (30 second timeout).
        self._client = httpx.AsyncClient(timeout=30)

    # --- OP25 Endpoints ---

    async def start_op25(self):
        """Starts the OP25 process asynchronously (POST /op25/start)."""
        print("Starting OP25")
        return await self._post("/op25/start")

    async def stop_op25(self):
        """Stops the OP25 process asynchronously (POST /op25/stop)."""
        print("Stopping OP25")
        return await self._post("/op25/stop")

    async def get_op25_status(self):
        """Gets the status of the OP25 process asynchronously (GET /op25/status)."""
        print("Getting OP25 status")
        return await self._get("/op25/status")

    async def generate_op25_config(self, config_data: ConfigGenerator):
        """
        Generates the OP25 configuration file asynchronously.

        Args:
            config_data: A ConfigGenerator object representing the configuration
                data. A plain ``dict`` that is already in request-body form is
                also accepted and sent unchanged.

        Returns:
            Whatever ``_post`` returns for POST /op25/generate-config.
        """
        if isinstance(config_data, dict):
            # Already a JSON-ready mapping; a dict has no to_dict() to call.
            payload = config_data
        else:
            # Convert the ConfigGenerator object to a dictionary before sending as JSON
            payload = config_data.to_dict()
        print("Generate OP25 config", payload)
        return await self._post("/op25/generate-config", data=payload)

    # --- Pulse Audio Endpoints ---

    async def get_pulse_status(self):
        """Gets the status of the Pulse Audio process asynchronously (GET /pulse/status)."""
        print("Checking Pulseaudio status")
        return await self._get("/pulse/status")

    # --- Bot Endpoints ---

    async def start_bot(self, token: str):
        """
        Starts the Discord bot asynchronously (POST /bot/start_bot).

        Args:
            token: The Discord bot token. It is sent in the request body only.
        """
        # The token is a credential: never echo it to stdout or logs.
        print("Starting bot (token redacted)")
        return await self._post("/bot/start_bot", data={"token": token})

    async def stop_bot(self):
        """Stops the Discord bot asynchronously (POST /bot/stop_bot)."""
        print("Stopping bot")
        return await self._post("/bot/stop_bot")

    async def join_voice_channel(self, guild_id: int, channel_id: int):
        """
        Joins a voice channel asynchronously (POST /bot/join_voice).

        Args:
            guild_id: The ID of the guild.
            channel_id: The ID of the voice channel.
        """
        print(f"Joining voice channel: Guild ID: {guild_id}, Channel ID: {channel_id}")
        return await self._post(
            "/bot/join_voice",
            data={"guild_id": guild_id, "channel_id": channel_id},
        )

    async def leave_voice_channel(self, guild_id: int):
        """
        Leaves a voice channel asynchronously (POST /bot/leave_voice).

        Args:
            guild_id: The ID of the guild to leave the voice channel from.
        """
        print(f"Leaving voice channel on guild: {guild_id}")
        return await self._post("/bot/leave_voice", data={"guild_id": guild_id})

    async def get_bot_status(self):
        """Gets the status of the Discord bot asynchronously (GET /bot/status)."""
        print("Getting bot status")
        return await self._get("/bot/status")
# Example Usage (assuming your FastAPI app is running on http://localhost:8000)
async def example_usage():
    """Run a sample session against a local server, printing each API result."""
    # Sample payloads -- replace with your actual system data.
    # NOTE(review): these are plain dicts; confirm generate_op25_config accepts
    # them as-is rather than requiring a ConfigGenerator instance.
    p25_payload = {
        "type": "p25",
        "systemName": "MyP25System",
        "channels": ["851.0125", "851.0250"],
        "tags": [{"talkgroup": 12345, "tagDec": "Police Dispatch"}],
        "whitelist": [12345, 67890],
    }
    analog_payload = {
        "type": "nbfm",  # The API code uses "nbfm" for analog
        "config": {
            "systemName": "MyAnalogChannel",
            "frequency": 453.250,
            "nbfmSquelch": -120,
        },
    }
    config_requests = (
        ("Generating OP25 P25 config...", p25_payload),
        ("Generating OP25 Analog config...", analog_payload),
    )

    # The async context manager is intended to close the client on exit.
    async with DRBCDBAPI("http://localhost:8000") as api:
        try:
            # OP25 calls. start/stop are issued without a request body; if the
            # server expects a specific model for them, pass it via data=...
            print("Starting OP25...")
            start_result = await api.start_op25()
            print(start_result)

            op25_status = await api.get_op25_status()
            print("OP25 Status:", op25_status)

            # Config generation, one request per sample payload.
            for banner, payload in config_requests:
                print(banner)
                generation_result = await api.generate_op25_config(payload)
                print(generation_result)

            # Pulse Audio call
            pulse_status = await api.get_pulse_status()
            print("Pulse Audio Status:", pulse_status)

            # Bot calls (replace with your actual bot token, guild_id, channel_id)
            # print("Starting bot...")
            # print(await api.start_bot("YOUR_BOT_TOKEN"))
            # print("Bot Status:", await api.get_bot_status())
            # print("Joining voice channel...")
            # print(await api.join_voice_channel(1234567890, 9876543210))  # Replace with actual IDs
            # print("Leaving voice channel...")
            # print(await api.leave_voice_channel(1234567890))  # Replace with actual ID
            # print("Stopping bot...")
            # print(await api.stop_bot())
            # print("Stopping OP25...")
            # print(await api.stop_op25())
            # print("OP25 Status:", await api.get_op25_status())
        except httpx.RequestError as err:
            print(f"An httpx request error occurred during API interaction: {err}")
        except httpx.HTTPStatusError as err:
            print(f"An httpx HTTP status error occurred during API interaction: {err}")
            # The response body is available via err.response.text or err.response.json()
            print(f"Response body: {err.response.text}")
        except Exception as err:
            print(f"An unexpected error occurred: {err}")
if __name__ == "__main__":
    # example_usage() is a coroutine function, so it has to be driven by an
    # event loop; asyncio.run() runs the coroutine to completion.
    demo_coroutine = example_usage()
    asyncio.run(demo_coroutine)