Implement Metadata Watcher #1
@@ -183,10 +183,95 @@ async def mqtt_lifecycle_manager():
|
||||
lwt_payload = json.dumps({"status": "offline", "reason": "unexpected_disconnect"})
|
||||
client.will_set(f"nodes/{NODE_ID}/status", lwt_payload, qos=1, retain=True)
|
||||
|
||||
async def metadata_watcher():
|
||||
"""
|
||||
Polls OP25 HTTP terminal for metadata and publishes events to MQTT.
|
||||
"""
|
||||
last_tgid = 0
|
||||
last_metadata = {}
|
||||
potential_end_time = None
|
||||
DEBOUNCE_SECONDS = 2.5
|
||||
OP25_DATA_URL = "http://127.0.0.1:8081/data.json"
|
||||
|
||||
while True:
|
||||
if not MQTT_CONNECTED:
|
||||
await asyncio.sleep(1)
|
||||
continue
|
||||
|
||||
try:
|
||||
# Run blocking request in executor to avoid blocking the asyncio loop
|
||||
loop = asyncio.get_running_loop()
|
||||
response = await loop.run_in_executor(None, lambda: requests.get(OP25_DATA_URL, timeout=0.5))
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
|
||||
current_tgid = 0
|
||||
current_meta = {}
|
||||
|
||||
# Handle multi_rx list or single dict structure
|
||||
if isinstance(data, list):
|
||||
for ch in data:
|
||||
t = ch.get("tgid", 0)
|
||||
if t and int(t) > 0:
|
||||
current_tgid = int(t)
|
||||
current_meta = {
|
||||
"tgid": str(t),
|
||||
"alpha_tag": str(ch.get("tag", "")).strip(),
|
||||
"frequency": str(ch.get("freq", 0)),
|
||||
"sysname": str(ch.get("system", "")).strip()
|
||||
}
|
||||
break
|
||||
elif isinstance(data, dict):
|
||||
t = data.get("tgid", 0)
|
||||
if t and int(t) > 0:
|
||||
current_tgid = int(t)
|
||||
current_meta = {
|
||||
"tgid": str(t),
|
||||
"alpha_tag": str(data.get("tag", "")).strip(),
|
||||
"frequency": str(data.get("freq", 0)),
|
||||
"sysname": str(data.get("system", "")).strip()
|
||||
}
|
||||
|
||||
now = datetime.now()
|
||||
|
||||
if current_tgid != 0:
|
||||
potential_end_time = None # Reset debounce
|
||||
|
||||
if current_tgid != last_tgid:
|
||||
if last_tgid != 0:
|
||||
# End previous call immediately if switching channels
|
||||
payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_end", "metadata": last_metadata}
|
||||
client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0)
|
||||
|
||||
# Start new call
|
||||
payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_start", "metadata": current_meta}
|
||||
client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0)
|
||||
last_tgid = current_tgid
|
||||
last_metadata = current_meta
|
||||
|
||||
elif last_tgid != 0:
|
||||
if potential_end_time is None:
|
||||
potential_end_time = now
|
||||
elif (now - potential_end_time).total_seconds() > DEBOUNCE_SECONDS:
|
||||
payload = {"node_id": NODE_ID, "timestamp": now.isoformat(), "event": "call_end", "metadata": last_metadata}
|
||||
client.publish(f"nodes/{NODE_ID}/metadata", json.dumps(payload), qos=0)
|
||||
last_tgid = 0
|
||||
last_metadata = {}
|
||||
potential_end_time = None
|
||||
|
||||
except Exception:
|
||||
pass # OP25 might be restarting or busy
|
||||
|
||||
await asyncio.sleep(0.25)
|
||||
|
||||
try:
|
||||
client.connect(MQTT_BROKER, 1883, 60)
|
||||
client.loop_start() # Run network loop in background thread
|
||||
|
||||
# Start the metadata watcher task
|
||||
watcher_task = asyncio.create_task(metadata_watcher())
|
||||
|
||||
# --- Main Heartbeat Loop ---
|
||||
while True:
|
||||
if MQTT_CONNECTED:
|
||||
@@ -194,10 +279,20 @@ async def mqtt_lifecycle_manager():
|
||||
# Pulse every 30 seconds
|
||||
# Only wait 30 sec if the HB sent. This way we don't stall a check-in
|
||||
await asyncio.sleep(30)
|
||||
else:
|
||||
await asyncio.sleep(5)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
LOGGER.info("Stopping MQTT Loop...")
|
||||
finally:
|
||||
# Cancel watcher
|
||||
if 'watcher_task' in locals():
|
||||
watcher_task.cancel()
|
||||
try:
|
||||
await watcher_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
# Graceful Shutdown: Explicitly tell C2 we are leaving
|
||||
if MQTT_CONNECTED:
|
||||
shutdown_payload = json.dumps({"status": "offline", "reason": "clean_shutdown"})
|
||||
|
||||
Reference in New Issue
Block a user