Implement Metadata Watcher #1
@@ -186,12 +186,16 @@ async def mqtt_lifecycle_manager():
|
||||
async def metadata_watcher():
|
||||
"""
|
||||
Polls OP25 HTTP terminal for metadata and publishes events to MQTT.
|
||||
Corrected to use the POST-based command API found in the HAR capture.
|
||||
"""
|
||||
last_tgid = 0
|
||||
last_metadata = {}
|
||||
potential_end_time = None
|
||||
DEBOUNCE_SECONDS = 2.5
|
||||
OP25_DATA_URL = "http://127.0.0.1:8081/data.json"
|
||||
OP25_DATA_URL = "http://127.0.0.1:8081/"
|
||||
|
||||
# This is the specific payload the OP25 web interface uses [cite: 45562, 45563]
|
||||
COMMAND_PAYLOAD = [{"command": "update", "arg1": 0, "arg2": 0}]
|
||||
|
||||
while True:
|
||||
if not MQTT_CONNECTED:
|
||||
@@ -199,9 +203,12 @@ async def mqtt_lifecycle_manager():
|
||||
continue
|
||||
|
||||
try:
|
||||
# Run blocking request in executor to avoid blocking the asyncio loop
|
||||
# Run blocking POST request in executor
|
||||
loop = asyncio.get_running_loop()
|
||||
response = await loop.run_in_executor(None, lambda: requests.get(OP25_DATA_URL, timeout=0.5))
|
||||
response = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: requests.post(OP25_DATA_URL, json=COMMAND_PAYLOAD, timeout=0.5)
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
@@ -209,10 +216,17 @@ async def mqtt_lifecycle_manager():
|
||||
current_tgid = 0
|
||||
current_meta = {}
|
||||
|
||||
# Handle multi_rx list or single dict structure
|
||||
if isinstance(data, list):
|
||||
for ch in data:
|
||||
t = ch.get("tgid", 0)
|
||||
# The response is an array of update objects
|
||||
for item in data:
|
||||
if item.get("json_type") == "channel_update":
|
||||
# The terminal provides channel info keyed by channel index (e.g., "0")
|
||||
# We look for the first channel that has an active TGID
|
||||
for key in item:
|
||||
if key.isdigit():
|
||||
ch = item[key]
|
||||
t = ch.get("tgid")
|
||||
|
||||
# OP25 returns null or 0 when no talkgroup is active
|
||||
if t and int(t) > 0:
|
||||
current_tgid = int(t)
|
||||
current_meta = {
|
||||
@@ -222,29 +236,19 @@ async def mqtt_lifecycle_manager():
|
||||
"sysname": str(ch.get("system", "")).strip()
|
||||
}
|
||||
break
|
||||
elif isinstance(data, dict):
|
||||
t = data.get("tgid", 0)
|
||||
if t and int(t) > 0:
|
||||
current_tgid = int(t)
|
||||
current_meta = {
|
||||
"tgid": str(t),
|
||||
"alpha_tag": str(data.get("tag", "")).strip(),
|
||||
"frequency": str(data.get("freq", 0)),
|
||||
"sysname": str(data.get("system", "")).strip()
|
||||
}
|
||||
if current_tgid: break
|
||||
|
||||
now = datetime.now()
|
||||
|
||||
# Logic for handling call start/end events
|
||||
if current_tgid != 0:
|
||||
potential_end_time = None # Reset debounce
|
||||
potential_end_time = None
|
||||
|
||||
if current_tgid != last_tgid:
|
||||
if last_tgid != 0:
|
||||
# End previous call immediately if switching channels
|
||||
payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_end", "metadata": last_metadata}
|
||||
client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0)
|
||||
|
||||
# Start new call
|
||||
payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_start", "metadata": current_meta}
|
||||
client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0)
|
||||
last_tgid = current_tgid
|
||||
|
||||
@@ -7,6 +7,7 @@ services:
|
||||
restart: unless-stopped
|
||||
ports:
|
||||
- 8001:8001
|
||||
- 8081:8081
|
||||
devices:
|
||||
- "/dev/bus/usb:/dev/bus/usb"
|
||||
volumes:
|
||||
|
||||
Reference in New Issue
Block a user