import csv
import json
import os
import signal
import subprocess
from typing import List

from fastapi import HTTPException, APIRouter

from internal.logger import create_logger
from models import (
    ConfigGenerator,
    DecodeMode,
    ChannelConfig,
    DeviceConfig,
    TrunkingConfig,
    TrunkingChannelConfig,
    AudioConfig,
    TerminalConfig,
    TalkgroupTag,
)

router = APIRouter()
LOGGER = create_logger(__name__)

# Handle of the OP25 process launched by /start, or None when nothing has
# been launched (or the last launched process was stopped / found dead).
op25_process = None

OP25_PATH = "/op25/op25/gr-op25_repeater/apps/"
OP25_SCRIPT = "run_multi-rx_service.sh"

# Files written by /generate-config.
ACTIVE_CONFIG_PATH = "/configs/active.cfg.json"
ACTIVE_TAGS_PATH = "/configs/active.cfg.tags.tsv"
ACTIVE_WHITELIST_PATH = "/configs/active.cfg.whitelist.tsv"


def _op25_is_running() -> bool:
    """Return True when a launched OP25 process exists and has not exited."""
    # Popen objects are always truthy, so liveness must be checked with
    # poll(), which returns None only while the child is still alive.
    return op25_process is not None and op25_process.poll() is None


@router.post("/start")
async def start_op25():
    """
    Launch the OP25 service script unless it is already running.

    Returns:
        dict: ``{"status": "OP25 started"}`` after a successful launch, or
        ``{"status": "OP25 already running"}`` when a live process exists.

    Raises:
        HTTPException: 500 when the process cannot be spawned.
    """
    global op25_process

    # A process that has exited on its own must not block a restart.
    if _op25_is_running():
        return {"status": "OP25 already running"}

    try:
        op25_process = subprocess.Popen(
            os.path.join(OP25_PATH, OP25_SCRIPT),
            shell=True,
            # Equivalent to preexec_fn=os.setsid: the child leads its own
            # session/process group so /stop can signal the whole group.
            start_new_session=True,
            cwd=OP25_PATH,
        )
        LOGGER.debug(op25_process)
        return {"status": "OP25 started"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.post("/stop")
async def stop_op25():
    """
    Send SIGTERM to the process group of the launched OP25 process.

    Returns:
        dict: ``{"status": "OP25 stopped"}`` when the signal was delivered, or
        ``{"status": "OP25 is not running"}`` when there is nothing to stop.

    Raises:
        HTTPException: 500 when signalling fails for a reason other than the
            process no longer existing.
    """
    global op25_process

    if op25_process is None:
        return {"status": "OP25 is not running"}

    if op25_process.returncode is not None:
        # Already reaped by an earlier poll(); its PID may have been reused,
        # so drop the stale handle instead of signalling it.
        op25_process = None
        return {"status": "OP25 is not running"}

    try:
        os.killpg(os.getpgid(op25_process.pid), signal.SIGTERM)
    except ProcessLookupError:
        # The process vanished before we could signal it: clear the stale
        # handle so a later /start is not refused.
        op25_process = None
        return {"status": "OP25 is not running"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    op25_process = None
    return {"status": "OP25 stopped"}


@router.get("/status")
async def get_status():
    """Report ``"running"`` while the launched process is alive, else ``"stopped"``."""
    return {"status": "running" if _op25_is_running() else "stopped"}


def _build_p25_config(generator: ConfigGenerator) -> dict:
    """Write the tag/whitelist files and build the config dict for a P25 system."""
    channels = [ChannelConfig(
        name=generator.systemName,
        trunking_sysname=generator.systemName,
        enable_analog="off",
        demod_type="cqpsk",
        cqpsk_tracking=True,
        filter_type="rc"
    )]
    devices = [DeviceConfig()]

    # The trunking section below references these two files by path.
    save_talkgroup_tags(generator.tags)
    save_whitelist(generator.whitelist)

    trunking = TrunkingConfig(
        module="tk_p25.py",
        chans=[TrunkingChannelConfig(
            sysname=generator.systemName,
            control_channel_list=','.join(generator.channels),
            tagsFile=ACTIVE_TAGS_PATH,
            whitelist=ACTIVE_WHITELIST_PATH
        )]
    )
    audio = AudioConfig()
    terminal = TerminalConfig()

    return {
        "channels": [channel.dict() for channel in channels],
        "devices": [device.dict() for device in devices],
        "trunking": trunking.dict(),
        "audio": audio.dict(),
        "terminal": terminal.dict()
    }


def _build_analog_config(analog) -> dict:
    """Build the config dict for an analog channel from ``generator.config``."""
    channels = [ChannelConfig(
        channelName=analog.systemName,
        enableAnalog="on",
        demodType="fsk4",
        frequency=analog.frequency,
        filterType="widepulse",
        nbfmSquelch=analog.nbfmSquelch
    )]
    devices = [DeviceConfig(gain="LNA:32")]

    return {
        "channels": [channel.dict() for channel in channels],
        "devices": [device.dict() for device in devices]
    }


@router.post("/generate-config")
async def generate_config(generator: ConfigGenerator):
    """
    Build the active OP25 configuration and write it to disk as JSON.

    For ``DecodeMode.P25`` the talkgroup tag and whitelist TSV files are
    written as well. Keys whose value is ``None`` are stripped before the
    JSON file is written.

    Args:
        generator (ConfigGenerator): The requested configuration.

    Returns:
        dict: A message naming the file the config was exported to.

    Raises:
        HTTPException: 400 when ``generator.type`` is not a handled decode
            mode; 500 when building or writing the configuration fails.
    """
    try:
        if generator.type == DecodeMode.P25:
            config_dict = _build_p25_config(generator)
        elif generator.type == DecodeMode.ANALOG:
            config_dict = _build_analog_config(generator.config)
        else:
            raise HTTPException(status_code=400, detail="Invalid configuration type. Must be 'p25' or 'nbfm'.")

        with open(ACTIVE_CONFIG_PATH, 'w') as f:
            json.dump(del_none_in_dict(config_dict), f, indent=2)

        return {"message": f"Config exported to '{ACTIVE_CONFIG_PATH}'"}
    except HTTPException:
        # Deliberate HTTP errors (the 400 above) must reach the client
        # unchanged instead of being rewrapped as a 500 below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag]) -> None:
    """
    Writes a list of tags to the tags file.

    Each tag becomes one tab-separated row of ``talkgroup`` and ``tagDec``.

    Args:
        talkgroup_tags (List[TalkgroupTag]): The list of TalkgroupTag instances.
    """
    with open(ACTIVE_TAGS_PATH, 'w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file, delimiter='\t', lineterminator='\n')
        writer.writerows([tag.talkgroup, tag.tagDec] for tag in talkgroup_tags)


def save_whitelist(talkgroup_tags: List[int]) -> None:
    """
    Writes a list of talkgroups to the whitelists file, one per row.

    Args:
        talkgroup_tags (List[int]): The list of decimals to whitelist.
    """
    with open(ACTIVE_WHITELIST_PATH, 'w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file, delimiter='\t', lineterminator='\n')
        writer.writerows([talkgroup] for talkgroup in talkgroup_tags)


def del_none_in_dict(d):
    """
    Delete keys with the value ``None`` in a dictionary, recursively.

    Nested dictionaries, and dictionaries that are direct elements of a list
    value, are cleaned as well. Other list elements (strings, numbers,
    ``None``) are left untouched.

    This alters the input so you may wish to ``copy`` the dict first.

    Args:
        d (dict): The dictionary to clean in place.

    Returns:
        dict: The same dictionary object that was passed in.
    """
    # Iterate over a snapshot because keys are deleted during the loop.
    for key, value in list(d.items()):
        LOGGER.info(f"Key: '{key}'\nValue: '{value}'")
        if value is None:
            del d[key]
        elif isinstance(value, dict):
            del_none_in_dict(value)
        elif isinstance(value, list):
            for item in value:
                # Only dicts have keys to strip; recursing into a scalar
                # would fail because it has no .items().
                if isinstance(item, dict):
                    del_none_in_dict(item)

    return d  # For convenience