Implement Metadata Watcher #1

Merged
logan merged 9 commits from metadata-watcher into main 2025-12-29 19:04:07 -05:00
6 changed files with 265 additions and 54 deletions

5
.env.example Normal file
View File

@@ -0,0 +1,5 @@
NODE_ID=
MQTT_BROKER=
ICECAST_SERVER=
NODE_LAT=
NODE_LONG=

2
.gitignore vendored
View File

@@ -2,4 +2,4 @@
*.log *.log
*.db *.db
*.conf *.conf
config/* configs/*

View File

@@ -2,9 +2,11 @@ import csv
import json import json
import os import os
import shutil import shutil
from models.models import TalkgroupTag from pathlib import Path
from models.models import TalkgroupTag, IcecastConfig
from typing import List, Dict from typing import List, Dict
from internal.logger import create_logger from internal.logger import create_logger
from internal.liquidsoap_config_utils import generate_liquid_script
LOGGER = create_logger(__name__) LOGGER = create_logger(__name__)
@@ -28,8 +30,8 @@ def scan_local_library() -> List[Dict]:
# Use trunking sysname or filename as the identifier # Use trunking sysname or filename as the identifier
sys_name = data.get("trunking", {}).get("sysname", filename.replace(".json", "")) sys_name = data.get("trunking", {}).get("sysname", filename.replace(".json", ""))
library.append({ library.append({
"name": sys_name, "system_name": sys_name,
"system_name": filename, "filename": filename,
"mode": "P25" if "trunking" in data else "NBFM" "mode": "P25" if "trunking" in data else "NBFM"
}) })
except Exception as e: except Exception as e:
@@ -44,16 +46,48 @@ def activate_config_from_library(system_name: str) -> bool:
if not system_name.endswith(".json"): if not system_name.endswith(".json"):
system_name += ".json" system_name += ".json"
src = os.path.join(CONFIG_DIR, system_name) config_path = Path(CONFIG_DIR)
dst = os.path.join(CONFIG_DIR, "active.cfg.json") src = config_path / system_name
dst = config_path / "active.cfg.json"
if not os.path.exists(src): if not src.exists():
LOGGER.error(f"Source config {system_name} not found in library.") LOGGER.error(f"Source config {system_name} not found in library.")
return False return False
try: try:
shutil.copy2(src, dst) shutil.copy2(src, dst)
LOGGER.info(f"Activated config: {system_name}") LOGGER.info(f"Activated config: {system_name}")
# Copy sidecar files (tags/whitelist) if they exist
src_tags = src.with_suffix(".tags.tsv")
if src_tags.exists():
shutil.copy2(src_tags, config_path / "active.cfg.tags.tsv")
src_whitelist = src.with_suffix(".whitelist.tsv")
if src_whitelist.exists():
shutil.copy2(src_whitelist, config_path / "active.cfg.whitelist.tsv")
# Generate Liquidsoap Script by reading the activated config
with open(dst, 'r') as f:
data = json.load(f)
if "trunking" in data and "metadata" in data:
streams = data.get("metadata", {}).get("streams", [])
if streams:
stream = streams[0]
address = stream.get("icecastServerAddress", "127.0.0.1:8000")
host, port = address.split(":") if ":" in address else (address, 8000)
ice_config = IcecastConfig(
icecast_host=host,
icecast_port=int(port),
icecast_mountpoint=stream.get("icecastMountpoint", "/stream"),
icecast_password=stream.get("icecastPass", "hackme"),
icecast_description="OP25 Stream",
icecast_genre="Scanner"
)
generate_liquid_script(ice_config)
return True return True
except Exception as e: except Exception as e:
LOGGER.error(f"Failed to copy config: {e}") LOGGER.error(f"Failed to copy config: {e}")
@@ -88,14 +122,16 @@ def get_current_active_config() -> Dict:
return {} return {}
return {} return {}
def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag]) -> None: def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag], prefix: str = "active.cfg") -> None:
with open(os.path.join(CONFIG_DIR, "active.cfg.tags.tsv"), 'w', newline='', encoding='utf-8') as file: filename = f"{prefix}.tags.tsv"
with open(os.path.join(CONFIG_DIR, filename), 'w', newline='', encoding='utf-8') as file:
writer = csv.writer(file, delimiter='\t', lineterminator='\n') writer = csv.writer(file, delimiter='\t', lineterminator='\n')
for tag in talkgroup_tags: for tag in talkgroup_tags:
writer.writerow([tag.tagDec, tag.talkgroup]) writer.writerow([tag.tagDec, tag.talkgroup])
def save_whitelist(talkgroup_tags: List[int]) -> None: def save_whitelist(talkgroup_tags: List[int], prefix: str = "active.cfg") -> None:
with open(os.path.join(CONFIG_DIR, "active.cfg.whitelist.tsv"), 'w', newline='', encoding='utf-8') as file: filename = f"{prefix}.whitelist.tsv"
with open(os.path.join(CONFIG_DIR, filename), 'w', newline='', encoding='utf-8') as file:
writer = csv.writer(file, delimiter='\t', lineterminator='\n') writer = csv.writer(file, delimiter='\t', lineterminator='\n')
for tag in talkgroup_tags: for tag in talkgroup_tags:
writer.writerow([tag]) writer.writerow([tag])

View File

@@ -183,10 +183,107 @@ async def mqtt_lifecycle_manager():
lwt_payload = json.dumps({"status": "offline", "reason": "unexpected_disconnect"}) lwt_payload = json.dumps({"status": "offline", "reason": "unexpected_disconnect"})
client.will_set(f"nodes/{NODE_ID}/status", lwt_payload, qos=1, retain=True) client.will_set(f"nodes/{NODE_ID}/status", lwt_payload, qos=1, retain=True)
async def metadata_watcher():
"""
Polls OP25 HTTP terminal for metadata and publishes events to MQTT.
Corrected to use the POST-based command API found in the HAR capture.
"""
last_tgid = 0
last_metadata = {}
potential_end_time = None
DEBOUNCE_SECONDS = 2.5
OP25_DATA_URL = "http://127.0.0.1:8081/"
# This is the specific payload the OP25 web interface uses [cite: 45562, 45563]
COMMAND_PAYLOAD = [{"command": "update", "arg1": 0, "arg2": 0}]
while True:
if not MQTT_CONNECTED:
await asyncio.sleep(1)
continue
try:
# Run blocking POST request in executor
loop = asyncio.get_running_loop()
response = await loop.run_in_executor(
None,
lambda: requests.post(OP25_DATA_URL, json=COMMAND_PAYLOAD, timeout=0.5)
)
if response.status_code == 200:
data = response.json()
LOGGER.debug(f"Response from OP25 API: {data}")
current_tgid = 0
current_meta = {}
# The response is an array of update objects
for item in data:
if item.get("json_type") == "channel_update":
# The terminal provides channel info keyed by channel index (e.g., "0")
# We look for the first channel that has an active TGID
for key in item:
if key.isdigit():
ch = item[key]
t = ch.get("tgid")
# OP25 returns null or 0 when no talkgroup is active
if t and int(t) > 0:
current_tgid = int(t)
current_meta = {
"tgid": str(t),
"rid": str(ch.get("srcaddr", "")).strip(),
"alpha_tag": str(ch.get("tag", "")).strip(),
"frequency": str(ch.get("freq", 0)),
"sysname": str(ch.get("system", "")).strip()
}
break
if current_tgid: break
now = datetime.now()
# Logic for handling call start/end events
if current_tgid != 0:
potential_end_time = None
if current_tgid != last_tgid:
if last_tgid != 0:
LOGGER.debug(f"Switching TGID: {last_tgid} -> {current_tgid}")
payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_end", "metadata": last_metadata}
client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0)
LOGGER.debug(f"Call Start: TGID {current_tgid} ({current_meta.get('alpha_tag')})")
payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_start", "metadata": current_meta}
client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0)
last_tgid = current_tgid
last_metadata = current_meta
elif last_tgid != 0:
if potential_end_time is None:
LOGGER.debug(f"Signal lost for TGID {last_tgid}. Starting debounce.")
potential_end_time = now
elif (now - potential_end_time).total_seconds() > DEBOUNCE_SECONDS:
LOGGER.debug(f"Call End (Debounce expired): TGID {last_tgid}")
payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_end", "metadata": last_metadata}
client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0)
last_tgid = 0
last_metadata = {}
potential_end_time = None
else:
LOGGER.debug(f"OP25 API returned status: {response.status_code}")
except Exception as e:
LOGGER.warning(f"Metadata watcher error: {e}")
await asyncio.sleep(0.25)
try: try:
client.connect(MQTT_BROKER, 1883, 60) client.connect(MQTT_BROKER, 1883, 60)
client.loop_start() # Run network loop in background thread client.loop_start() # Run network loop in background thread
# Start the metadata watcher task
watcher_task = asyncio.create_task(metadata_watcher())
# --- Main Heartbeat Loop --- # --- Main Heartbeat Loop ---
while True: while True:
if MQTT_CONNECTED: if MQTT_CONNECTED:
@@ -194,10 +291,20 @@ async def mqtt_lifecycle_manager():
# Pulse every 30 seconds # Pulse every 30 seconds
# Only wait 30 sec if the HB sent. This way we don't stall a check-in # Only wait 30 sec if the HB sent. This way we don't stall a check-in
await asyncio.sleep(30) await asyncio.sleep(30)
else:
await asyncio.sleep(5)
except asyncio.CancelledError: except asyncio.CancelledError:
LOGGER.info("Stopping MQTT Loop...") LOGGER.info("Stopping MQTT Loop...")
finally: finally:
# Cancel watcher
if 'watcher_task' in locals():
watcher_task.cancel()
try:
await watcher_task
except asyncio.CancelledError:
pass
# Graceful Shutdown: Explicitly tell C2 we are leaving # Graceful Shutdown: Explicitly tell C2 we are leaving
if MQTT_CONNECTED: if MQTT_CONNECTED:
shutdown_payload = json.dumps({"status": "offline", "reason": "clean_shutdown"}) shutdown_payload = json.dumps({"status": "offline", "reason": "clean_shutdown"})

View File

@@ -61,6 +61,77 @@ async def start_op25_logic():
return False return False
return False return False
def build_op25_config(generator: ConfigGenerator) -> dict:
if generator.type == DecodeMode.P25:
channels = [ChannelConfig(
name=generator.systemName,
trunking_sysname=generator.systemName,
enable_analog="off",
demod_type="cqpsk",
cqpsk_tracking=True,
filter_type="rc",
meta_stream_name="stream_0"
)]
devices = [DeviceConfig()]
trunking = TrunkingConfig(
module="tk_p25.py",
chans=[TrunkingChannelConfig(
sysname=generator.systemName,
control_channel_list=','.join(generator.channels),
tagsFile="/configs/active.cfg.tags.tsv",
whitelist="/configs/active.cfg.whitelist.tsv"
)]
)
metadata = MetadataConfig(
streams=[
MetadataStreamConfig(
stream_name="stream_0",
icecastServerAddress = f"{generator.icecastConfig.icecast_host}:{generator.icecastConfig.icecast_port}",
icecastMountpoint = generator.icecastConfig.icecast_mountpoint,
icecastPass = generator.icecastConfig.icecast_password
)
]
)
terminal = TerminalConfig()
return {
"channels": [channel.dict() for channel in channels],
"devices": [device.dict() for device in devices],
"trunking": trunking.dict(),
"metadata": metadata.dict(),
"terminal": terminal.dict()
}
elif generator.type == DecodeMode.ANALOG:
analog_config = generator.config
channels = [ChannelConfig(
channelName=analog_config.systemName,
enableAnalog="on",
demodType="fsk4",
frequency=analog_config.frequency,
filterType="widepulse",
nbfmSquelch=analog_config.nbfmSquelch
)]
devices = [DeviceConfig(gain="LNA:32")]
return {
"channels": [channel.dict() for channel in channels],
"devices": [device.dict() for device in devices]
}
else:
raise HTTPException(status_code=400, detail="Invalid decode mode")
def save_library_sidecars(system_name: str, generator: ConfigGenerator):
if generator.type == DecodeMode.P25:
prefix = system_name
if prefix.endswith(".json"):
prefix = prefix[:-5]
save_talkgroup_tags(generator.tags, prefix)
save_whitelist(generator.whitelist, prefix)
def create_op25_router(): def create_op25_router():
router = APIRouter() router = APIRouter()
@@ -93,46 +164,31 @@ def create_op25_router():
active.cfg.json, and optionally restarts the radio. active.cfg.json, and optionally restarts the radio.
""" """
try: try:
if generator.type == DecodeMode.P25: # 1. Build the configuration dictionary
# 1. Handle sidecar files (Tags/Whitelists) config_dict = build_op25_config(generator)
if generator.config.talkgroupTags:
save_talkgroup_tags(generator.config.talkgroupTags)
if generator.config.whitelist:
save_whitelist(generator.config.whitelist)
# 2. Build the main OP25 dictionary structure
config_dict = {
"channels": [c.dict() for c in generator.config.channels],
"devices": [d.dict() for d in generator.config.devices],
"trunking": generator.config.trunking.dict(),
"metadata": generator.config.metadata.dict(),
"terminal": generator.config.terminal.dict()
}
elif generator.type == DecodeMode.ANALOG:
# Simple Analog NBFM Setup for quick testing
channels = [ChannelConfig(
channelName=generator.config.systemName,
enableAnalog="on",
frequency=generator.config.frequency,
demodType="fsk4",
filterType="widepulse"
)]
config_dict = {
"channels": [c.dict() for c in channels],
"devices": [{"gain": "LNA:32"}] # Default gain for analog test
}
else:
raise HTTPException(status_code=400, detail="Invalid decode mode")
# 3. Clean 'None' values to prevent OP25 parsing errors and save
final_json = del_none_in_dict(config_dict) final_json = del_none_in_dict(config_dict)
if save_to_library_name:
save_config_to_library(save_to_library_name, final_json)
with open('/configs/active.cfg.json', 'w') as f: # 2. Handle Storage and Activation
json.dump(final_json, f, indent=2) if save_to_library_name:
# Save to library
save_config_to_library(save_to_library_name, final_json)
save_library_sidecars(save_to_library_name, generator)
# Activate from library (Copies json + sidecars)
if not activate_config_from_library(save_to_library_name):
raise HTTPException(status_code=500, detail="Failed to activate saved configuration")
else:
# Save directly to active
with open('/configs/active.cfg.json', 'w') as f:
json.dump(final_json, f, indent=2)
if generator.type == DecodeMode.P25:
save_talkgroup_tags(generator.tags)
save_whitelist(generator.whitelist)
# 3. Generate Liquidsoap Script (Always required for active P25 session)
if generator.type == DecodeMode.P25:
generate_liquid_script(generator.icecastConfig)
LOGGER.info("Saved new configuration to active.cfg.json") LOGGER.info("Saved new configuration to active.cfg.json")
@@ -162,13 +218,19 @@ def create_op25_router():
raise HTTPException(status_code=404, detail=f"Config '{system_name}' not found in library volume") raise HTTPException(status_code=404, detail=f"Config '{system_name}' not found in library volume")
@router.post("/save_to_library") @router.post("/save_to_library")
async def save_to_library(system_name: str, config: dict): async def save_to_library(system_name: str, config: ConfigGenerator):
""" """
Directly saves a JSON configuration to the library. Directly saves a JSON configuration to the library.
""" """
if save_config_to_library(system_name, config): try:
return {"status": f"Config saved as {system_name}"} config_dict = build_op25_config(config)
raise HTTPException(status_code=500, detail="Failed to save configuration") final_json = del_none_in_dict(config_dict)
if save_config_to_library(system_name, final_json):
save_library_sidecars(system_name, config)
return {"status": f"Config saved as {system_name}"}
raise HTTPException(status_code=500, detail="Failed to save configuration")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/library") @router.get("/library")
async def get_library(): async def get_library():

View File

@@ -7,10 +7,11 @@ services:
restart: unless-stopped restart: unless-stopped
ports: ports:
- 8001:8001 - 8001:8001
- 8081:8081
devices: devices:
- "/dev/bus/usb:/dev/bus/usb" - "/dev/bus/usb:/dev/bus/usb"
volumes: volumes:
- ./config:/app/config - ./configs:/configs
- ./op25_logs:/tmp/op25 - ./op25_logs:/tmp/op25
env_file: env_file:
- .env - .env