Improve the config generation

- Updated tag and whitelist file generation
This commit is contained in:
2025-02-22 02:25:16 -05:00
parent fd0231690b
commit 1c05240504

View File

@@ -5,6 +5,7 @@ import subprocess
import os import os
import signal import signal
import json import json
import csv
from typing import List, Optional, Union from typing import List, Optional, Union
router = APIRouter() router = APIRouter()
@@ -48,20 +49,16 @@ class DecodeMode(str, Enum):
DMR = "DMR" DMR = "DMR"
ANALOG = "NBFM" ANALOG = "NBFM"
class P25Config(BaseModel): class TalkgroupTag(BaseModel):
systemName: str talkgroup: str
controlChannels: List[str] tagDec: int
tagsFile: str
whitelistFile: Optional[str] = None
class NBFMConfig(BaseModel):
systemName: str
frequency: float
nbfmSquelch: Optional[float] = -70
class ConfigGenerator(BaseModel): class ConfigGenerator(BaseModel):
type: DecodeMode type: DecodeMode
config: Union[P25Config, NBFMConfig] systemName: str
channels: List[str]
tags: List[TalkgroupTag]
whitelist: List[int]
class DemodType(str, Enum): class DemodType(str, Enum):
CQPSK = "cqpsk" CQPSK = "cqpsk"
@@ -141,23 +138,24 @@ class TerminalConfig(BaseModel):
async def generate_config(generator: ConfigGenerator): async def generate_config(generator: ConfigGenerator):
try: try:
if generator.type == DecodeMode.P25: if generator.type == DecodeMode.P25:
config_data = generator.config
channels = [ChannelConfig( channels = [ChannelConfig(
name=config_data.systemName, name=generator.systemName,
trunking_sysname=config_data.systemName, trunking_sysname=generator.systemName,
enable_analog="off", enable_analog="off",
demod_type="cqpsk", demod_type="cqpsk",
cqpsk_tracking=True, cqpsk_tracking=True,
filter_type="rc" filter_type="rc"
)] )]
devices = [DeviceConfig()] devices = [DeviceConfig()]
save_talkgroup_tags(generator.tags)
save_whitelist(generator.whitelist)
trunking = TrunkingConfig( trunking = TrunkingConfig(
module="tk_p25.py", module="tk_p25.py",
chans=[TrunkingChannelConfig( chans=[TrunkingChannelConfig(
sysname=config_data.systemName, sysname=generator.systemName,
control_channel_list=','.join(config_data.controlChannels), control_channel_list=','.join(generator.channels),
tagsFile=config_data.tagsFile, tagsFile="/configs/active.cfg.tags.tsv",
whitelist=config_data.whitelistFile whitelist="/configs/active.cfg.whitelist.tsv"
)] )]
) )
@@ -174,14 +172,14 @@ async def generate_config(generator: ConfigGenerator):
} }
elif generator.type == DecodeMode.ANALOG: elif generator.type == DecodeMode.ANALOG:
config_data = generator.config generator = generator.config
channels = [ChannelConfig( channels = [ChannelConfig(
channelName=config_data.systemName, channelName=generator.systemName,
enableAnalog="on", enableAnalog="on",
demodType="fsk4", demodType="fsk4",
frequency=config_data.frequency, frequency=generator.frequency,
filterType="widepulse", filterType="widepulse",
nbfmSquelch=config_data.nbfmSquelch nbfmSquelch=generator.nbfmSquelch
)] )]
devices = [DeviceConfig(gain="LNA:32")] devices = [DeviceConfig(gain="LNA:32")]
@@ -200,6 +198,31 @@ async def generate_config(generator: ConfigGenerator):
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag]) -> None:
"""
Writes a list of tags to the tags file.
Args:
talkgroup_tags (List[TalkgroupTag]): The list of TalkgroupTag instances.
"""
with open("/configs/active.cfg.tags.tsv", 'w', newline='', encoding='utf-8') as file:
writer = csv.writer(file, delimiter='\t', lineterminator='\n')
# Write rows
for tag in talkgroup_tags:
writer.writerow([tag.talkgroup, tag.tagDec])
def save_whitelist(talkgroup_tags: List[int]) -> None:
"""
Writes a list of talkgroups to the whitelists file.
Args:
talkgroup_tags (List[int]): The list of decimals to whitelist.
"""
with open("/configs/active.cfg.whitelist.tsv", 'w', newline='', encoding='utf-8') as file:
writer = csv.writer(file, delimiter='\t', lineterminator='\n')
# Write rows
for tag in talkgroup_tags:
writer.writerow([tag])
def del_none_in_dict(d): def del_none_in_dict(d):
""" """