diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..c49a1a7 --- /dev/null +++ b/.env.example @@ -0,0 +1,5 @@ +NODE_ID= +MQTT_BROKER= +ICECAST_SERVER= +NODE_LAT= +NODE_LONG= \ No newline at end of file diff --git a/.gitignore b/.gitignore index 558e6b1..2a65b26 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,4 @@ *.log *.db *.conf -config/* \ No newline at end of file +configs/* \ No newline at end of file diff --git a/app/internal/op25_config_utls.py b/app/internal/op25_config_utls.py index 30d0609..96adfc1 100644 --- a/app/internal/op25_config_utls.py +++ b/app/internal/op25_config_utls.py @@ -2,9 +2,11 @@ import csv import json import os import shutil -from models.models import TalkgroupTag +from pathlib import Path +from models.models import TalkgroupTag, IcecastConfig from typing import List, Dict from internal.logger import create_logger +from internal.liquidsoap_config_utils import generate_liquid_script LOGGER = create_logger(__name__) @@ -28,8 +30,8 @@ def scan_local_library() -> List[Dict]: # Use trunking sysname or filename as the identifier sys_name = data.get("trunking", {}).get("sysname", filename.replace(".json", "")) library.append({ - "name": sys_name, - "system_name": filename, + "system_name": sys_name, + "filename": filename, "mode": "P25" if "trunking" in data else "NBFM" }) except Exception as e: @@ -44,16 +46,48 @@ def activate_config_from_library(system_name: str) -> bool: if not system_name.endswith(".json"): system_name += ".json" - src = os.path.join(CONFIG_DIR, system_name) - dst = os.path.join(CONFIG_DIR, "active.cfg.json") + config_path = Path(CONFIG_DIR) + src = config_path / system_name + dst = config_path / "active.cfg.json" - if not os.path.exists(src): + if not src.exists(): LOGGER.error(f"Source config {system_name} not found in library.") return False try: shutil.copy2(src, dst) LOGGER.info(f"Activated config: {system_name}") + + # Copy sidecar files (tags/whitelist) if they exist + src_tags = src.with_suffix(".tags.tsv") + if src_tags.exists(): + shutil.copy2(src_tags, config_path / "active.cfg.tags.tsv") + + src_whitelist = src.with_suffix(".whitelist.tsv") + if src_whitelist.exists(): + shutil.copy2(src_whitelist, config_path / "active.cfg.whitelist.tsv") + + # Generate Liquidsoap Script by reading the activated config + with open(dst, 'r') as f: + data = json.load(f) + + if "trunking" in data and "metadata" in data: + streams = data.get("metadata", {}).get("streams", []) + if streams: + stream = streams[0] + address = stream.get("icecastServerAddress", "127.0.0.1:8000") + host, port = address.split(":") if ":" in address else (address, 8000) + + ice_config = IcecastConfig( + icecast_host=host, + icecast_port=int(port), + icecast_mountpoint=stream.get("icecastMountpoint", "/stream"), + icecast_password=stream.get("icecastPass", "hackme"), + icecast_description="OP25 Stream", + icecast_genre="Scanner" + ) + generate_liquid_script(ice_config) + return True except Exception as e: LOGGER.error(f"Failed to copy config: {e}") @@ -88,14 +122,16 @@ def get_current_active_config() -> Dict: return {} return {} -def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag]) -> None: - with open(os.path.join(CONFIG_DIR, "active.cfg.tags.tsv"), 'w', newline='', encoding='utf-8') as file: +def save_talkgroup_tags(talkgroup_tags: List[TalkgroupTag], prefix: str = "active.cfg") -> None: + filename = f"{prefix}.tags.tsv" + with open(os.path.join(CONFIG_DIR, filename), 'w', newline='', encoding='utf-8') as file: writer = csv.writer(file, delimiter='\t', lineterminator='\n') for tag in talkgroup_tags: writer.writerow([tag.tagDec, tag.talkgroup]) -def save_whitelist(talkgroup_tags: List[int]) -> None: - with open(os.path.join(CONFIG_DIR, "active.cfg.whitelist.tsv"), 'w', newline='', encoding='utf-8') as file: +def save_whitelist(talkgroup_tags: List[int], prefix: str = "active.cfg") -> None: + filename = f"{prefix}.whitelist.tsv" + with open(os.path.join(CONFIG_DIR, filename), 'w', newline='', encoding='utf-8') as file: writer = csv.writer(file, delimiter='\t', lineterminator='\n') for tag in talkgroup_tags: writer.writerow([tag]) diff --git a/app/node_main.py b/app/node_main.py index d9cc663..f9e2d95 100644 --- a/app/node_main.py +++ b/app/node_main.py @@ -183,10 +183,107 @@ async def mqtt_lifecycle_manager(): lwt_payload = json.dumps({"status": "offline", "reason": "unexpected_disconnect"}) client.will_set(f"nodes/{NODE_ID}/status", lwt_payload, qos=1, retain=True) + async def metadata_watcher(): + """ + Polls OP25 HTTP terminal for metadata and publishes events to MQTT. + Corrected to use the POST-based command API found in the HAR capture. + """ + last_tgid = 0 + last_metadata = {} + potential_end_time = None + DEBOUNCE_SECONDS = 2.5 + OP25_DATA_URL = "http://127.0.0.1:8081/" + + # This is the specific payload the OP25 web interface uses [cite: 45562, 45563] + COMMAND_PAYLOAD = [{"command": "update", "arg1": 0, "arg2": 0}] + + while True: + if not MQTT_CONNECTED: + await asyncio.sleep(1) + continue + + try: + # Run blocking POST request in executor + loop = asyncio.get_running_loop() + response = await loop.run_in_executor( + None, + lambda: requests.post(OP25_DATA_URL, json=COMMAND_PAYLOAD, timeout=0.5) + ) + + if response.status_code == 200: + data = response.json() + LOGGER.debug(f"Response from OP25 API: {data}") + + current_tgid = 0 + current_meta = {} + + # The response is an array of update objects + for item in data: + if item.get("json_type") == "channel_update": + # The terminal provides channel info keyed by channel index (e.g., "0") + # We look for the first channel that has an active TGID + for key in item: + if key.isdigit(): + ch = item[key] + t = ch.get("tgid") + + # OP25 returns null or 0 when no talkgroup is active + if t and int(t) > 0: + current_tgid = int(t) + current_meta = { + "tgid": str(t), + "rid": str(ch.get("srcaddr", "")).strip(), + "alpha_tag": str(ch.get("tag", "")).strip(), + "frequency": str(ch.get("freq", 0)), + "sysname": str(ch.get("system", "")).strip() + } + break + if current_tgid: break + + now = datetime.now() + + # Logic for handling call start/end events + if current_tgid != 0: + potential_end_time = None + + if current_tgid != last_tgid: + if last_tgid != 0: + LOGGER.debug(f"Switching TGID: {last_tgid} -> {current_tgid}") + payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_end", "metadata": last_metadata} + client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0) + + LOGGER.debug(f"Call Start: TGID {current_tgid} ({current_meta.get('alpha_tag')})") + payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_start", "metadata": current_meta} + client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0) + last_tgid = current_tgid + last_metadata = current_meta + + elif last_tgid != 0: + if potential_end_time is None: + LOGGER.debug(f"Signal lost for TGID {last_tgid}. Starting debounce.") + potential_end_time = now + elif (now - potential_end_time).total_seconds() > DEBOUNCE_SECONDS: + LOGGER.debug(f"Call End (Debounce expired): TGID {last_tgid}") + payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_end", "metadata": last_metadata} + client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0) + last_tgid = 0 + last_metadata = {} + potential_end_time = None + else: + LOGGER.debug(f"OP25 API returned status: {response.status_code}") + + except Exception as e: + LOGGER.warning(f"Metadata watcher error: {e}") + + await asyncio.sleep(0.25) + try: client.connect(MQTT_BROKER, 1883, 60) client.loop_start() # Run network loop in background thread + # Start the metadata watcher task + watcher_task = asyncio.create_task(metadata_watcher()) + # --- Main Heartbeat Loop --- while True: if MQTT_CONNECTED: @@ -194,10 +291,20 @@ async def mqtt_lifecycle_manager(): # Pulse every 30 seconds # Only wait 30 sec if the HB sent. This way we don't stall a check-in await asyncio.sleep(30) + else: + await asyncio.sleep(5) except asyncio.CancelledError: LOGGER.info("Stopping MQTT Loop...") finally: + # Cancel watcher + if 'watcher_task' in locals(): + watcher_task.cancel() + try: + await watcher_task + except asyncio.CancelledError: + pass + # Graceful Shutdown: Explicitly tell C2 we are leaving if MQTT_CONNECTED: shutdown_payload = json.dumps({"status": "offline", "reason": "clean_shutdown"}) diff --git a/app/routers/op25_controller.py b/app/routers/op25_controller.py index 5accdbf..06c2786 100644 --- a/app/routers/op25_controller.py +++ b/app/routers/op25_controller.py @@ -61,6 +61,77 @@ async def start_op25_logic(): return False return False +def build_op25_config(generator: ConfigGenerator) -> dict: + if generator.type == DecodeMode.P25: + channels = [ChannelConfig( + name=generator.systemName, + trunking_sysname=generator.systemName, + enable_analog="off", + demod_type="cqpsk", + cqpsk_tracking=True, + filter_type="rc", + meta_stream_name="stream_0" + )] + devices = [DeviceConfig()] + + trunking = TrunkingConfig( + module="tk_p25.py", + chans=[TrunkingChannelConfig( + sysname=generator.systemName, + control_channel_list=','.join(generator.channels), + tagsFile="/configs/active.cfg.tags.tsv", + whitelist="/configs/active.cfg.whitelist.tsv" + )] + ) + + metadata = MetadataConfig( + streams=[ + MetadataStreamConfig( + stream_name="stream_0", + icecastServerAddress = f"{generator.icecastConfig.icecast_host}:{generator.icecastConfig.icecast_port}", + icecastMountpoint = generator.icecastConfig.icecast_mountpoint, + icecastPass = generator.icecastConfig.icecast_password + ) + ] + ) + + terminal = TerminalConfig() + + return { + "channels": [channel.dict() for channel in channels], + "devices": [device.dict() for device in devices], + "trunking": trunking.dict(), + "metadata": metadata.dict(), + "terminal": terminal.dict() + } + + elif generator.type == DecodeMode.ANALOG: + analog_config = generator.config + channels = [ChannelConfig( + channelName=analog_config.systemName, + enableAnalog="on", + demodType="fsk4", + frequency=analog_config.frequency, + filterType="widepulse", + nbfmSquelch=analog_config.nbfmSquelch + )] + devices = [DeviceConfig(gain="LNA:32")] + + return { + "channels": [channel.dict() for channel in channels], + "devices": [device.dict() for device in devices] + } + else: + raise HTTPException(status_code=400, detail="Invalid decode mode") + +def save_library_sidecars(system_name: str, generator: ConfigGenerator): + if generator.type == DecodeMode.P25: + prefix = system_name + if prefix.endswith(".json"): + prefix = prefix[:-5] + save_talkgroup_tags(generator.tags, prefix) + save_whitelist(generator.whitelist, prefix) + def create_op25_router(): router = APIRouter() @@ -93,46 +164,31 @@ def create_op25_router(): active.cfg.json, and optionally restarts the radio. """ try: - if generator.type == DecodeMode.P25: - # 1. Handle sidecar files (Tags/Whitelists) - if generator.config.talkgroupTags: - save_talkgroup_tags(generator.config.talkgroupTags) - if generator.config.whitelist: - save_whitelist(generator.config.whitelist) - - # 2. Build the main OP25 dictionary structure - config_dict = { - "channels": [c.dict() for c in generator.config.channels], - "devices": [d.dict() for d in generator.config.devices], - "trunking": generator.config.trunking.dict(), - "metadata": generator.config.metadata.dict(), - "terminal": generator.config.terminal.dict() - } - - elif generator.type == DecodeMode.ANALOG: - # Simple Analog NBFM Setup for quick testing - channels = [ChannelConfig( - channelName=generator.config.systemName, - enableAnalog="on", - frequency=generator.config.frequency, - demodType="fsk4", - filterType="widepulse" - )] - config_dict = { - "channels": [c.dict() for c in channels], - "devices": [{"gain": "LNA:32"}] # Default gain for analog test - } - else: - raise HTTPException(status_code=400, detail="Invalid decode mode") - - # 3. Clean 'None' values to prevent OP25 parsing errors and save + # 1. Build the configuration dictionary + config_dict = build_op25_config(generator) final_json = del_none_in_dict(config_dict) - - if save_to_library_name: - save_config_to_library(save_to_library_name, final_json) - with open('/configs/active.cfg.json', 'w') as f: - json.dump(final_json, f, indent=2) + # 2. Handle Storage and Activation + if save_to_library_name: + # Save to library + save_config_to_library(save_to_library_name, final_json) + save_library_sidecars(save_to_library_name, generator) + + # Activate from library (Copies json + sidecars) + if not activate_config_from_library(save_to_library_name): + raise HTTPException(status_code=500, detail="Failed to activate saved configuration") + else: + # Save directly to active + with open('/configs/active.cfg.json', 'w') as f: + json.dump(final_json, f, indent=2) + + if generator.type == DecodeMode.P25: + save_talkgroup_tags(generator.tags) + save_whitelist(generator.whitelist) + + # 3. Generate Liquidsoap Script (Always required for active P25 session) + if generator.type == DecodeMode.P25: + generate_liquid_script(generator.icecastConfig) LOGGER.info("Saved new configuration to active.cfg.json") @@ -162,13 +218,19 @@ def create_op25_router(): raise HTTPException(status_code=404, detail=f"Config '{system_name}' not found in library volume") @router.post("/save_to_library") - async def save_to_library(system_name: str, config: dict): + async def save_to_library(system_name: str, config: ConfigGenerator): """ Directly saves a JSON configuration to the library. """ - if save_config_to_library(system_name, config): - return {"status": f"Config saved as {system_name}"} - raise HTTPException(status_code=500, detail="Failed to save configuration") + try: + config_dict = build_op25_config(config) + final_json = del_none_in_dict(config_dict) + if save_config_to_library(system_name, final_json): + save_library_sidecars(system_name, config) + return {"status": f"Config saved as {system_name}"} + raise HTTPException(status_code=500, detail="Failed to save configuration") + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) @router.get("/library") async def get_library(): diff --git a/docker-compose.yml b/docker-compose.yml index e1a05f1..84e860a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,10 +7,11 @@ services: restart: unless-stopped ports: - 8001:8001 + - 8081:8081 devices: - "/dev/bus/usb:/dev/bus/usb" volumes: - - ./config:/app/config + - ./configs:/configs - ./op25_logs:/tmp/op25 env_file: - .env