From 1c0524050479a633ec4cfcb1bcc48893c09b889c Mon Sep 17 00:00:00 2001 From: Logan Cusano Date: Sat, 22 Feb 2025 02:25:16 -0500 Subject: [PATCH] Improve the config generation - Updated tag and whitelist file generation --- app/op25_controller.py | 67 ++++++++++++++++++++++++++++-------------- 1 file changed, 45 insertions(+), 22 deletions(-) diff --git a/app/op25_controller.py b/app/op25_controller.py index 74f976f..092b877 100644 --- a/app/op25_controller.py +++ b/app/op25_controller.py @@ -5,6 +5,7 @@ import subprocess import os import signal import json +import csv from typing import List, Optional, Union router = APIRouter() @@ -48,20 +49,16 @@ class DecodeMode(str, Enum): DMR = "DMR" ANALOG = "NBFM" -class P25Config(BaseModel): - systemName: str - controlChannels: List[str] - tagsFile: str - whitelistFile: Optional[str] = None - -class NBFMConfig(BaseModel): - systemName: str - frequency: float - nbfmSquelch: Optional[float] = -70 +class TalkgroupTag(BaseModel): + talkgroup: str + tagDec: int class ConfigGenerator(BaseModel): type: DecodeMode - config: Union[P25Config, NBFMConfig] + systemName: str + channels: List[str] + tags: List[TalkgroupTag] + whitelist: List[int] class DemodType(str, Enum): CQPSK = "cqpsk" @@ -141,23 +138,24 @@ class TerminalConfig(BaseModel): async def generate_config(generator: ConfigGenerator): try: if generator.type == DecodeMode.P25: - config_data = generator.config channels = [ChannelConfig( - name=config_data.systemName, - trunking_sysname=config_data.systemName, + name=generator.systemName, + trunking_sysname=generator.systemName, enable_analog="off", demod_type="cqpsk", cqpsk_tracking=True, filter_type="rc" )] devices = [DeviceConfig()] + save_talkgroup_tags(generator.tags) + save_whitelist(generator.whitelist) trunking = TrunkingConfig( module="tk_p25.py", chans=[TrunkingChannelConfig( - sysname=config_data.systemName, - control_channel_list=','.join(config_data.controlChannels), - tagsFile=config_data.tagsFile, - whitelist=config_data.whitelistFile + sysname=generator.systemName, + control_channel_list=','.join(generator.channels), + tagsFile="/configs/active.cfg.tags.tsv", + whitelist="/configs/active.cfg.whitelist.tsv" )] ) @@ -174,14 +172,14 @@ async def generate_config(generator: ConfigGenerator): } elif generator.type == DecodeMode.ANALOG: - config_data = generator.config + generator = generator.config channels = [ChannelConfig( - channelName=config_data.systemName, + channelName=generator.systemName, enableAnalog="on", demodType="fsk4", - frequency=config_data.frequency, + frequency=generator.frequency, filterType="widepulse", - nbfmSquelch=config_data.nbfmSquelch + nbfmSquelch=generator.nbfmSquelch )] devices = [DeviceConfig(gain="LNA:32")] @@ -200,6 +198,31 @@ async def generate_config(generator: ConfigGenerator): except Exception as e: raise HTTPException(status_code=500, detail=str(e)) +def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag]) -> None: + """ + Writes a list of tags to the tags file. + + Args: + talkgroup_tags (List[TalkgroupTag]): The list of TalkgroupTag instances. + """ + with open("/configs/active.cfg.tags.tsv", 'w', newline='', encoding='utf-8') as file: + writer = csv.writer(file, delimiter='\t', lineterminator='\n') + # Write rows + for tag in talkgroup_tags: + writer.writerow([tag.talkgroup, tag.tagDec]) + +def save_whitelist(talkgroup_tags: List[int]) -> None: + """ + Writes a list of talkgroups to the whitelists file. + + Args: + talkgroup_tags (List[int]): The list of decimals to whitelist. + """ + with open("/configs/active.cfg.whitelist.tsv", 'w', newline='', encoding='utf-8') as file: + writer = csv.writer(file, delimiter='\t', lineterminator='\n') + # Write rows + for tag in talkgroup_tags: + writer.writerow([tag]) def del_none_in_dict(d): """