diff --git a/app/internal/op25_config_utls.py b/app/internal/op25_config_utls.py new file mode 100644 index 0000000..09b8732 --- /dev/null +++ b/app/internal/op25_config_utls.py @@ -0,0 +1,70 @@ +import csv +import json +from models import ConfigGenerator, DecodeMode, ChannelConfig, DeviceConfig, TrunkingConfig, TrunkingChannelConfig, AudioConfig, TerminalConfig, TalkgroupTag +from typing import List +from internal.logger import create_logger + +LOGGER = create_logger(__name__) + +def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag]) -> None: + """ + Writes a list of tags to the tags file. + + Args: + talkgroup_tags (List[TalkgroupTag]): The list of TalkgroupTag instances. + """ + with open("/configs/active.cfg.tags.tsv", 'w', newline='', encoding='utf-8') as file: + writer = csv.writer(file, delimiter='\t', lineterminator='\n') + # Write rows + for tag in talkgroup_tags: + writer.writerow([tag.talkgroup, tag.tagDec]) + +def save_whitelist(talkgroup_tags: List[int]) -> None: + """ + Writes a list of talkgroups to the whitelists file. + + Args: + talkgroup_tags (List[int]): The list of decimals to whitelist. + """ + with open("/configs/active.cfg.whitelist.tsv", 'w', newline='', encoding='utf-8') as file: + writer = csv.writer(file, delimiter='\t', lineterminator='\n') + # Write rows + for tag in talkgroup_tags: + writer.writerow([tag]) + +def del_none_in_dict(d): + """ + Delete keys with the value ``None`` in a dictionary, recursively. + + This alters the input so you may wish to ``copy`` the dict first. + """ + for key, value in list(d.items()): + LOGGER.info(f"Key: '{key}'\nValue: '{value}'") + if value is None: + del d[key] + elif isinstance(value, dict): + del_none_in_dict(value) + elif isinstance(value, list): + for iterative_value in value: + del_none_in_dict(iterative_value) + return d # For convenience + +def get_current_system_from_config() -> str: + # Get the current config + with open('/configs/active.cfg.json', 'r') as f: + json_data = f.read() + if isinstance(json_data, str): + try: + data = json.loads(json_data) + except json.JSONDecodeError: + return None + elif isinstance(json_data, dict): + data = json_data + else: + return None + + if "channels" in data and isinstance(data["channels"], list) and len(data["channels"]) > 0: + first_channel = data["channels"][0] + if "name" in first_channel: + return first_channel["name"] + return None \ No newline at end of file diff --git a/app/main.py b/app/main.py index 99a965e..c5c9e94 100644 --- a/app/main.py +++ b/app/main.py @@ -3,6 +3,7 @@ import routers.op25_controller as op25_controller import routers.pulse as pulse import routers.bot as bot from internal.logger import create_logger +from internal.bot_manager import DiscordBotManager # Initialize logging LOGGER = create_logger(__name__) @@ -10,6 +11,9 @@ LOGGER = create_logger(__name__) # Define FastAPI app app = FastAPI() -app.include_router(op25_controller.router, prefix="/op25") +# Initialize Discord Bot Manager +bot_manager_instance = DiscordBotManager() + +app.include_router(op25_controller.create_op25_router(bot_manager=bot_manager_instance), prefix="/op25") app.include_router(pulse.router, prefix="/pulse") -app.include_router(bot.router, prefix="/bot") +app.include_router(bot.create_bot_router(bot_manager=bot_manager_instance), prefix="/bot") diff --git a/app/routers/bot.py b/app/routers/bot.py index d477d27..dddb62e 100644 --- a/app/routers/bot.py +++ b/app/routers/bot.py @@ -5,54 +5,53 @@ from internal.logger import create_logger LOGGER = create_logger(__name__) -# Define FastAPI app -router = APIRouter() +def create_bot_router(bot_manager: DiscordBotManager): # Function to create router + router = APIRouter() -# Initialize Discord Bot Manager -bot_manager = DiscordBotManager() + # API Endpoints + @router.post("/start_bot") + async def start_bot(config: BotConfig): + try: + await bot_manager.start_bot(config.token) + return {"status": "Bot started successfully."} + except Exception as e: + LOGGER.error(f"Error starting bot: {e}") + raise HTTPException(status_code=400, detail=str(e)) -# API Endpoints -@router.post("/start_bot") -async def start_bot(config: BotConfig): - try: - await bot_manager.start_bot(config.token) - return {"status": "Bot started successfully."} - except Exception as e: - LOGGER.error(f"Error starting bot: {e}") - raise HTTPException(status_code=400, detail=str(e)) + @router.post("/stop_bot") + async def stop_bot(): + try: + await bot_manager.stop_bot() + return {"status": "Bot stopped successfully."} + except Exception as e: + LOGGER.error(f"Error stopping bot: {e}") + raise HTTPException(status_code=400, detail=str(e)) -@router.post("/stop_bot") -async def stop_bot(): - try: - await bot_manager.stop_bot() - return {"status": "Bot stopped successfully."} - except Exception as e: - LOGGER.error(f"Error stopping bot: {e}") - raise HTTPException(status_code=400, detail=str(e)) + @router.post("/join_voice") + async def join_voice_channel(request: VoiceChannelJoinRequest): + try: + await bot_manager.join_voice_channel(request.guild_id, request.channel_id) + return {"status": f"Joined guild {request.guild_id} voice channel {request.channel_id}."} + except Exception as e: + LOGGER.error(f"Error joining voice channel: {e}") + raise HTTPException(status_code=400, detail=str(e)) -@router.post("/join_voice") -async def join_voice_channel(request: VoiceChannelJoinRequest): - try: - await bot_manager.join_voice_channel(request.guild_id, request.channel_id) - return {"status": f"Joined guild {request.guild_id} voice channel {request.channel_id}."} - except Exception as e: - LOGGER.error(f"Error joining voice channel: {e}") - raise HTTPException(status_code=400, detail=str(e)) + @router.post("/leave_voice") + async def leave_voice_channel(request: VoiceChannelLeaveRequest): + try: + await bot_manager.leave_voice_channel(request.guild_id) + return {"status": f"Left guild {request.guild_id} voice channel."} + except Exception as e: + LOGGER.error(f"Error leaving voice channel: {e}") + raise HTTPException(status_code=400, detail=str(e)) -@router.post("/leave_voice") -async def leave_voice_channel(request: VoiceChannelLeaveRequest): - try: - await bot_manager.leave_voice_channel(request.guild_id) - return {"status": f"Left guild {request.guild_id} voice channel."} - except Exception as e: - LOGGER.error(f"Error leaving voice channel: {e}") - raise HTTPException(status_code=400, detail=str(e)) - -@router.get("/status") -async def get_status(): - status = { - "bot_running": bot_manager.bot is not None and not bot_manager.bot.is_closed(), - "connected_guilds": list(bot_manager.voice_clients.keys()), - "active_token": bot_manager.token - } - return status + @router.get("/status") + async def get_status(): + status = { + "bot_running": bot_manager.bot is not None and not bot_manager.bot.is_closed(), + "connected_guilds": list(bot_manager.voice_clients.keys()), + "active_token": bot_manager.token + } + return status + + return router # Return the configured router diff --git a/app/routers/op25_controller.py b/app/routers/op25_controller.py index 3f4b620..929b3d6 100644 --- a/app/routers/op25_controller.py +++ b/app/routers/op25_controller.py @@ -3,151 +3,126 @@ import subprocess import os import signal import json -import csv from models import ConfigGenerator, DecodeMode, ChannelConfig, DeviceConfig, TrunkingConfig, TrunkingChannelConfig, AudioConfig, TerminalConfig, TalkgroupTag from internal.logger import create_logger +from internal.bot_manager import DiscordBotManager from typing import List +from internal.op25_config_utls import save_talkgroup_tags, save_whitelist, del_none_in_dict, get_current_system_from_config -router = APIRouter() LOGGER = create_logger(__name__) op25_process = None OP25_PATH = "/op25/op25/gr-op25_repeater/apps/" OP25_SCRIPT = "run_multi-rx_service.sh" -@router.post("/start") -async def start_op25(): - global op25_process - if op25_process is None: - try: - op25_process = subprocess.Popen(os.path.join(OP25_PATH, OP25_SCRIPT), shell=True, preexec_fn=os.setsid, cwd=OP25_PATH) - LOGGER.debug(op25_process) - return {"status": "OP25 started"} - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - else: - return {"status": "OP25 already running"} - -@router.post("/stop") -async def stop_op25(): - global op25_process - if op25_process is not None: - try: - os.killpg(os.getpgid(op25_process.pid), signal.SIGTERM) - op25_process = None - return {"status": "OP25 stopped"} - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - else: - return {"status": "OP25 is not running"} - -@router.get("/status") -async def get_status(): - return {"status": "running" if op25_process else "stopped"} - -@router.post("/generate-config") -async def generate_config(generator: ConfigGenerator): - try: - if generator.type == DecodeMode.P25: - channels = [ChannelConfig( - name=generator.systemName, - trunking_sysname=generator.systemName, - enable_analog="off", - demod_type="cqpsk", - cqpsk_tracking=True, - filter_type="rc" - )] - devices = [DeviceConfig()] - save_talkgroup_tags(generator.tags) - save_whitelist(generator.whitelist) - trunking = TrunkingConfig( - module="tk_p25.py", - chans=[TrunkingChannelConfig( - sysname=generator.systemName, - control_channel_list=','.join(generator.channels), - tagsFile="/configs/active.cfg.tags.tsv", - whitelist="/configs/active.cfg.whitelist.tsv" - )] - ) - - audio = AudioConfig() - - terminal = TerminalConfig() - - config_dict = { - "channels": [channel.dict() for channel in channels], - "devices": [device.dict() for device in devices], - "trunking": trunking.dict(), - "audio": audio.dict(), - "terminal": terminal.dict() - } - - elif generator.type == DecodeMode.ANALOG: - generator = generator.config - channels = [ChannelConfig( - channelName=generator.systemName, - enableAnalog="on", - demodType="fsk4", - frequency=generator.frequency, - filterType="widepulse", - nbfmSquelch=generator.nbfmSquelch - )] - devices = [DeviceConfig(gain="LNA:32")] - - config_dict = { - "channels": [channel.dict() for channel in channels], - "devices": [device.dict() for device in devices] - } +def create_op25_router(bot_manager: DiscordBotManager): + router = APIRouter() + @router.post("/start") + async def start_op25(): + global op25_process + if op25_process is None: + try: + op25_process = subprocess.Popen(os.path.join(OP25_PATH, OP25_SCRIPT), shell=True, preexec_fn=os.setsid, cwd=OP25_PATH) + LOGGER.debug(op25_process) + return {"status": "OP25 started"} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) else: - raise HTTPException(status_code=400, detail="Invalid configuration type. Must be 'p25' or 'nbfm'.") + return {"status": "OP25 already running"} - with open('/configs/active.cfg.json', 'w') as f: - json.dump(del_none_in_dict(config_dict), f, indent=2) + @router.post("/stop") + async def stop_op25(): + global op25_process + if op25_process is not None: + try: + os.killpg(os.getpgid(op25_process.pid), signal.SIGTERM) + op25_process = None + return {"status": "OP25 stopped"} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + else: + return {"status": "OP25 is not running"} - return {"message": "Config exported to '/configs/active.cfg.json'"} - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + @router.get("/status") + async def get_status(): + return {"status": "running" if op25_process else "stopped"} -def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag]) -> None: - """ - Writes a list of tags to the tags file. + @router.post("/generate-config") + async def generate_config(generator: ConfigGenerator): + try: + if generator.type == DecodeMode.P25: + channels = [ChannelConfig( + name=generator.systemName, + trunking_sysname=generator.systemName, + enable_analog="off", + demod_type="cqpsk", + cqpsk_tracking=True, + filter_type="rc" + )] + devices = [DeviceConfig()] + save_talkgroup_tags(generator.tags) + save_whitelist(generator.whitelist) + trunking = TrunkingConfig( + module="tk_p25.py", + chans=[TrunkingChannelConfig( + sysname=generator.systemName, + control_channel_list=','.join(generator.channels), + tagsFile="/configs/active.cfg.tags.tsv", + whitelist="/configs/active.cfg.whitelist.tsv" + )] + ) - Args: - talkgroup_tags (List[TalkgroupTag]): The list of TalkgroupTag instances. - """ - with open("/configs/active.cfg.tags.tsv", 'w', newline='', encoding='utf-8') as file: - writer = csv.writer(file, delimiter='\t', lineterminator='\n') - # Write rows - for tag in talkgroup_tags: - writer.writerow([tag.talkgroup, tag.tagDec]) + audio = AudioConfig() -def save_whitelist(talkgroup_tags: List[int]) -> None: - """ - Writes a list of talkgroups to the whitelists file. + terminal = TerminalConfig() - Args: - talkgroup_tags (List[int]): The list of decimals to whitelist. - """ - with open("/configs/active.cfg.whitelist.tsv", 'w', newline='', encoding='utf-8') as file: - writer = csv.writer(file, delimiter='\t', lineterminator='\n') - # Write rows - for tag in talkgroup_tags: - writer.writerow([tag]) + config_dict = { + "channels": [channel.dict() for channel in channels], + "devices": [device.dict() for device in devices], + "trunking": trunking.dict(), + "audio": audio.dict(), + "terminal": terminal.dict() + } -def del_none_in_dict(d): - """ - Delete keys with the value ``None`` in a dictionary, recursively. + elif generator.type == DecodeMode.ANALOG: + generator = generator.config + channels = [ChannelConfig( + channelName=generator.systemName, + enableAnalog="on", + demodType="fsk4", + frequency=generator.frequency, + filterType="widepulse", + nbfmSquelch=generator.nbfmSquelch + )] + devices = [DeviceConfig(gain="LNA:32")] + + config_dict = { + "channels": [channel.dict() for channel in channels], + "devices": [device.dict() for device in devices] + } + + else: + raise HTTPException(status_code=400, detail="Invalid configuration type. Must be 'p25' or 'nbfm'.") + + with open('/configs/active.cfg.json', 'w') as f: + json.dump(del_none_in_dict(config_dict), f, indent=2) + + # Set the presence of the bot (if it's online) + await bot_manager.set_presence(generator.systemName) + + return {"message": "Config exported to '/configs/active.cfg.json'"} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + @router.post("/update-prensece") + async def update_prensece(): + current_system = get_current_system_from_config() + if not current_system: + raise HTTPException(status_code=500, detail="Unable to get current system.") + + await bot_manager.set_presence(current_system) + return current_system + + return router - This alters the input so you may wish to ``copy`` the dict first. - """ - for key, value in list(d.items()): - LOGGER.info(f"Key: '{key}'\nValue: '{value}'") - if value is None: - del d[key] - elif isinstance(value, dict): - del_none_in_dict(value) - elif isinstance(value, list): - for iterative_value in value: - del_none_in_dict(iterative_value) - return d # For convenience