19 Commits

Author SHA1 Message Date
Logan Cusano
48beb79922 Placement error, actual last attempt
Some checks failed
Python Application Tests / build (3.13) (pull_request) Failing after 8s
2025-12-29 20:11:06 -05:00
Logan Cusano
98727615a3 Last fix attempt
Some checks failed
Python Application Tests / build (3.13) (pull_request) Failing after 8s
2025-12-29 20:09:48 -05:00
Logan Cusano
706f5a0e20 last attempt
Some checks failed
Python Application Tests / build (3.13) (pull_request) Failing after 8s
2025-12-29 19:58:16 -05:00
Logan Cusano
1be65c226f Update test with mock models
Some checks failed
Python Application Tests / build (3.13) (pull_request) Failing after 8s
2025-12-29 19:51:56 -05:00
Logan Cusano
313da3684d undo mistake
Some checks failed
Python Application Tests / build (3.13) (pull_request) Failing after 7s
2025-12-29 19:46:59 -05:00
Logan Cusano
80f5eb3f50 Fix test path
Some checks failed
Python Application Tests / build (3.13) (pull_request) Failing after 8s
2025-12-29 19:43:11 -05:00
Logan Cusano
497cbccc80 init testing
Some checks failed
Python Application Tests / build (3.13) (pull_request) Failing after 53s
2025-12-29 19:18:13 -05:00
de143a67fe Merge pull request 'Implement Metadata Watcher' (#1) from metadata-watcher into main
All checks were successful
release-tag / release-image (push) Successful in 1h26m24s
Reviewed-on: #1
2025-12-29 19:04:07 -05:00
Logan Cusano
ee9ce0e140 Add the radio ID to the metadata payload to track who is talking, not just what system 2025-12-29 19:02:51 -05:00
Logan Cusano
ca984be293 Implement debug logging into metadata watcher 2025-12-29 15:48:45 -05:00
Logan Cusano
b8ee991192 Update port in docker compose and update metadata watcher function to use correct OP@5 endpoint 2025-12-29 15:23:18 -05:00
Logan Cusano
0a6b565651 Fix bug in op25 config where it would not create liquidsoap if saved config was loaded 2025-12-29 15:06:48 -05:00
Logan Cusano
269ce033eb Updated op25 config functions 2025-12-29 14:09:53 -05:00
Logan Cusano
c481db6702 Update gitignore for configs 2025-12-29 14:09:26 -05:00
Logan Cusano
e740b46bfe Add example env file 2025-12-29 13:52:23 -05:00
Logan Cusano
bae50463a7 Merge remote-tracking branch 'origin/main' into metadata-watcher 2025-12-29 03:42:07 -05:00
Logan Cusano
6c2054a21e Update package name to repo owner
All checks were successful
release-tag / release-image (push) Successful in 1h25m24s
2025-12-29 03:11:44 -05:00
Logan Cusano
8c275111e2 Update secrets
Some checks failed
release-tag / release-image (push) Failing after 24s
2025-12-29 00:48:20 -05:00
Logan Cusano
15f91819e4 Add a build
Some checks failed
release-tag / release-image (push) Failing after 50s
2025-12-29 00:45:27 -05:00
9 changed files with 611 additions and 85 deletions

5
.env.example Normal file
View File

@@ -0,0 +1,5 @@
NODE_ID=
MQTT_BROKER=
ICECAST_SERVER=
NODE_LAT=
NODE_LONG=

View File

@@ -0,0 +1,57 @@
name: release-tag
on:
push:
branches:
- main
jobs:
release-image:
runs-on: ubuntu-latest
env:
DOCKER_LATEST: stable
CONTAINER_NAME: drb-edge-node
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker BuildX
uses: docker/setup-buildx-action@v3
with: # replace it with your local IP
config-inline: |
[registry."git.vpn.cusano.net"]
http = false
insecure = false
- name: Login to DockerHub
uses: docker/login-action@v3
with:
registry: git.vpn.cusano.net # replace it with your local IP
username: ${{ secrets.GIT_REPO_USERNAME }}
password: ${{ secrets.BUILD_TOKEN }}
- name: Get Meta
id: meta
run: |
echo REPO_NAME=$(echo ${GITHUB_REPOSITORY} | awk -F"/" '{print $2}') >> $GITHUB_OUTPUT
echo REPO_VERSION=$(git describe --tags --always | sed 's/^v//') >> $GITHUB_OUTPUT
- name: Validate build configuration
uses: docker/build-push-action@v6
with:
call: check
- name: Build and push
uses: docker/build-push-action@v6
with:
context: .
file: ./Dockerfile
platforms: |
linux/arm64
push: true
tags: | # replace it with your local IP and tags
git.vpn.cusano.net/${{ gitea.repository_owner }}/${{ steps.meta.outputs.REPO_NAME }}/${{ env.CONTAINER_NAME }}:${{ steps.meta.outputs.REPO_VERSION }}
git.vpn.cusano.net/${{ gitea.repository_owner }}/${{ steps.meta.outputs.REPO_NAME }}/${{ env.CONTAINER_NAME }}:${{ env.DOCKER_LATEST }}

View File

@@ -0,0 +1,37 @@
name: Python Application Tests
on:
push:
branches: [ "main" ]
pull_request:
branches: [ "*" ]
jobs:
build:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ["3.13"]
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install Dependencies
run: |
python -m pip install --upgrade pip
# Install test dependencies
pip install pytest pytest-asyncio httpx
# Install application dependencies (assuming you have a requirements.txt)
# If you don't have one, create it with `pip freeze > requirements.txt`
# For now, we'll install the dependencies we know are needed from context
pip install fastapi "uvicorn[standard]" paho-mqtt requests
- name: Test with pytest
run: |
pytest

2
.gitignore vendored
View File

@@ -2,4 +2,4 @@
*.log *.log
*.db *.db
*.conf *.conf
config/* configs/*

View File

@@ -2,9 +2,11 @@ import csv
import json import json
import os import os
import shutil import shutil
from models.models import TalkgroupTag from pathlib import Path
from models.models import TalkgroupTag, IcecastConfig
from typing import List, Dict from typing import List, Dict
from internal.logger import create_logger from internal.logger import create_logger
from internal.liquidsoap_config_utils import generate_liquid_script
LOGGER = create_logger(__name__) LOGGER = create_logger(__name__)
@@ -28,8 +30,8 @@ def scan_local_library() -> List[Dict]:
# Use trunking sysname or filename as the identifier # Use trunking sysname or filename as the identifier
sys_name = data.get("trunking", {}).get("sysname", filename.replace(".json", "")) sys_name = data.get("trunking", {}).get("sysname", filename.replace(".json", ""))
library.append({ library.append({
"name": sys_name, "system_name": sys_name,
"system_name": filename, "filename": filename,
"mode": "P25" if "trunking" in data else "NBFM" "mode": "P25" if "trunking" in data else "NBFM"
}) })
except Exception as e: except Exception as e:
@@ -44,16 +46,48 @@ def activate_config_from_library(system_name: str) -> bool:
if not system_name.endswith(".json"): if not system_name.endswith(".json"):
system_name += ".json" system_name += ".json"
src = os.path.join(CONFIG_DIR, system_name) config_path = Path(CONFIG_DIR)
dst = os.path.join(CONFIG_DIR, "active.cfg.json") src = config_path / system_name
dst = config_path / "active.cfg.json"
if not os.path.exists(src): if not src.exists():
LOGGER.error(f"Source config {system_name} not found in library.") LOGGER.error(f"Source config {system_name} not found in library.")
return False return False
try: try:
shutil.copy2(src, dst) shutil.copy2(src, dst)
LOGGER.info(f"Activated config: {system_name}") LOGGER.info(f"Activated config: {system_name}")
# Copy sidecar files (tags/whitelist) if they exist
src_tags = src.with_suffix(".tags.tsv")
if src_tags.exists():
shutil.copy2(src_tags, config_path / "active.cfg.tags.tsv")
src_whitelist = src.with_suffix(".whitelist.tsv")
if src_whitelist.exists():
shutil.copy2(src_whitelist, config_path / "active.cfg.whitelist.tsv")
# Generate Liquidsoap Script by reading the activated config
with open(dst, 'r') as f:
data = json.load(f)
if "trunking" in data and "metadata" in data:
streams = data.get("metadata", {}).get("streams", [])
if streams:
stream = streams[0]
address = stream.get("icecastServerAddress", "127.0.0.1:8000")
host, port = address.split(":") if ":" in address else (address, 8000)
ice_config = IcecastConfig(
icecast_host=host,
icecast_port=int(port),
icecast_mountpoint=stream.get("icecastMountpoint", "/stream"),
icecast_password=stream.get("icecastPass", "hackme"),
icecast_description="OP25 Stream",
icecast_genre="Scanner"
)
generate_liquid_script(ice_config)
return True return True
except Exception as e: except Exception as e:
LOGGER.error(f"Failed to copy config: {e}") LOGGER.error(f"Failed to copy config: {e}")
@@ -88,14 +122,16 @@ def get_current_active_config() -> Dict:
return {} return {}
return {} return {}
def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag]) -> None: def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag], prefix: str = "active.cfg") -> None:
with open(os.path.join(CONFIG_DIR, "active.cfg.tags.tsv"), 'w', newline='', encoding='utf-8') as file: filename = f"{prefix}.tags.tsv"
with open(os.path.join(CONFIG_DIR, filename), 'w', newline='', encoding='utf-8') as file:
writer = csv.writer(file, delimiter='\t', lineterminator='\n') writer = csv.writer(file, delimiter='\t', lineterminator='\n')
for tag in talkgroup_tags: for tag in talkgroup_tags:
writer.writerow([tag.tagDec, tag.talkgroup]) writer.writerow([tag.tagDec, tag.talkgroup])
def save_whitelist(talkgroup_tags: List[int]) -> None: def save_whitelist(talkgroup_tags: List[int], prefix: str = "active.cfg") -> None:
with open(os.path.join(CONFIG_DIR, "active.cfg.whitelist.tsv"), 'w', newline='', encoding='utf-8') as file: filename = f"{prefix}.whitelist.tsv"
with open(os.path.join(CONFIG_DIR, filename), 'w', newline='', encoding='utf-8') as file:
writer = csv.writer(file, delimiter='\t', lineterminator='\n') writer = csv.writer(file, delimiter='\t', lineterminator='\n')
for tag in talkgroup_tags: for tag in talkgroup_tags:
writer.writerow([tag]) writer.writerow([tag])

View File

@@ -186,12 +186,16 @@ async def mqtt_lifecycle_manager():
async def metadata_watcher(): async def metadata_watcher():
""" """
Polls OP25 HTTP terminal for metadata and publishes events to MQTT. Polls OP25 HTTP terminal for metadata and publishes events to MQTT.
Corrected to use the POST-based command API found in the HAR capture.
""" """
last_tgid = 0 last_tgid = 0
last_metadata = {} last_metadata = {}
potential_end_time = None potential_end_time = None
DEBOUNCE_SECONDS = 2.5 DEBOUNCE_SECONDS = 2.5
OP25_DATA_URL = "http://127.0.0.1:8081/data.json" OP25_DATA_URL = "http://127.0.0.1:8081/"
# This is the specific payload the OP25 web interface uses [cite: 45562, 45563]
COMMAND_PAYLOAD = [{"command": "update", "arg1": 0, "arg2": 0}]
while True: while True:
if not MQTT_CONNECTED: if not MQTT_CONNECTED:
@@ -199,52 +203,56 @@ async def mqtt_lifecycle_manager():
continue continue
try: try:
# Run blocking request in executor to avoid blocking the asyncio loop # Run blocking POST request in executor
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
response = await loop.run_in_executor(None, lambda: requests.get(OP25_DATA_URL, timeout=0.5)) response = await loop.run_in_executor(
None,
lambda: requests.post(OP25_DATA_URL, json=COMMAND_PAYLOAD, timeout=0.5)
)
if response.status_code == 200: if response.status_code == 200:
data = response.json() data = response.json()
LOGGER.debug(f"Response from OP25 API: {data}")
current_tgid = 0 current_tgid = 0
current_meta = {} current_meta = {}
# Handle multi_rx list or single dict structure # The response is an array of update objects
if isinstance(data, list): for item in data:
for ch in data: if item.get("json_type") == "channel_update":
t = ch.get("tgid", 0) # The terminal provides channel info keyed by channel index (e.g., "0")
# We look for the first channel that has an active TGID
for key in item:
if key.isdigit():
ch = item[key]
t = ch.get("tgid")
# OP25 returns null or 0 when no talkgroup is active
if t and int(t) > 0: if t and int(t) > 0:
current_tgid = int(t) current_tgid = int(t)
current_meta = { current_meta = {
"tgid": str(t), "tgid": str(t),
"rid": str(ch.get("srcaddr", "")).strip(),
"alpha_tag": str(ch.get("tag", "")).strip(), "alpha_tag": str(ch.get("tag", "")).strip(),
"frequency": str(ch.get("freq", 0)), "frequency": str(ch.get("freq", 0)),
"sysname": str(ch.get("system", "")).strip() "sysname": str(ch.get("system", "")).strip()
} }
break break
elif isinstance(data, dict): if current_tgid: break
t = data.get("tgid", 0)
if t and int(t) > 0:
current_tgid = int(t)
current_meta = {
"tgid": str(t),
"alpha_tag": str(data.get("tag", "")).strip(),
"frequency": str(data.get("freq", 0)),
"sysname": str(data.get("system", "")).strip()
}
now = datetime.now() now = datetime.now()
# Logic for handling call start/end events
if current_tgid != 0: if current_tgid != 0:
potential_end_time = None # Reset debounce potential_end_time = None
if current_tgid != last_tgid: if current_tgid != last_tgid:
if last_tgid != 0: if last_tgid != 0:
# End previous call immediately if switching channels LOGGER.debug(f"Switching TGID: {last_tgid} -> {current_tgid}")
payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_end", "metadata": last_metadata} payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_end", "metadata": last_metadata}
client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0) client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0)
# Start new call LOGGER.debug(f"Call Start: TGID {current_tgid} ({current_meta.get('alpha_tag')})")
payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_start", "metadata": current_meta} payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_start", "metadata": current_meta}
client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0) client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0)
last_tgid = current_tgid last_tgid = current_tgid
@@ -252,16 +260,20 @@ async def mqtt_lifecycle_manager():
elif last_tgid != 0: elif last_tgid != 0:
if potential_end_time is None: if potential_end_time is None:
LOGGER.debug(f"Signal lost for TGID {last_tgid}. Starting debounce.")
potential_end_time = now potential_end_time = now
elif (now - potential_end_time).total_seconds() > DEBOUNCE_SECONDS: elif (now - potential_end_time).total_seconds() > DEBOUNCE_SECONDS:
LOGGER.debug(f"Call End (Debounce expired): TGID {last_tgid}")
payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_end", "metadata": last_metadata} payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_end", "metadata": last_metadata}
client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0) client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0)
last_tgid = 0 last_tgid = 0
last_metadata = {} last_metadata = {}
potential_end_time = None potential_end_time = None
else:
LOGGER.debug(f"OP25 API returned status: {response.status_code}")
except Exception: except Exception as e:
pass # OP25 might be restarting or busy LOGGER.warning(f"Metadata watcher error: {e}")
await asyncio.sleep(0.25) await asyncio.sleep(0.25)

View File

@@ -61,6 +61,77 @@ async def start_op25_logic():
return False return False
return False return False
def build_op25_config(generator: ConfigGenerator) -> dict:
if generator.type == DecodeMode.P25:
channels = [ChannelConfig(
name=generator.systemName,
trunking_sysname=generator.systemName,
enable_analog="off",
demod_type="cqpsk",
cqpsk_tracking=True,
filter_type="rc",
meta_stream_name="stream_0"
)]
devices = [DeviceConfig()]
trunking = TrunkingConfig(
module="tk_p25.py",
chans=[TrunkingChannelConfig(
sysname=generator.systemName,
control_channel_list=','.join(generator.channels),
tagsFile="/configs/active.cfg.tags.tsv",
whitelist="/configs/active.cfg.whitelist.tsv"
)]
)
metadata = MetadataConfig(
streams=[
MetadataStreamConfig(
stream_name="stream_0",
icecastServerAddress = f"{generator.icecastConfig.icecast_host}:{generator.icecastConfig.icecast_port}",
icecastMountpoint = generator.icecastConfig.icecast_mountpoint,
icecastPass = generator.icecastConfig.icecast_password
)
]
)
terminal = TerminalConfig()
return {
"channels": [channel.dict() for channel in channels],
"devices": [device.dict() for device in devices],
"trunking": trunking.dict(),
"metadata": metadata.dict(),
"terminal": terminal.dict()
}
elif generator.type == DecodeMode.ANALOG:
analog_config = generator.config
channels = [ChannelConfig(
channelName=analog_config.systemName,
enableAnalog="on",
demodType="fsk4",
frequency=analog_config.frequency,
filterType="widepulse",
nbfmSquelch=analog_config.nbfmSquelch
)]
devices = [DeviceConfig(gain="LNA:32")]
return {
"channels": [channel.dict() for channel in channels],
"devices": [device.dict() for device in devices]
}
else:
raise HTTPException(status_code=400, detail="Invalid decode mode")
def save_library_sidecars(system_name: str, generator: ConfigGenerator):
if generator.type == DecodeMode.P25:
prefix = system_name
if prefix.endswith(".json"):
prefix = prefix[:-5]
save_talkgroup_tags(generator.tags, prefix)
save_whitelist(generator.whitelist, prefix)
def create_op25_router(): def create_op25_router():
router = APIRouter() router = APIRouter()
@@ -93,47 +164,32 @@ def create_op25_router():
active.cfg.json, and optionally restarts the radio. active.cfg.json, and optionally restarts the radio.
""" """
try: try:
if generator.type == DecodeMode.P25: # 1. Build the configuration dictionary
# 1. Handle sidecar files (Tags/Whitelists) config_dict = build_op25_config(generator)
if generator.config.talkgroupTags:
save_talkgroup_tags(generator.config.talkgroupTags)
if generator.config.whitelist:
save_whitelist(generator.config.whitelist)
# 2. Build the main OP25 dictionary structure
config_dict = {
"channels": [c.dict() for c in generator.config.channels],
"devices": [d.dict() for d in generator.config.devices],
"trunking": generator.config.trunking.dict(),
"metadata": generator.config.metadata.dict(),
"terminal": generator.config.terminal.dict()
}
elif generator.type == DecodeMode.ANALOG:
# Simple Analog NBFM Setup for quick testing
channels = [ChannelConfig(
channelName=generator.config.systemName,
enableAnalog="on",
frequency=generator.config.frequency,
demodType="fsk4",
filterType="widepulse"
)]
config_dict = {
"channels": [c.dict() for c in channels],
"devices": [{"gain": "LNA:32"}] # Default gain for analog test
}
else:
raise HTTPException(status_code=400, detail="Invalid decode mode")
# 3. Clean 'None' values to prevent OP25 parsing errors and save
final_json = del_none_in_dict(config_dict) final_json = del_none_in_dict(config_dict)
# 2. Handle Storage and Activation
if save_to_library_name: if save_to_library_name:
# Save to library
save_config_to_library(save_to_library_name, final_json) save_config_to_library(save_to_library_name, final_json)
save_library_sidecars(save_to_library_name, generator)
# Activate from library (Copies json + sidecars)
if not activate_config_from_library(save_to_library_name):
raise HTTPException(status_code=500, detail="Failed to activate saved configuration")
else:
# Save directly to active
with open('/configs/active.cfg.json', 'w') as f: with open('/configs/active.cfg.json', 'w') as f:
json.dump(final_json, f, indent=2) json.dump(final_json, f, indent=2)
if generator.type == DecodeMode.P25:
save_talkgroup_tags(generator.tags)
save_whitelist(generator.whitelist)
# 3. Generate Liquidsoap Script (Always required for active P25 session)
if generator.type == DecodeMode.P25:
generate_liquid_script(generator.icecastConfig)
LOGGER.info("Saved new configuration to active.cfg.json") LOGGER.info("Saved new configuration to active.cfg.json")
# 4. Handle Lifecycle # 4. Handle Lifecycle
@@ -162,13 +218,19 @@ def create_op25_router():
raise HTTPException(status_code=404, detail=f"Config '{system_name}' not found in library volume") raise HTTPException(status_code=404, detail=f"Config '{system_name}' not found in library volume")
@router.post("/save_to_library") @router.post("/save_to_library")
async def save_to_library(system_name: str, config: dict): async def save_to_library(system_name: str, config: ConfigGenerator):
""" """
Directly saves a JSON configuration to the library. Directly saves a JSON configuration to the library.
""" """
if save_config_to_library(system_name, config): try:
config_dict = build_op25_config(config)
final_json = del_none_in_dict(config_dict)
if save_config_to_library(system_name, final_json):
save_library_sidecars(system_name, config)
return {"status": f"Config saved as {system_name}"} return {"status": f"Config saved as {system_name}"}
raise HTTPException(status_code=500, detail="Failed to save configuration") raise HTTPException(status_code=500, detail="Failed to save configuration")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/library") @router.get("/library")
async def get_library(): async def get_library():

View File

@@ -7,10 +7,11 @@ services:
restart: unless-stopped restart: unless-stopped
ports: ports:
- 8001:8001 - 8001:8001
- 8081:8081
devices: devices:
- "/dev/bus/usb:/dev/bus/usb" - "/dev/bus/usb:/dev/bus/usb"
volumes: volumes:
- ./config:/app/config - ./configs:/configs
- ./op25_logs:/tmp/op25 - ./op25_logs:/tmp/op25
env_file: env_file:
- .env - .env

View File

@@ -0,0 +1,316 @@
import pytest
from fastapi.testclient import TestClient
from unittest.mock import patch, MagicMock, mock_open, ANY
import json
import os
import types
from typing import List, Optional
from pydantic import BaseModel
# The router is included in the main app, so we test through it.
# We need to adjust the python path for imports to work correctly
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'app')))
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# --- MOCK MODELS ---
# The actual models.models file has a NameError (IcecastConfig used before definition).
# Since we cannot edit the source code, we mock the module here to allow tests to run.
mock_models = types.ModuleType("models.models")
class MockTerminalConfig(BaseModel):
pass
class MockTalkgroupTag(BaseModel):
tagDec: int
tagName: str
class MockDecodeMode:
P25 = "P25"
ANALOG = "ANALOG"
class MockIcecastConfig(BaseModel):
icecast_host: str
icecast_port: int
icecast_mountpoint: str
icecast_password: str
class MockAnalogConfig(BaseModel):
systemName: str
frequency: str
nbfmSquelch: int
class MockConfigGenerator(BaseModel):
type: str
systemName: str
channels: Optional[List[str]] = None
tags: Optional[List[MockTalkgroupTag]] = None
whitelist: Optional[str] = None
icecastConfig: Optional[MockIcecastConfig] = None
config: Optional[MockAnalogConfig] = None
class MockChannelConfig(BaseModel):
name: Optional[str] = None
trunking_sysname: Optional[str] = None
enable_analog: Optional[str] = None
demod_type: Optional[str] = None
cqpsk_tracking: Optional[bool] = None
filter_type: Optional[str] = None
meta_stream_name: Optional[str] = None
channelName: Optional[str] = None
enableAnalog: Optional[str] = None
demodType: Optional[str] = None
frequency: Optional[str] = None
filterType: Optional[str] = None
nbfmSquelch: Optional[int] = None
class MockDeviceConfig(BaseModel):
gain: Optional[str] = None
class MockTrunkingChannelConfig(BaseModel):
sysname: str
control_channel_list: str
tagsFile: str
whitelist: str
class MockTrunkingConfig(BaseModel):
module: str
chans: List[MockTrunkingChannelConfig]
class MockMetadataStreamConfig(BaseModel):
stream_name: str
icecastServerAddress: str
icecastMountpoint: str
icecastPass: str
class MockMetadataConfig(BaseModel):
streams: List[MockMetadataStreamConfig]
mock_models.ConfigGenerator = MockConfigGenerator
mock_models.DecodeMode = MockDecodeMode
mock_models.ChannelConfig = MockChannelConfig
mock_models.DeviceConfig = MockDeviceConfig
mock_models.TrunkingConfig = MockTrunkingConfig
mock_models.TrunkingChannelConfig = MockTrunkingChannelConfig
mock_models.TerminalConfig = MockTerminalConfig
mock_models.MetadataConfig = MockMetadataConfig
mock_models.MetadataStreamConfig = MockMetadataStreamConfig
mock_models.IcecastConfig = MockIcecastConfig
mock_models.TalkgroupTag = MockTalkgroupTag
sys.modules["models.models"] = mock_models
sys.modules["models"] = types.ModuleType("models")
sys.modules["models"].models = mock_models
# -------------------
from app.node_main import app
# Use a client to make requests to the app
client = TestClient(app)
# Define a sample P25 config payload for testing
SAMPLE_P25_CONFIG = {
"type": "P25",
"systemName": "TestSystem",
"channels": ["851.12345", "852.67890"],
"tags": [{"tagDec": 101, "tagName": "Group A"}, {"tagDec": 102, "tagName": "Group B"}],
"whitelist": "101",
"icecastConfig": {
"icecast_host": "localhost",
"icecast_port": 8000,
"icecast_mountpoint": "test",
"icecast_password": "hackme"
}
}
@pytest.fixture(autouse=True)
def reset_and_mock_globals(monkeypatch):
"""
Fixture to reset the global op25_process state and mock dependencies
before each test, ensuring test isolation.
"""
# Reset the global process variable in the controller module
monkeypatch.setattr("routers.op25_controller.op25_process", None)
# Mock asyncio.sleep to prevent tests from actually waiting
mock_sleep = MagicMock()
monkeypatch.setattr("asyncio.sleep", mock_sleep)
# Mock os functions related to process groups
monkeypatch.setattr("os.killpg", MagicMock())
monkeypatch.setattr("os.getpgid", MagicMock(return_value=12345))
@patch("routers.op25_controller.subprocess.Popen")
def test_start_op25_success(mock_popen):
"""Test the /start endpoint successfully starts the process."""
mock_process = MagicMock()
mock_process.pid = 12345
mock_popen.return_value = mock_process
response = client.post("/op25/start")
assert response.status_code == 200
assert response.json() == {"status": "OP25 started"}
mock_popen.assert_called_once()
@patch("routers.op25_controller.subprocess.Popen", side_effect=Exception("Popen failed"))
def test_start_op25_failure(mock_popen):
"""Test the /start endpoint when Popen raises an exception."""
response = client.post("/op25/start")
assert response.status_code == 500
assert "Failed to start OP25" in response.json()["detail"]
def test_stop_op25_not_running():
"""Test the /stop endpoint when the process is not running."""
response = client.post("/op25/stop")
assert response.status_code == 200
assert response.json() == {"status": "OP25 was not running"}
@patch("routers.op25_controller.subprocess.Popen")
def test_stop_op25_success(mock_popen, monkeypatch):
"""Test the /stop endpoint successfully stops a running process."""
mock_process = MagicMock()
mock_process.pid = 12345
mock_process.poll.return_value = None # Indicates it's running
monkeypatch.setattr("routers.op25_controller.op25_process", mock_process)
response = client.post("/op25/stop")
assert response.status_code == 200
assert response.json() == {"status": "OP25 stopped"}
os.killpg.assert_called_with(os.getpgid(mock_process.pid), ANY)
def test_get_status_not_running():
"""Test the /status endpoint when the process is not running."""
response = client.get("/op25/status")
assert response.status_code == 200
data = response.json()
assert data["is_running"] is False
assert data["pid"] is None
assert data["active_system"] is None
@patch("routers.op25_controller.get_current_system_from_config", return_value="TestSystem")
@patch("routers.op25_controller.subprocess.Popen")
def test_get_status_running(mock_popen, mock_get_system, monkeypatch):
"""Test the /status endpoint when the process is running."""
mock_process = MagicMock()
mock_process.pid = 12345
mock_process.poll.return_value = None # Running
monkeypatch.setattr("routers.op25_controller.op25_process", mock_process)
response = client.get("/op25/status")
assert response.status_code == 200
data = response.json()
assert data["is_running"] is True
assert data["pid"] == 12345
assert data["active_system"] == "TestSystem"
mock_get_system.assert_called_once()
@patch("builtins.open", new_callable=mock_open)
@patch("routers.op25_controller.json.dump")
@patch("routers.op25_controller.save_talkgroup_tags")
@patch("routers.op25_controller.save_whitelist")
@patch("routers.op25_controller.generate_liquid_script")
@patch("routers.op25_controller.subprocess.Popen")
def test_set_active_config_no_restart(mock_popen, mock_liquid, mock_white, mock_tags, mock_dump, mock_file):
"""Test setting active config without restarting the radio."""
response = client.post("/op25/set_active_config?restart=false", json=SAMPLE_P25_CONFIG)
assert response.status_code == 200
assert response.json() == {"message": "Active configuration updated", "radio_restarted": False}
# Verify config files were written
mock_file.assert_called_with('/configs/active.cfg.json', 'w')
mock_dump.assert_called_once()
mock_tags.assert_called_with([MockTalkgroupTag(**t) for t in SAMPLE_P25_CONFIG["tags"]])
mock_white.assert_called_with(SAMPLE_P25_CONFIG["whitelist"])
mock_liquid.assert_called_with(MockIcecastConfig(**SAMPLE_P25_CONFIG["icecastConfig"]))
# Verify radio was NOT started/stopped
mock_popen.assert_not_called()
os.killpg.assert_not_called()
@patch("routers.op25_controller.activate_config_from_library", return_value=True)
@patch("routers.op25_controller.save_config_to_library")
@patch("routers.op25_controller.save_library_sidecars")
@patch("routers.op25_controller.subprocess.Popen")
def test_set_active_config_with_save_to_library(mock_popen, mock_save_sidecars, mock_save_lib, mock_activate):
"""Test setting active config and saving it to the library."""
library_name = "MyNewSystem"
response = client.post(
f"/op25/set_active_config?restart=true&save_to_library_name={library_name}",
json=SAMPLE_P25_CONFIG
)
assert response.status_code == 200
assert response.json()["radio_restarted"] is True
# Verify it was saved and then activated from the library
mock_save_lib.assert_called_with(library_name, ANY)
mock_save_sidecars.assert_called_with(library_name, ANY)
mock_activate.assert_called_with(library_name)
# Verify radio was restarted
assert mock_popen.call_count == 1
@patch("routers.op25_controller.activate_config_from_library", return_value=True)
@patch("routers.op25_controller.subprocess.Popen")
def test_load_from_library_success(mock_popen, mock_activate):
"""Test loading a configuration from the library."""
system_name = "ExistingSystem"
response = client.post(f"/op25/load_from_library?system_name={system_name}")
assert response.status_code == 200
assert response.json() == {"status": f"Loaded and started library config: {system_name}"}
# Verify activation and restart
mock_activate.assert_called_with(system_name)
assert mock_popen.call_count == 1
@patch("routers.op25_controller.activate_config_from_library", return_value=False)
def test_load_from_library_not_found(mock_activate):
"""Test loading a non-existent configuration from the library."""
system_name = "NotFoundSystem"
response = client.post(f"/op25/load_from_library?system_name={system_name}")
assert response.status_code == 404
assert "not found in library" in response.json()["detail"]
@patch("routers.op25_controller.save_config_to_library", return_value=True)
@patch("routers.op25_controller.save_library_sidecars")
def test_save_to_library(mock_save_sidecars, mock_save_lib):
"""Test saving a configuration directly to the library."""
system_name = "NewLibSystem"
response = client.post(f"/op25/save_to_library?system_name={system_name}", json=SAMPLE_P25_CONFIG)
assert response.status_code == 200
assert response.json() == {"status": f"Config saved as {system_name}"}
mock_save_lib.assert_called_with(system_name, ANY)
mock_save_sidecars.assert_called_with(system_name, ANY)
@patch("routers.op25_controller.scan_local_library", return_value=["System1.json", "System2.json"])
def test_get_library(mock_scan):
"""Test the /library endpoint."""
response = client.get("/op25/library")
assert response.status_code == 200
assert response.json() == ["System1.json", "System2.json"]
mock_scan.assert_called_once()
@patch("routers.op25_controller.build_op25_config", side_effect=Exception("Build failed"))
def test_set_active_config_build_failure(mock_build):
"""Test error handling when config building fails."""
response = client.post("/op25/set_active_config", json=SAMPLE_P25_CONFIG)
assert response.status_code == 500
assert "Configuration error: Build failed" in response.json()["detail"]