"""API routes that start/stop the OP25 launcher script and generate its config files."""

import csv
import json
import os
import signal
import subprocess
from enum import Enum
from typing import List, Optional, Union

from fastapi import FastAPI, HTTPException, APIRouter
from pydantic import BaseModel

router = APIRouter()

# Handle of the launcher started by /start; None when nothing has been started
# (or after /stop).
op25_process = None

OP25_PATH = "/op25/op25/gr-op25_repeater/apps/"
OP25_SCRIPT = "run_multi-rx_service.sh"

# Files written by /generate-config.
ACTIVE_CONFIG_PATH = "/configs/active.cfg.json"
TAGS_FILE_PATH = "/configs/active.cfg.tags.tsv"
WHITELIST_FILE_PATH = "/configs/active.cfg.whitelist.tsv"


def _op25_is_running() -> bool:
    """Return True when a launcher was started and has not exited yet.

    NOTE(review): this tracks the shell running OP25_SCRIPT; it assumes the
    script stays in the foreground rather than daemonizing -- confirm.
    """
    return op25_process is not None and op25_process.poll() is None


@router.post("/start")
async def start_op25():
    """Launch the OP25 script in its own session unless it is already running.

    Raises:
        HTTPException: 500 when the process cannot be spawned.
    """
    global op25_process
    # A launcher that has already exited must not block a restart.
    if _op25_is_running():
        return {"status": "OP25 already running"}
    try:
        op25_process = subprocess.Popen(
            os.path.join(OP25_PATH, OP25_SCRIPT),
            shell=True,
            # Same effect as preexec_fn=os.setsid (child becomes a session and
            # process-group leader) without running Python code between
            # fork and exec.
            start_new_session=True,
            cwd=OP25_PATH,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    print(op25_process)
    return {"status": "OP25 started"}


@router.post("/stop")
async def stop_op25():
    """Send SIGTERM to the OP25 process group and forget the process handle.

    Raises:
        HTTPException: 500 when the signal cannot be delivered for a reason
            other than the process group no longer existing.
    """
    global op25_process
    if op25_process is None:
        return {"status": "OP25 is not running"}
    try:
        # The child was started as a session leader, so its pid is also its
        # process-group id; signalling the group reaches the script's children.
        os.killpg(op25_process.pid, signal.SIGTERM)
    except ProcessLookupError:
        # Nothing left to signal: clear the stale handle instead of failing
        # on every subsequent call.
        op25_process = None
        return {"status": "OP25 is not running"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    op25_process = None
    return {"status": "OP25 stopped"}


@router.get("/status")
async def get_status():
    """Report "running" while the launched process is alive, else "stopped"."""
    return {"status": "running" if _op25_is_running() else "stopped"}


class DecodeMode(str, Enum):
    """Values accepted for ``ConfigGenerator.type``."""

    P25 = "P25"
    DMR = "DMR"
    ANALOG = "NBFM"


class TalkgroupTag(BaseModel):
    """One row of the tags file: ``talkgroup`` and ``tagDec`` are written as a TSV line."""

    talkgroup: str
    tagDec: int


class ConfigGenerator(BaseModel):
    """Request body of POST /generate-config."""

    type: DecodeMode
    systemName: str
    channels: List[str]
    tags: List[TalkgroupTag]
    whitelist: List[int]
    # Only read when type is NBFM.
    frequency: Optional[float] = None
    nbfmSquelch: Optional[float] = None


class DemodType(str, Enum):
    """Values accepted for ``ChannelConfig.demod_type``."""

    CQPSK = "cqpsk"
    FSK4 = "fsk4"


class FilterType(str, Enum):
    """Values accepted for ``ChannelConfig.filter_type``."""

    RC = "rc"
    WIDEPULSE = "widepulse"


class ChannelConfig(BaseModel):
    """One entry of the "channels" list in the generated config."""

    name: str
    # Explicit default so the field stays optional on every pydantic version.
    trunking_sysname: Optional[str] = None
    enable_analog: str
    demod_type: DemodType
    filter_type: FilterType
    device: Optional[str] = "sdr"
    cqpsk_tracking: Optional[bool] = None
    frequency: Optional[float] = None
    nbfmSquelch: Optional[float] = None
    destination: Optional[str] = "udp://127.0.0.1:23456"
    tracking_threshold: Optional[int] = 120
    tracking_feedback: Optional[float] = 0.75
    excess_bw: Optional[float] = 0.2
    if_rate: Optional[int] = 24000
    plot: Optional[str] = ""
    symbol_rate: Optional[int] = 4800
    blacklist: Optional[str] = ""
    whitelist: Optional[str] = ""


class DeviceConfig(BaseModel):
    """One entry of the "devices" list in the generated config."""

    args: Optional[str] = "rtl"
    gains: Optional[str] = "lna:39"
    gain_mode: Optional[bool] = False
    name: Optional[str] = "sdr"
    offset: Optional[int] = 0
    ppm: Optional[float] = 0.0
    rate: Optional[int] = 1920000
    usable_bw_pct: Optional[float] = 0.85
    tunable: Optional[bool] = True


class TrunkingChannelConfig(BaseModel):
    """One entry of ``TrunkingConfig.chans``."""

    sysname: str
    control_channel_list: str
    tagsFile: Optional[str] = None
    whitelist: Optional[str] = None
    nac: Optional[str] = ""
    wacn: Optional[str] = ""
    tdma_cc: Optional[bool] = False
    crypt_behavior: Optional[int] = 2


class TrunkingConfig(BaseModel):
    """The "trunking" section of the generated config."""

    module: str
    chans: List[TrunkingChannelConfig]


class AudioInstanceConfig(BaseModel):
    """One entry of ``AudioConfig.instances``."""

    instance_name: Optional[str] = "audio0"
    device_name: Optional[str] = "pulse"
    udp_port: Optional[int] = 23456
    audio_gain: Optional[float] = 2.5
    number_channels: Optional[int] = 1


class AudioConfig(BaseModel):
    """The "audio" section of the generated config."""

    module: Optional[str] = "sockaudio.py"
    instances: Optional[List[AudioInstanceConfig]] = [AudioInstanceConfig()]


class TerminalConfig(BaseModel):
    """The "terminal" section of the generated config."""

    module: Optional[str] = "terminal.py"
    terminal_type: Optional[str] = "http:0.0.0.0:8081"
    terminal_timeout: Optional[float] = 5.0
    curses_plot_interval: Optional[float] = 0.2
    http_plot_interval: Optional[float] = 1.0
    http_plot_directory: Optional[str] = "../www/images"
    tuning_step_large: Optional[int] = 1200
    tuning_step_small: Optional[int] = 100


def _build_p25_config(generator: ConfigGenerator) -> dict:
    """Write the tags/whitelist files and return the P25 trunking config dict."""
    channels = [ChannelConfig(
        name=generator.systemName,
        trunking_sysname=generator.systemName,
        enable_analog="off",
        demod_type="cqpsk",
        cqpsk_tracking=True,
        filter_type="rc",
    )]
    devices = [DeviceConfig()]

    save_talkgroup_tags(generator.tags)
    save_whitelist(generator.whitelist)

    trunking = TrunkingConfig(
        module="tk_p25.py",
        chans=[TrunkingChannelConfig(
            sysname=generator.systemName,
            control_channel_list=','.join(generator.channels),
            tagsFile=TAGS_FILE_PATH,
            whitelist=WHITELIST_FILE_PATH,
        )],
    )
    audio = AudioConfig()
    terminal = TerminalConfig()

    return {
        "channels": [channel.dict() for channel in channels],
        "devices": [device.dict() for device in devices],
        "trunking": trunking.dict(),
        "audio": audio.dict(),
        "terminal": terminal.dict(),
    }


def _build_analog_config(generator: ConfigGenerator) -> dict:
    """Return the single-channel NBFM config dict (channels and devices only)."""
    if generator.frequency is None:
        raise HTTPException(
            status_code=400,
            detail="'frequency' is required when type is 'NBFM'.",
        )
    channels = [ChannelConfig(
        name=generator.systemName,
        enable_analog="on",
        demod_type="fsk4",
        frequency=generator.frequency,
        filter_type="widepulse",
        nbfmSquelch=generator.nbfmSquelch,
    )]
    # The field is named "gains"; any other keyword would not populate it.
    devices = [DeviceConfig(gains="LNA:32")]
    return {
        "channels": [channel.dict() for channel in channels],
        "devices": [device.dict() for device in devices],
    }


@router.post("/generate-config")
async def generate_config(generator: ConfigGenerator):
    """Build the config for the requested decode mode and write it as JSON.

    P25 requests also rewrite the tags and whitelist TSV files. Keys whose
    value is None are stripped before the JSON file is written.

    Raises:
        HTTPException: 400 for an unsupported ``type`` or an NBFM request
            without ``frequency``; 500 for any other failure.
    """
    try:
        if generator.type == DecodeMode.P25:
            config_dict = _build_p25_config(generator)
        elif generator.type == DecodeMode.ANALOG:
            config_dict = _build_analog_config(generator)
        else:
            raise HTTPException(
                status_code=400,
                detail="Invalid configuration type. Must be 'P25' or 'NBFM'.",
            )

        with open(ACTIVE_CONFIG_PATH, 'w') as f:
            json.dump(del_none_in_dict(config_dict), f, indent=2)

        return {"message": f"Config exported to '{ACTIVE_CONFIG_PATH}'"}
    except HTTPException:
        # Keep deliberate 4xx responses instead of re-wrapping them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag]) -> None:
    """
    Writes a list of tags to the tags file, one tab-separated row per tag.

    Args:
        talkgroup_tags (List[TalkgroupTag]): The list of TalkgroupTag instances.
    """
    with open(TAGS_FILE_PATH, 'w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file, delimiter='\t', lineterminator='\n')
        writer.writerows([tag.talkgroup, tag.tagDec] for tag in talkgroup_tags)


def save_whitelist(talkgroup_tags: List[int]) -> None:
    """
    Writes a list of talkgroups to the whitelist file, one value per line.

    Args:
        talkgroup_tags (List[int]): The list of decimals to whitelist.
    """
    with open(WHITELIST_FILE_PATH, 'w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file, delimiter='\t', lineterminator='\n')
        writer.writerows([tag] for tag in talkgroup_tags)


def del_none_in_dict(d):
    """
    Delete keys with the value ``None`` in a dictionary, recursively.

    Nested dicts, and dicts that are direct items of a list value, are
    cleaned too; other list items are left untouched.

    This alters the input so you may wish to ``copy`` the dict first.

    Returns:
        The same dict object, for convenience.
    """
    # Iterate over a snapshot because keys are deleted during the walk.
    for key, value in list(d.items()):
        if value is None:
            del d[key]
        elif isinstance(value, dict):
            del_none_in_dict(value)
        elif isinstance(value, list):
            for item in value:
                # Lists may hold scalars (e.g. strings); only dicts have keys
                # to prune.
                if isinstance(item, dict):
                    del_none_in_dict(item)
    return d