Init repo

This commit is contained in:
2025-01-25 02:31:27 -05:00
commit fd0231690b
12 changed files with 735 additions and 0 deletions

219
app/op25_controller.py Normal file
View File

@@ -0,0 +1,219 @@
from fastapi import FastAPI, HTTPException, APIRouter
from pydantic import BaseModel
from enum import Enum
import subprocess
import os
import signal
import json
from typing import List, Optional, Union
router = APIRouter()
op25_process = None
OP25_PATH = "/op25/op25/gr-op25_repeater/apps/"
OP25_SCRIPT = "run_multi-rx_service.sh"
@router.post("/start")
async def start_op25():
global op25_process
if op25_process is None:
try:
op25_process = subprocess.Popen(os.path.join(OP25_PATH, OP25_SCRIPT), shell=True, preexec_fn=os.setsid, cwd=OP25_PATH)
print(op25_process)
return {"status": "OP25 started"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
else:
return {"status": "OP25 already running"}
@router.post("/stop")
async def stop_op25():
global op25_process
if op25_process is not None:
try:
os.killpg(os.getpgid(op25_process.pid), signal.SIGTERM)
op25_process = None
return {"status": "OP25 stopped"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
else:
return {"status": "OP25 is not running"}
@router.get("/status")
async def get_status():
return {"status": "running" if op25_process else "stopped"}
class DecodeMode(str, Enum):
P25 = "P25"
DMR = "DMR"
ANALOG = "NBFM"
class P25Config(BaseModel):
systemName: str
controlChannels: List[str]
tagsFile: str
whitelistFile: Optional[str] = None
class NBFMConfig(BaseModel):
systemName: str
frequency: float
nbfmSquelch: Optional[float] = -70
class ConfigGenerator(BaseModel):
type: DecodeMode
config: Union[P25Config, NBFMConfig]
class DemodType(str, Enum):
CQPSK = "cqpsk"
FSK4 = "fsk4"
class FilterType(str, Enum):
RC = "rc"
WIDEPULSE = "widepulse"
class ChannelConfig(BaseModel):
name: str
trunking_sysname: Optional[str]
enable_analog: str
demod_type: DemodType
filter_type: FilterType
device: Optional[str] = "sdr"
cqpsk_tracking: Optional[bool] = None
frequency: Optional[float] = None
nbfmSquelch: Optional[float] = None
destination: Optional[str] = "udp://127.0.0.1:23456"
tracking_threshold: Optional[int] = 120
tracking_feedback: Optional[float] = 0.75
excess_bw: Optional[float] = 0.2
if_rate: Optional[int] = 24000
plot: Optional[str] = ""
symbol_rate: Optional[int] = 4800
blacklist: Optional[str] = ""
whitelist: Optional[str] = ""
class DeviceConfig(BaseModel):
args: Optional[str] = "rtl"
gains: Optional[str] = "lna:39"
gain_mode: Optional[bool] = False
name: Optional[str] = "sdr"
offset: Optional[int] = 0
ppm: Optional[float] = 0.0
rate: Optional[int] = 1920000
usable_bw_pct: Optional[float] = 0.85
tunable: Optional[bool] = True
class TrunkingChannelConfig(BaseModel):
sysname: str
control_channel_list: str
tagsFile: Optional[str] = None
whitelist: Optional[str] = None
nac: Optional[str] = ""
wacn: Optional[str] = ""
tdma_cc: Optional[bool] = False
crypt_behavior: Optional[int] = 2
class TrunkingConfig(BaseModel):
module: str
chans: List[TrunkingChannelConfig]
class AudioInstanceConfig(BaseModel):
instance_name: Optional[str] = "audio0"
device_name: Optional[str] = "pulse"
udp_port: Optional[int] = 23456
audio_gain: Optional[float] = 2.5
number_channels: Optional[int] = 1
class AudioConfig(BaseModel):
module: Optional[str] = "sockaudio.py"
instances: Optional[List[AudioInstanceConfig]] = [AudioInstanceConfig()]
class TerminalConfig(BaseModel):
module: Optional[str] = "terminal.py"
terminal_type: Optional[str] = "http:0.0.0.0:8081"
terminal_timeout: Optional[float] = 5.0
curses_plot_interval: Optional[float] = 0.2
http_plot_interval: Optional[float] = 1.0
http_plot_directory: Optional[str] = "../www/images"
tuning_step_large: Optional[int] = 1200
tuning_step_small: Optional[int] = 100
@router.post("/generate-config")
async def generate_config(generator: ConfigGenerator):
try:
if generator.type == DecodeMode.P25:
config_data = generator.config
channels = [ChannelConfig(
name=config_data.systemName,
trunking_sysname=config_data.systemName,
enable_analog="off",
demod_type="cqpsk",
cqpsk_tracking=True,
filter_type="rc"
)]
devices = [DeviceConfig()]
trunking = TrunkingConfig(
module="tk_p25.py",
chans=[TrunkingChannelConfig(
sysname=config_data.systemName,
control_channel_list=','.join(config_data.controlChannels),
tagsFile=config_data.tagsFile,
whitelist=config_data.whitelistFile
)]
)
audio = AudioConfig()
terminal = TerminalConfig()
config_dict = {
"channels": [channel.dict() for channel in channels],
"devices": [device.dict() for device in devices],
"trunking": trunking.dict(),
"audio": audio.dict(),
"terminal": terminal.dict()
}
elif generator.type == DecodeMode.ANALOG:
config_data = generator.config
channels = [ChannelConfig(
channelName=config_data.systemName,
enableAnalog="on",
demodType="fsk4",
frequency=config_data.frequency,
filterType="widepulse",
nbfmSquelch=config_data.nbfmSquelch
)]
devices = [DeviceConfig(gain="LNA:32")]
config_dict = {
"channels": [channel.dict() for channel in channels],
"devices": [device.dict() for device in devices]
}
else:
raise HTTPException(status_code=400, detail="Invalid configuration type. Must be 'p25' or 'nbfm'.")
with open('/configs/active.cfg.json', 'w') as f:
json.dump(del_none_in_dict(config_dict), f, indent=2)
return {"message": f"Config exported to '/configs/active.cfg.json'"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
def del_none_in_dict(d):
"""
Delete keys with the value ``None`` in a dictionary, recursively.
This alters the input so you may wish to ``copy`` the dict first.
"""
for key, value in list(d.items()):
print(f"Key: '{key}'\nValue: '{value}'")
if value is None:
del d[key]
elif isinstance(value, dict):
del_none_in_dict(value)
elif isinstance(value, list):
for iterative_value in value:
del_none_in_dict(iterative_value)
return d # For convenience