Implement presence change
Some checks failed
Lint / lint (pull_request) Failing after 7s

This commit is contained in:
Logan Cusano
2025-06-29 15:24:19 -04:00
parent a26dd619b8
commit fb9f8a680f
4 changed files with 224 additions and 176 deletions

View File

@@ -0,0 +1,70 @@
import csv
import json
from models import ConfigGenerator, DecodeMode, ChannelConfig, DeviceConfig, TrunkingConfig, TrunkingChannelConfig, AudioConfig, TerminalConfig, TalkgroupTag
from typing import List
from internal.logger import create_logger
LOGGER = create_logger(__name__)
def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag]) -> None:
"""
Writes a list of tags to the tags file.
Args:
talkgroup_tags (List[TalkgroupTag]): The list of TalkgroupTag instances.
"""
with open("/configs/active.cfg.tags.tsv", 'w', newline='', encoding='utf-8') as file:
writer = csv.writer(file, delimiter='\t', lineterminator='\n')
# Write rows
for tag in talkgroup_tags:
writer.writerow([tag.talkgroup, tag.tagDec])
def save_whitelist(talkgroup_tags: List[int]) -> None:
"""
Writes a list of talkgroups to the whitelists file.
Args:
talkgroup_tags (List[int]): The list of decimals to whitelist.
"""
with open("/configs/active.cfg.whitelist.tsv", 'w', newline='', encoding='utf-8') as file:
writer = csv.writer(file, delimiter='\t', lineterminator='\n')
# Write rows
for tag in talkgroup_tags:
writer.writerow([tag])
def del_none_in_dict(d):
"""
Delete keys with the value ``None`` in a dictionary, recursively.
This alters the input so you may wish to ``copy`` the dict first.
"""
for key, value in list(d.items()):
LOGGER.info(f"Key: '{key}'\nValue: '{value}'")
if value is None:
del d[key]
elif isinstance(value, dict):
del_none_in_dict(value)
elif isinstance(value, list):
for iterative_value in value:
del_none_in_dict(iterative_value)
return d # For convenience
def get_current_system_from_config() -> str:
# Get the current config
with open('/configs/active.cfg.json', 'r') as f:
json_data = f.read()
if isinstance(json_data, str):
try:
data = json.loads(json_data)
except json.JSONDecodeError:
return None
elif isinstance(json_data, dict):
data = json_data
else:
return None
if "channels" in data and isinstance(data["channels"], list) and len(data["channels"]) > 0:
first_channel = data["channels"][0]
if "name" in first_channel:
return first_channel["name"]
return None

View File

@@ -3,6 +3,7 @@ import routers.op25_controller as op25_controller
import routers.pulse as pulse import routers.pulse as pulse
import routers.bot as bot import routers.bot as bot
from internal.logger import create_logger from internal.logger import create_logger
from internal.bot_manager import DiscordBotManager
# Initialize logging # Initialize logging
LOGGER = create_logger(__name__) LOGGER = create_logger(__name__)
@@ -10,6 +11,9 @@ LOGGER = create_logger(__name__)
# Define FastAPI app # Define FastAPI app
app = FastAPI() app = FastAPI()
app.include_router(op25_controller.router, prefix="/op25") # Initialize Discord Bot Manager
bot_manager_instance = DiscordBotManager()
app.include_router(op25_controller.create_op25_router(bot_manager=bot_manager_instance), prefix="/op25")
app.include_router(pulse.router, prefix="/pulse") app.include_router(pulse.router, prefix="/pulse")
app.include_router(bot.router, prefix="/bot") app.include_router(bot.create_bot_router(bot_manager=bot_manager_instance), prefix="/bot")

View File

@@ -5,54 +5,53 @@ from internal.logger import create_logger
LOGGER = create_logger(__name__) LOGGER = create_logger(__name__)
# Define FastAPI app def create_bot_router(bot_manager: DiscordBotManager): # Function to create router
router = APIRouter() router = APIRouter()
# Initialize Discord Bot Manager # API Endpoints
bot_manager = DiscordBotManager() @router.post("/start_bot")
async def start_bot(config: BotConfig):
try:
await bot_manager.start_bot(config.token)
return {"status": "Bot started successfully."}
except Exception as e:
LOGGER.error(f"Error starting bot: {e}")
raise HTTPException(status_code=400, detail=str(e))
# API Endpoints @router.post("/stop_bot")
@router.post("/start_bot") async def stop_bot():
async def start_bot(config: BotConfig): try:
try: await bot_manager.stop_bot()
await bot_manager.start_bot(config.token) return {"status": "Bot stopped successfully."}
return {"status": "Bot started successfully."} except Exception as e:
except Exception as e: LOGGER.error(f"Error stopping bot: {e}")
LOGGER.error(f"Error starting bot: {e}") raise HTTPException(status_code=400, detail=str(e))
raise HTTPException(status_code=400, detail=str(e))
@router.post("/stop_bot") @router.post("/join_voice")
async def stop_bot(): async def join_voice_channel(request: VoiceChannelJoinRequest):
try: try:
await bot_manager.stop_bot() await bot_manager.join_voice_channel(request.guild_id, request.channel_id)
return {"status": "Bot stopped successfully."} return {"status": f"Joined guild {request.guild_id} voice channel {request.channel_id}."}
except Exception as e: except Exception as e:
LOGGER.error(f"Error stopping bot: {e}") LOGGER.error(f"Error joining voice channel: {e}")
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))
@router.post("/join_voice") @router.post("/leave_voice")
async def join_voice_channel(request: VoiceChannelJoinRequest): async def leave_voice_channel(request: VoiceChannelLeaveRequest):
try: try:
await bot_manager.join_voice_channel(request.guild_id, request.channel_id) await bot_manager.leave_voice_channel(request.guild_id)
return {"status": f"Joined guild {request.guild_id} voice channel {request.channel_id}."} return {"status": f"Left guild {request.guild_id} voice channel."}
except Exception as e: except Exception as e:
LOGGER.error(f"Error joining voice channel: {e}") LOGGER.error(f"Error leaving voice channel: {e}")
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))
@router.post("/leave_voice") @router.get("/status")
async def leave_voice_channel(request: VoiceChannelLeaveRequest): async def get_status():
try: status = {
await bot_manager.leave_voice_channel(request.guild_id) "bot_running": bot_manager.bot is not None and not bot_manager.bot.is_closed(),
return {"status": f"Left guild {request.guild_id} voice channel."} "connected_guilds": list(bot_manager.voice_clients.keys()),
except Exception as e: "active_token": bot_manager.token
LOGGER.error(f"Error leaving voice channel: {e}") }
raise HTTPException(status_code=400, detail=str(e)) return status
@router.get("/status") return router # Return the configured router
async def get_status():
status = {
"bot_running": bot_manager.bot is not None and not bot_manager.bot.is_closed(),
"connected_guilds": list(bot_manager.voice_clients.keys()),
"active_token": bot_manager.token
}
return status

View File

@@ -3,151 +3,126 @@ import subprocess
import os import os
import signal import signal
import json import json
import csv
from models import ConfigGenerator, DecodeMode, ChannelConfig, DeviceConfig, TrunkingConfig, TrunkingChannelConfig, AudioConfig, TerminalConfig, TalkgroupTag from models import ConfigGenerator, DecodeMode, ChannelConfig, DeviceConfig, TrunkingConfig, TrunkingChannelConfig, AudioConfig, TerminalConfig, TalkgroupTag
from internal.logger import create_logger from internal.logger import create_logger
from internal.bot_manager import DiscordBotManager
from typing import List from typing import List
from internal.op25_config_utls import save_talkgroup_tags, save_whitelist, del_none_in_dict, get_current_system_from_config
router = APIRouter()
LOGGER = create_logger(__name__) LOGGER = create_logger(__name__)
op25_process = None op25_process = None
OP25_PATH = "/op25/op25/gr-op25_repeater/apps/" OP25_PATH = "/op25/op25/gr-op25_repeater/apps/"
OP25_SCRIPT = "run_multi-rx_service.sh" OP25_SCRIPT = "run_multi-rx_service.sh"
@router.post("/start") def create_op25_router(bot_manager: DiscordBotManager):
async def start_op25(): router = APIRouter()
global op25_process
if op25_process is None:
try:
op25_process = subprocess.Popen(os.path.join(OP25_PATH, OP25_SCRIPT), shell=True, preexec_fn=os.setsid, cwd=OP25_PATH)
LOGGER.debug(op25_process)
return {"status": "OP25 started"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
else:
return {"status": "OP25 already running"}
@router.post("/stop")
async def stop_op25():
global op25_process
if op25_process is not None:
try:
os.killpg(os.getpgid(op25_process.pid), signal.SIGTERM)
op25_process = None
return {"status": "OP25 stopped"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
else:
return {"status": "OP25 is not running"}
@router.get("/status")
async def get_status():
return {"status": "running" if op25_process else "stopped"}
@router.post("/generate-config")
async def generate_config(generator: ConfigGenerator):
try:
if generator.type == DecodeMode.P25:
channels = [ChannelConfig(
name=generator.systemName,
trunking_sysname=generator.systemName,
enable_analog="off",
demod_type="cqpsk",
cqpsk_tracking=True,
filter_type="rc"
)]
devices = [DeviceConfig()]
save_talkgroup_tags(generator.tags)
save_whitelist(generator.whitelist)
trunking = TrunkingConfig(
module="tk_p25.py",
chans=[TrunkingChannelConfig(
sysname=generator.systemName,
control_channel_list=','.join(generator.channels),
tagsFile="/configs/active.cfg.tags.tsv",
whitelist="/configs/active.cfg.whitelist.tsv"
)]
)
audio = AudioConfig()
terminal = TerminalConfig()
config_dict = {
"channels": [channel.dict() for channel in channels],
"devices": [device.dict() for device in devices],
"trunking": trunking.dict(),
"audio": audio.dict(),
"terminal": terminal.dict()
}
elif generator.type == DecodeMode.ANALOG:
generator = generator.config
channels = [ChannelConfig(
channelName=generator.systemName,
enableAnalog="on",
demodType="fsk4",
frequency=generator.frequency,
filterType="widepulse",
nbfmSquelch=generator.nbfmSquelch
)]
devices = [DeviceConfig(gain="LNA:32")]
config_dict = {
"channels": [channel.dict() for channel in channels],
"devices": [device.dict() for device in devices]
}
@router.post("/start")
async def start_op25():
global op25_process
if op25_process is None:
try:
op25_process = subprocess.Popen(os.path.join(OP25_PATH, OP25_SCRIPT), shell=True, preexec_fn=os.setsid, cwd=OP25_PATH)
LOGGER.debug(op25_process)
return {"status": "OP25 started"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
else: else:
raise HTTPException(status_code=400, detail="Invalid configuration type. Must be 'p25' or 'nbfm'.") return {"status": "OP25 already running"}
with open('/configs/active.cfg.json', 'w') as f: @router.post("/stop")
json.dump(del_none_in_dict(config_dict), f, indent=2) async def stop_op25():
global op25_process
if op25_process is not None:
try:
os.killpg(os.getpgid(op25_process.pid), signal.SIGTERM)
op25_process = None
return {"status": "OP25 stopped"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
else:
return {"status": "OP25 is not running"}
return {"message": "Config exported to '/configs/active.cfg.json'"} @router.get("/status")
except Exception as e: async def get_status():
raise HTTPException(status_code=500, detail=str(e)) return {"status": "running" if op25_process else "stopped"}
def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag]) -> None: @router.post("/generate-config")
""" async def generate_config(generator: ConfigGenerator):
Writes a list of tags to the tags file. try:
if generator.type == DecodeMode.P25:
channels = [ChannelConfig(
name=generator.systemName,
trunking_sysname=generator.systemName,
enable_analog="off",
demod_type="cqpsk",
cqpsk_tracking=True,
filter_type="rc"
)]
devices = [DeviceConfig()]
save_talkgroup_tags(generator.tags)
save_whitelist(generator.whitelist)
trunking = TrunkingConfig(
module="tk_p25.py",
chans=[TrunkingChannelConfig(
sysname=generator.systemName,
control_channel_list=','.join(generator.channels),
tagsFile="/configs/active.cfg.tags.tsv",
whitelist="/configs/active.cfg.whitelist.tsv"
)]
)
Args: audio = AudioConfig()
talkgroup_tags (List[TalkgroupTag]): The list of TalkgroupTag instances.
"""
with open("/configs/active.cfg.tags.tsv", 'w', newline='', encoding='utf-8') as file:
writer = csv.writer(file, delimiter='\t', lineterminator='\n')
# Write rows
for tag in talkgroup_tags:
writer.writerow([tag.talkgroup, tag.tagDec])
def save_whitelist(talkgroup_tags: List[int]) -> None: terminal = TerminalConfig()
"""
Writes a list of talkgroups to the whitelists file.
Args: config_dict = {
talkgroup_tags (List[int]): The list of decimals to whitelist. "channels": [channel.dict() for channel in channels],
""" "devices": [device.dict() for device in devices],
with open("/configs/active.cfg.whitelist.tsv", 'w', newline='', encoding='utf-8') as file: "trunking": trunking.dict(),
writer = csv.writer(file, delimiter='\t', lineterminator='\n') "audio": audio.dict(),
# Write rows "terminal": terminal.dict()
for tag in talkgroup_tags: }
writer.writerow([tag])
def del_none_in_dict(d): elif generator.type == DecodeMode.ANALOG:
""" generator = generator.config
Delete keys with the value ``None`` in a dictionary, recursively. channels = [ChannelConfig(
channelName=generator.systemName,
enableAnalog="on",
demodType="fsk4",
frequency=generator.frequency,
filterType="widepulse",
nbfmSquelch=generator.nbfmSquelch
)]
devices = [DeviceConfig(gain="LNA:32")]
config_dict = {
"channels": [channel.dict() for channel in channels],
"devices": [device.dict() for device in devices]
}
else:
raise HTTPException(status_code=400, detail="Invalid configuration type. Must be 'p25' or 'nbfm'.")
with open('/configs/active.cfg.json', 'w') as f:
json.dump(del_none_in_dict(config_dict), f, indent=2)
# Set the presence of the bot (if it's online)
await bot_manager.set_presence(generator.systemName)
return {"message": "Config exported to '/configs/active.cfg.json'"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/update-prensece")
async def update_prensece():
current_system = get_current_system_from_config()
if not current_system:
raise HTTPException(status_code=500, detail="Unable to get current system.")
await bot_manager.set_presence(current_system)
return current_system
return router
This alters the input so you may wish to ``copy`` the dict first.
"""
for key, value in list(d.items()):
LOGGER.info(f"Key: '{key}'\nValue: '{value}'")
if value is None:
del d[key]
elif isinstance(value, dict):
del_none_in_dict(value)
elif isinstance(value, list):
for iterative_value in value:
del_none_in_dict(iterative_value)
return d # For convenience