This commit is contained in:
Logan Cusano
2025-12-28 02:37:50 -05:00
commit 26e90f4584
14 changed files with 893 additions and 0 deletions

View File

@@ -0,0 +1,34 @@
import os
from internal.op25_liq_template import liquidsoap_config_template
from models.models import IcecastConfig
def generate_liquid_script(config: IcecastConfig):
"""
Generates the "*.liq" file that's run by OP25 on startup.
Placeholders in the template must be formatted as ${VARIABLE_NAME}.
Args:
config (dict): A dictionary of key-value pairs for substitution.
Keys should match the variable names in the template (e.g., 'icecast_host').
"""
try:
content = liquidsoap_config_template
# Replace variables
for key, value in config.model_dump().items():
placeholder = f"${{{key}}}"
# Ensure the value is converted to string for replacement
content = content.replace(placeholder, str(value))
print(f" - Replaced placeholder {placeholder}")
# Write the processed content to the output path
output_path = "/configs/op25.liq"
with open(output_path, 'a+') as f:
f.write(content)
print(f"\nSuccessfully wrote processed configuration to: {output_path}")
except FileNotFoundError:
print(f"Error: Template file not found at {template_path}")
except Exception as e:
print(f"An unexpected error occurred: {e}")

55
app/internal/logger.py Normal file
View File

@@ -0,0 +1,55 @@
import logging
from logging.handlers import RotatingFileHandler
def create_logger(name, level=logging.DEBUG, max_bytes=10485760, backup_count=2):
"""
Creates a logger with a console and rotating file handlers for both debug and info log levels.
Args:
name (str): The name for the logger.
level (int): The logging level for the logger. Defaults to logging.DEBUG.
max_bytes (int): Maximum size of the log file in bytes before it gets rotated. Defaults to 10 MB.
backup_count (int): Number of backup files to keep. Defaults to 2.
Returns:
logging.Logger: Configured logger.
"""
# Set the log file paths
debug_log_file = "./client.debug.log"
info_log_file = "./client.log"
# Create a logger
logger = logging.getLogger(name)
logger.setLevel(level)
# Check if the logger already has handlers to avoid duplicate logs
if not logger.hasHandlers():
# Create console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(level)
# Create rotating file handler for debug level
debug_file_handler = RotatingFileHandler(debug_log_file, maxBytes=max_bytes, backupCount=backup_count)
debug_file_handler.setLevel(logging.DEBUG)
# Create rotating file handler for info level
info_file_handler = RotatingFileHandler(info_log_file, maxBytes=max_bytes, backupCount=backup_count)
info_file_handler.setLevel(logging.INFO)
# Create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
debug_file_handler.setFormatter(formatter)
info_file_handler.setFormatter(formatter)
# Add the handlers to the logger
logger.addHandler(console_handler)
logger.addHandler(debug_file_handler)
logger.addHandler(info_file_handler)
return logger
# Example usage:
# logger = create_logger('my_logger')
# logger.debug('This is a debug message')
# logger.info('This is an info message')

View File

@@ -0,0 +1,122 @@
import csv
import json
import os
import shutil
from models.models import TalkgroupTag
from typing import List, Dict
from internal.logger import create_logger
LOGGER = create_logger(__name__)
CONFIG_DIR = "/configs"
def scan_local_library() -> List[Dict]:
"""
Scans the /configs directory for JSON files to build the 'nearby_systems' list.
"""
library = []
if not os.path.exists(CONFIG_DIR):
return library
for filename in os.listdir(CONFIG_DIR):
# We don't want to include the active config or the sidecar files in the library scan
if filename.endswith(".json") and filename != "active.cfg.json":
try:
path = os.path.join(CONFIG_DIR, filename)
with open(path, 'r') as f:
data = json.load(f)
# Use trunking sysname or filename as the identifier
sys_name = data.get("trunking", {}).get("sysname", filename.replace(".json", ""))
library.append({
"name": sys_name,
"system_name": filename,
"mode": "P25" if "trunking" in data else "NBFM"
})
except Exception as e:
LOGGER.error(f"Failed to parse library file {filename}: {e}")
return library
def activate_config_from_library(system_name: str) -> bool:
"""
Copies a config from the library to the active slot.
"""
if not system_name.endswith(".json"):
system_name += ".json"
src = os.path.join(CONFIG_DIR, system_name)
dst = os.path.join(CONFIG_DIR, "active.cfg.json")
if not os.path.exists(src):
LOGGER.error(f"Source config {system_name} not found in library.")
return False
try:
shutil.copy2(src, dst)
LOGGER.info(f"Activated config: {system_name}")
return True
except Exception as e:
LOGGER.error(f"Failed to copy config: {e}")
return False
def save_config_to_library(system_name: str, config: Dict) -> bool:
"""
Saves a configuration dictionary to the local library.
"""
if not system_name.endswith(".json"):
system_name += ".json"
path = os.path.join(CONFIG_DIR, system_name)
try:
with open(path, 'w') as f:
json.dump(config, f, indent=2)
LOGGER.info(f"Saved config to library: {system_name}")
return True
except Exception as e:
LOGGER.error(f"Failed to save config to library: {e}")
return False
def get_current_active_config() -> Dict:
"""Reads the current active.cfg.json if it exists."""
path = os.path.join(CONFIG_DIR, "active.cfg.json")
if os.path.exists(path):
try:
with open(path, 'r') as f:
return json.load(f)
except:
return {}
return {}
def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag]) -> None:
with open(os.path.join(CONFIG_DIR, "active.cfg.tags.tsv"), 'w', newline='', encoding='utf-8') as file:
writer = csv.writer(file, delimiter='\t', lineterminator='\n')
for tag in talkgroup_tags:
writer.writerow([tag.tagDec, tag.talkgroup])
def save_whitelist(talkgroup_tags: List[int]) -> None:
with open(os.path.join(CONFIG_DIR, "active.cfg.whitelist.tsv"), 'w', newline='', encoding='utf-8') as file:
writer = csv.writer(file, delimiter='\t', lineterminator='\n')
for tag in talkgroup_tags:
writer.writerow([tag])
def del_none_in_dict(d):
for key, value in list(d.items()):
if value is None:
del d[key]
elif isinstance(value, dict):
del_none_in_dict(value)
elif isinstance(value, list):
for iterative_value in value:
if isinstance(iterative_value, dict):
del_none_in_dict(iterative_value)
return d
def get_current_system_from_config() -> str:
data = get_current_active_config()
if not data:
return None
try:
return data.get("trunking", {}).get("sysname", "Unknown System")
except:
return "Unknown System"

View File

@@ -0,0 +1,48 @@
liquidsoap_config_template = """#!/usr/bin/liquidsoap
# Example liquidsoap streaming from op25 to icecast
# (c) 2019-2021 gnorbury@bondcar.com, wllmbecks@gmail.com
#
set("log.stdout", true)
set("log.file", false)
set("log.level", 1)
# Make the native sample rate compatible with op25
set("frame.audio.samplerate", 8000)
set("init.allow_root", true)
# ==========================================================
ICE_HOST = "${icecast_host}"
ICE_PORT = ${icecast_port}
ICE_MOUNT = "${icecast_mountpoint}"
ICE_PASSWORD = "${icecast_password}"
ICE_DESCRIPTION = "${icecast_description}"
ICE_GENRE = "${icecast_genre}"
# ==========================================================
input = mksafe(input.external(buffer=0.25, channels=2, samplerate=8000, restart_on_error=false, "./audio.py -x 2.5 -s"))
# Consider increasing the buffer value on slow systems such as RPi3. e.g. buffer=0.25
# Compression
input = compress(input, attack = 2.0, gain = 0.0, knee = 13.0, ratio = 2.0, release = 12.3, threshold = -18.0)
# Normalization
input = normalize(input, gain_max = 6.0, gain_min = -6.0, target = -16.0, threshold = -65.0)
# ==========================================================
# OUTPUT: Referencing the new variables
# ==========================================================
output.icecast(
%mp3(bitrate=16, samplerate=22050, stereo=false),
description=ICE_DESCRIPTION,
genre=ICE_GENRE,
url="",
fallible=false,
host=ICE_HOST,
port=ICE_PORT,
mount=ICE_MOUNT,
password=ICE_PASSWORD,
mean(input)
)
"""