"""FastAPI routes for controlling the OP25 process and generating its config.

The router exposes endpoints to start/stop the OP25 launcher script, report
whether a process handle is currently held, and write a JSON configuration
(plus talkgroup tag / whitelist TSV files) under ``/configs``.
"""

import csv
import json
import logging
import os
import signal
import subprocess
from enum import Enum
from typing import List, Optional

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

logger = logging.getLogger(__name__)

router = APIRouter()

# Handle of the launcher process started by /start; None when nothing was
# started (or after /stop cleared it).
op25_process = None

OP25_PATH = "/op25/op25/gr-op25_repeater/apps/"
OP25_SCRIPT = "run_multi-rx_service.sh"

# Output files written by /generate-config.
CONFIG_FILE = "/configs/active.cfg.json"
TAGS_FILE = "/configs/active.cfg.tags.tsv"
WHITELIST_FILE = "/configs/active.cfg.whitelist.tsv"


@router.post("/start")
async def start_op25():
    """Launch the OP25 script unless a process handle is already held.

    Raises:
        HTTPException: 500 if the process could not be spawned.
    """
    global op25_process
    if op25_process is not None:
        return {"status": "OP25 already running"}

    try:
        # start_new_session=True runs setsid() in the child so /stop can
        # signal the whole process group. It replaces preexec_fn=os.setsid,
        # which the subprocess docs flag as unsafe in threaded applications.
        op25_process = subprocess.Popen(
            os.path.join(OP25_PATH, OP25_SCRIPT),
            shell=True,
            start_new_session=True,
            cwd=OP25_PATH,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    logger.info("Started OP25 launcher (pid %s)", op25_process.pid)
    return {"status": "OP25 started"}


@router.post("/stop")
async def stop_op25():
    """Send SIGTERM to the OP25 process group and drop the process handle.

    Raises:
        HTTPException: 500 if signalling the process group fails for a reason
            other than the process no longer existing.
    """
    global op25_process
    if op25_process is None:
        return {"status": "OP25 is not running"}

    try:
        os.killpg(os.getpgid(op25_process.pid), signal.SIGTERM)
    except ProcessLookupError:
        # The process is already gone. Clear the stale handle so /start can
        # be used again instead of answering 500 on every later /stop.
        op25_process = None
        return {"status": "OP25 is not running"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    op25_process = None
    return {"status": "OP25 stopped"}


@router.get("/status")
async def get_status():
    """Report "running" while a process handle is held, else "stopped"."""
    return {"status": "running" if op25_process else "stopped"}


class DecodeMode(str, Enum):
    """Decode modes accepted by /generate-config."""

    P25 = "P25"
    DMR = "DMR"
    ANALOG = "NBFM"


class TalkgroupTag(BaseModel):
    """One row of the talkgroup tags file (written as talkgroup, tagDec)."""

    talkgroup: str
    tagDec: int


class ConfigGenerator(BaseModel):
    """Request body for /generate-config."""

    type: DecodeMode
    systemName: str
    channels: List[str]
    tags: List[TalkgroupTag]
    whitelist: List[int]
    # Only read when type is NBFM; frequency is mandatory in that mode.
    frequency: Optional[float] = None
    nbfmSquelch: Optional[float] = None


class DemodType(str, Enum):
    """Values allowed for ChannelConfig.demod_type."""

    CQPSK = "cqpsk"
    FSK4 = "fsk4"


class FilterType(str, Enum):
    """Values allowed for ChannelConfig.filter_type."""

    RC = "rc"
    WIDEPULSE = "widepulse"


class ChannelConfig(BaseModel):
    """One entry of the "channels" list in the generated config."""

    name: str
    trunking_sysname: Optional[str]
    enable_analog: str
    demod_type: DemodType
    filter_type: FilterType
    device: Optional[str] = "sdr"
    cqpsk_tracking: Optional[bool] = None
    frequency: Optional[float] = None
    nbfmSquelch: Optional[float] = None
    destination: Optional[str] = "udp://127.0.0.1:23456"
    tracking_threshold: Optional[int] = 120
    tracking_feedback: Optional[float] = 0.75
    excess_bw: Optional[float] = 0.2
    if_rate: Optional[int] = 24000
    plot: Optional[str] = ""
    symbol_rate: Optional[int] = 4800
    blacklist: Optional[str] = ""
    whitelist: Optional[str] = ""


class DeviceConfig(BaseModel):
    """One entry of the "devices" list in the generated config."""

    args: Optional[str] = "rtl"
    gains: Optional[str] = "lna:39"
    gain_mode: Optional[bool] = False
    name: Optional[str] = "sdr"
    offset: Optional[int] = 0
    ppm: Optional[float] = 0.0
    rate: Optional[int] = 1920000
    usable_bw_pct: Optional[float] = 0.85
    tunable: Optional[bool] = True


class TrunkingChannelConfig(BaseModel):
    """One entry of the trunking "chans" list in the generated config."""

    sysname: str
    control_channel_list: str
    tagsFile: Optional[str] = None
    whitelist: Optional[str] = None
    nac: Optional[str] = ""
    wacn: Optional[str] = ""
    tdma_cc: Optional[bool] = False
    crypt_behavior: Optional[int] = 2


class TrunkingConfig(BaseModel):
    """The "trunking" section of the generated config."""

    module: str
    chans: List[TrunkingChannelConfig]


class AudioInstanceConfig(BaseModel):
    """One entry of the audio "instances" list in the generated config."""

    instance_name: Optional[str] = "audio0"
    device_name: Optional[str] = "pulse"
    udp_port: Optional[int] = 23456
    audio_gain: Optional[float] = 2.5
    number_channels: Optional[int] = 1


class AudioConfig(BaseModel):
    """The "audio" section of the generated config."""

    module: Optional[str] = "sockaudio.py"
    instances: Optional[List[AudioInstanceConfig]] = [AudioInstanceConfig()]


class TerminalConfig(BaseModel):
    """The "terminal" section of the generated config."""

    module: Optional[str] = "terminal.py"
    terminal_type: Optional[str] = "http:0.0.0.0:8081"
    terminal_timeout: Optional[float] = 5.0
    curses_plot_interval: Optional[float] = 0.2
    http_plot_interval: Optional[float] = 1.0
    http_plot_directory: Optional[str] = "../www/images"
    tuning_step_large: Optional[int] = 1200
    tuning_step_small: Optional[int] = 100


def _build_p25_config(generator: ConfigGenerator) -> dict:
    """Write the tags/whitelist files and return the P25 config dict."""
    channels = [
        ChannelConfig(
            name=generator.systemName,
            trunking_sysname=generator.systemName,
            enable_analog="off",
            demod_type=DemodType.CQPSK,
            cqpsk_tracking=True,
            filter_type=FilterType.RC,
        )
    ]
    devices = [DeviceConfig()]

    save_talkgroup_tags(generator.tags)
    save_whitelist(generator.whitelist)

    trunking = TrunkingConfig(
        module="tk_p25.py",
        chans=[
            TrunkingChannelConfig(
                sysname=generator.systemName,
                control_channel_list=",".join(generator.channels),
                tagsFile=TAGS_FILE,
                whitelist=WHITELIST_FILE,
            )
        ],
    )

    return {
        "channels": [channel.dict() for channel in channels],
        "devices": [device.dict() for device in devices],
        "trunking": trunking.dict(),
        "audio": AudioConfig().dict(),
        "terminal": TerminalConfig().dict(),
    }


def _build_nbfm_config(generator: ConfigGenerator) -> dict:
    """Return the analog (NBFM) config dict; needs ``generator.frequency``."""
    if generator.frequency is None:
        raise HTTPException(
            status_code=400,
            detail="'frequency' is required for type 'NBFM'.",
        )

    channels = [
        ChannelConfig(
            name=generator.systemName,
            # Passed explicitly because the field has no declared default.
            trunking_sysname=None,
            enable_analog="on",
            demod_type=DemodType.FSK4,
            frequency=generator.frequency,
            filter_type=FilterType.WIDEPULSE,
            nbfmSquelch=generator.nbfmSquelch,
        )
    ]
    # The field is named "gains"; an unknown keyword such as "gain" would be
    # dropped silently by pydantic's default extra-field handling.
    devices = [DeviceConfig(gains="LNA:32")]

    return {
        "channels": [channel.dict() for channel in channels],
        "devices": [device.dict() for device in devices],
    }


@router.post("/generate-config")
async def generate_config(generator: ConfigGenerator):
    """Build the config for the requested mode and write it to CONFIG_FILE.

    For P25 the talkgroup tags and whitelist files are written as well.
    Keys whose value is None are stripped before the JSON is dumped.

    Raises:
        HTTPException: 400 for an unsupported type or an NBFM request without
            a frequency; 500 for any other failure.
    """
    try:
        if generator.type == DecodeMode.P25:
            config_dict = _build_p25_config(generator)
        elif generator.type == DecodeMode.ANALOG:
            config_dict = _build_nbfm_config(generator)
        else:
            raise HTTPException(
                status_code=400,
                detail="Invalid configuration type. Must be 'P25' or 'NBFM'.",
            )

        with open(CONFIG_FILE, "w", encoding="utf-8") as f:
            json.dump(del_none_in_dict(config_dict), f, indent=2)

        return {"message": f"Config exported to '{CONFIG_FILE}'"}
    except HTTPException:
        # Keep deliberate 4xx responses instead of re-wrapping them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag]) -> None:
    """
    Writes a list of tags to the tags file.

    Each tag becomes one tab-separated row: talkgroup, tagDec.

    Args:
        talkgroup_tags (List[TalkgroupTag]): The list of TalkgroupTag instances.
    """
    with open(TAGS_FILE, "w", newline="", encoding="utf-8") as file:
        writer = csv.writer(file, delimiter="\t", lineterminator="\n")
        writer.writerows([tag.talkgroup, tag.tagDec] for tag in talkgroup_tags)


def save_whitelist(talkgroup_tags: List[int]) -> None:
    """
    Writes a list of talkgroups to the whitelists file, one per row.

    Args:
        talkgroup_tags (List[int]): The list of decimals to whitelist.
    """
    with open(WHITELIST_FILE, "w", newline="", encoding="utf-8") as file:
        writer = csv.writer(file, delimiter="\t", lineterminator="\n")
        writer.writerows([tag] for tag in talkgroup_tags)


def _del_none_in_list(items):
    """Strip None-valued keys from every dict found (at any depth) in a list."""
    for item in items:
        if isinstance(item, dict):
            del_none_in_dict(item)
        elif isinstance(item, list):
            _del_none_in_list(item)


def del_none_in_dict(d):
    """
    Delete keys with the value ``None`` in a dictionary, recursively.

    Nested dicts, and dicts inside (nested) lists, are processed too. List
    items that are not dicts or lists are left untouched, including None
    items. This alters the input so you may wish to ``copy`` the dict first.

    Returns:
        The same dictionary object, for convenience.
    """
    # Iterate over a snapshot because keys are deleted during the loop.
    for key, value in list(d.items()):
        if value is None:
            del d[key]
        elif isinstance(value, dict):
            del_none_in_dict(value)
        elif isinstance(value, list):
            _del_none_in_list(value)

    return d