Files
server-26/drb-c2-core/app/internal/mqtt_handler.py
T
Logan 8b660d8e10 feat: incident correlation overhaul, signal-based auto-resolve, token fixes
Correlator
- Raise fast-path idle gate 30 → 90 min (tg_fast_path_idle_minutes)
- Fix disambiguate always-commits bug: run _call_fits_incident on winner
  before committing; fall through to new-incident creation if it fails
- Add unit-continuity path (path 1.5): matches all_active by shared unit
  IDs with a reassignment guard, bridges calls past the idle gate
- Add tag-based incident_type inference (_TAG_TYPE_HINTS) as GPT fallback,
  rescuing tagged calls that would have been dropped (616 observed orphans)
- Add master/child incident model: _create_master_incident, _demote_to_child,
  _add_child_to_master; new incidents stamped incident_type="master"
- Add cross-system parent detection (_find_cross_system_parent): two-signal
  scoring (road overlap=0.4, embedding≥0.78=0.3, proximity=0.3, threshold=0.5)
  wired into create-if-new path; creates master shell on first cross-system match
- Add maybe_resolve_parent: auto-resolves master when all children close;
  called from upload pipeline (LLM closure) and summarizer stale sweep
- Add signal-based auto-resolve via units_active/units_cleared tracking:
  GPT now extracts cleared_units per scene; _update_incident moves units
  between active/cleared lists and resolves the incident when active empties;
  stored on call doc for re-correlation sweep reuse
- Add _create_incident initialization of units_active/units_cleared fields

Re-correlation sweep
- Add corr_sweep_count + MAX_SWEEP_ATTEMPTS=3: orphans get 3 attempts
  then are tombstoned as corr_path="unlinked", ending the re-sweep loop
  (previously hammering each orphan 29-31 times per shift)

Intelligence extraction
- Add cleared_units to GPT prompt schema and rules
- Extract and propagate cleared_units per scene; merge across scenes;
  store on call doc for re-correlation sweep

Token management
- Fix token release bug: remove release_token call on discord_connected=False
  in MQTT checkin (transient Discord drops were orphaning bots mid-shift)
- Add PUT /tokens/{id}/prefer/{system_id} endpoint: lock a bot token to a
  system; pass _none as system_id to clear; stored bidirectionally on both
  token and system documents
- discord_join handler resolves preferred_token_id from system doc and passes
  system_name in MQTT payload
2026-05-10 19:49:05 -04:00

289 lines
12 KiB
Python

import asyncio
import json
from datetime import datetime, timezone
from typing import Optional
import paho.mqtt.client as mqtt
from app.config import settings
from app.internal.logger import logger
from app.internal import firestore as fstore
class MQTTHandler:
    """MQTT front-end for node traffic.

    Subscribes to ``nodes/{node_id}/{checkin,status,metadata,key_request}``,
    forwards each JSON message to an async handler on the event loop captured
    in :meth:`connect`, and publishes commands, configs and API keys back to
    individual nodes.

    paho runs its network loop on its own thread (``loop_start``); the
    ``_on_*`` callbacks therefore execute off the asyncio thread and hand work
    over with ``asyncio.run_coroutine_threadsafe``.
    """

    # Topics (re)subscribed on every successful connect, all at QoS 1.
    _NODE_TOPICS = (
        "nodes/+/checkin",
        "nodes/+/status",
        "nodes/+/metadata",
        "nodes/+/key_request",
    )

    def __init__(self):
        """Initialise fields only; no network activity happens here."""
        self._client: Optional["mqtt.Client"] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._connected = False
        # Strong reference to the background connect task. The event loop only
        # keeps weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._connect_task: Optional[asyncio.Task] = None

    def _build_client(self) -> "mqtt.Client":
        """Create a paho client with credentials (if configured) and callbacks wired."""
        client = mqtt.Client(
            callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
            client_id="drb-c2-core",
        )
        if settings.mqtt_user:
            client.username_pw_set(settings.mqtt_user, settings.mqtt_pass)
        client.on_connect = self._on_connect
        client.on_disconnect = self._on_disconnect
        client.on_message = self._on_message
        return client

    def _on_connect(self, client, userdata, flags, reason_code, properties):
        """paho callback: mark connected and subscribe to the node topics on success."""
        if reason_code == 0:
            self._connected = True
            for topic in self._NODE_TOPICS:
                client.subscribe(topic, qos=1)
            logger.info("MQTT connected — subscribed to node topics.")
        else:
            logger.error(f"MQTT connect refused: {reason_code}")

    def _on_disconnect(self, client, userdata, disconnect_flags, reason_code, properties):
        """paho callback: clear the connected flag and log the reason."""
        self._connected = False
        logger.warning(f"MQTT disconnected: {reason_code}")

    def _on_message(self, client, userdata, msg):
        """paho callback: decode a JSON payload and schedule it on the asyncio loop.

        Runs on paho's network thread, so nothing here may raise: an escaping
        exception would surface inside paho's loop rather than in our code.
        """
        try:
            payload = json.loads(msg.payload.decode())
        except Exception:
            # Deliberately broad: this is the boundary for untrusted node input
            # (bad UTF-8, invalid JSON, pathological nesting).
            logger.warning(f"Non-JSON MQTT message on {msg.topic}")
            return

        loop = self._loop
        if loop is None or loop.is_closed():
            # Checked before the coroutine object is created so that a message
            # arriving during startup/shutdown neither raises on this thread
            # nor leaves a never-awaited coroutine behind.
            logger.warning(f"MQTT message on {msg.topic} dropped — event loop unavailable")
            return

        asyncio.run_coroutine_threadsafe(self._dispatch(msg.topic, payload), loop)

    async def _dispatch(self, topic: str, payload: dict):
        """Route a message to its handler based on ``nodes/{node_id}/{type}``.

        Topics of any other shape and unknown message types are ignored.
        Handler exceptions are logged here and never propagate.
        """
        parts = topic.split("/")
        # Expected: nodes/{node_id}/{type}
        if len(parts) != 3 or parts[0] != "nodes":
            return
        node_id = parts[1]
        msg_type = parts[2]
        try:
            if msg_type == "checkin":
                await self._handle_checkin(node_id, payload)
            elif msg_type == "status":
                await self._handle_status(node_id, payload)
            elif msg_type == "metadata":
                await self._handle_metadata(node_id, payload)
            elif msg_type == "key_request":
                await self._handle_key_request(node_id)
        except Exception as e:
            logger.error(f"MQTT dispatch error [{msg_type}] from {node_id}: {e}")

    # ------------------------------------------------------------------
    # Checkin — upsert node; flag new unconfigured nodes
    # ------------------------------------------------------------------
    async def _handle_checkin(self, node_id: str, payload: dict):
        """Create the node document on first sight, otherwise refresh it.

        New nodes are stored as unconfigured with ``approval_status`` pending.
        Known nodes get ``last_seen``/name/position refreshed and are promoted
        to ``online`` only when configured and not currently ``recording``.
        """
        existing = await fstore.doc_get("nodes", node_id)
        now = datetime.now(timezone.utc)
        if not existing:
            # First time we've seen this node — create it as unconfigured, pending approval
            doc = {
                "node_id": node_id,
                "name": payload.get("name", node_id),
                "lat": payload.get("lat", 0.0),
                "lon": payload.get("lon", 0.0),
                "status": "unconfigured",
                "configured": False,
                "last_seen": now.isoformat(),
                "assigned_system_id": None,
                "approval_status": "pending",
            }
            await fstore.doc_set("nodes", node_id, doc, merge=False)
            logger.info(f"New node registered: {node_id} — pending admin approval.")
        else:
            updates = {
                "last_seen": now.isoformat(),
                "name": payload.get("name", existing.get("name", node_id)),
                "lat": payload.get("lat", existing.get("lat", 0.0)),
                "lon": payload.get("lon", existing.get("lon", 0.0)),
            }
            # Only promote to online if already configured (don't overwrite explicit status)
            if existing.get("configured") and existing.get("status") not in ("recording",):
                updates["status"] = "online"
            await fstore.doc_update("nodes", node_id, updates)
        # NOTE: discord_connected in checkins is informational only — do NOT release the
        # token here. The bot watchdog reconnects on transient Discord drops, so a single
        # checkin with discord_connected=False during a brief reconnect window would
        # incorrectly free the token while the bot is still active. Token release is
        # handled exclusively by the discord_leave command and the node offline sweeper.

    # ------------------------------------------------------------------
    # Status update
    # ------------------------------------------------------------------
    async def _handle_status(self, node_id: str, payload: dict):
        """Store the reported status and refresh ``last_seen``; no-op without a status."""
        status = payload.get("status")
        if not status:
            return
        await fstore.doc_update("nodes", node_id, {
            "status": status,
            "last_seen": datetime.now(timezone.utc).isoformat(),
        })

    # ------------------------------------------------------------------
    # Metadata — call_start / call_end events
    # ------------------------------------------------------------------
    async def _handle_metadata(self, node_id: str, payload: dict):
        """Forward ``call_start`` / ``call_end`` events; other events are ignored."""
        event = payload.get("event")
        if event == "call_start":
            await self._on_call_start(node_id, payload)
        elif event == "call_end":
            await self._on_call_end(node_id, payload)

    @staticmethod
    def _parse_timestamp(raw) -> Optional[datetime]:
        """Parse an ISO-8601 string into a timezone-aware datetime.

        Returns ``None`` when *raw* is missing, not a string, or unparseable,
        so callers can choose their own fallback. A trailing ``Z`` is accepted
        (``datetime.fromisoformat`` rejects it before Python 3.11). Naive
        values are tagged as UTC so they never mix with the aware
        ``datetime.now(timezone.utc)`` fallback used by callers.
        NOTE(review): treating naive timestamps as UTC is an assumption about
        what nodes send — confirm against the node firmware.
        """
        if not isinstance(raw, str) or not raw:
            return None
        text = raw.strip()
        if text[-1:] in ("Z", "z"):
            text = text[:-1] + "+00:00"
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    @staticmethod
    def _lookup_talkgroup_name(system_doc: dict, tgid) -> str:
        """Return the configured name for *tgid* from ``config.talkgroups``, or ``""``.

        IDs are compared as integers. A non-numeric *tgid* or a malformed
        talkgroup entry is skipped instead of raising, so one bad config row
        cannot prevent a call from being recorded.
        """
        try:
            wanted = int(tgid)
        except (TypeError, ValueError):
            return ""
        talkgroups = (system_doc.get("config") or {}).get("talkgroups") or []
        for tg in talkgroups:
            try:
                if int(tg.get("id", -1)) == wanted:
                    return tg.get("name", "")
            except (AttributeError, TypeError, ValueError):
                continue
        return ""

    async def _on_call_start(self, node_id: str, payload: dict):
        """Create the ``calls/{call_id}`` document for a newly started call.

        Ignored when the payload has no ``call_id``. The talkgroup name comes
        from the payload when present, otherwise from the assigned system's
        talkgroup config.
        """
        call_id = payload.get("call_id")
        if not call_id:
            return

        # Look up assigned system for this node (cached — assignment rarely changes)
        node = await fstore.doc_get_cached("nodes", node_id)
        system_id = node.get("assigned_system_id") if node else None

        started_at_raw = payload.get("started_at")
        started_at = self._parse_timestamp(started_at_raw)
        if started_at is None:
            if started_at_raw:
                logger.warning(
                    f"Call {call_id}: unparseable started_at {started_at_raw!r} — using receive time"
                )
            # Missing or malformed: record the call with the receive time
            # rather than dropping it.
            started_at = datetime.now(timezone.utc)

        # Prefer the name from OP25 metadata; fall back to the system config
        tgid_name = payload.get("tgid_name") or ""
        if not tgid_name and system_id and payload.get("tgid"):
            system_doc = await fstore.doc_get_cached("systems", system_id)
            if system_doc:
                tgid_name = self._lookup_talkgroup_name(system_doc, payload["tgid"])

        doc = {
            "call_id": call_id,
            "node_id": node_id,
            "system_id": system_id,
            "talkgroup_id": payload.get("tgid"),
            "talkgroup_name": tgid_name,
            "freq": payload.get("freq"),
            "srcaddr": payload.get("srcaddr"),
            "started_at": started_at,
            "ended_at": None,
            "audio_url": None,
            "transcript": None,
            "incident_id": None,
            "location": None,
            "tags": [],
            "status": "active",
        }
        await fstore.doc_set("calls", call_id, doc, merge=False)
        logger.info(f"Call start: {call_id} (node={node_id}, tgid={payload.get('tgid')})")

    async def _on_call_end(self, node_id: str, payload: dict):
        """Mark a call as ended, recording ``ended_at`` and the audio URL if supplied."""
        call_id = payload.get("call_id")
        if not call_id:
            return

        ended_at_raw = payload.get("ended_at")
        ended_at = self._parse_timestamp(ended_at_raw)
        if ended_at is None:
            if ended_at_raw:
                logger.warning(
                    f"Call {call_id}: unparseable ended_at {ended_at_raw!r} — using receive time"
                )
            ended_at = datetime.now(timezone.utc)

        updates = {
            "ended_at": ended_at,
            "status": "ended",
        }
        if payload.get("audio_url"):
            updates["audio_url"] = payload["audio_url"]
        # NOTE(review): relies on doc_set's default merge behaviour to keep the
        # fields written at call start — confirm the default in app.internal.firestore.
        await fstore.doc_set("calls", call_id, updates)
        logger.info(f"Call end: {call_id}")

    # ------------------------------------------------------------------
    # Key request — re-deliver an existing approved key to a node that
    # lost its credentials (e.g. after a directory move / fresh volume)
    # ------------------------------------------------------------------
    async def _handle_key_request(self, node_id: str):
        """Republish the stored API key for *node_id*, or warn if none exists."""
        key_doc = await fstore.doc_get("node_keys", node_id)
        if not key_doc or not key_doc.get("api_key"):
            logger.warning(f"Key request from {node_id} but no key found in Firestore — node may not be approved yet.")
            return
        self.publish_node_key(node_id, key_doc["api_key"])
        logger.info(f"Re-delivered API key to {node_id} on request.")

    # ------------------------------------------------------------------
    # Outbound — send a command to a specific node
    # ------------------------------------------------------------------
    def send_command(self, node_id: str, payload: dict) -> bool:
        """Publish *payload* to ``nodes/{node_id}/commands`` at QoS 1.

        Returns ``True`` when the publish was handed to the client, ``False``
        when the handler is not connected. Broker delivery is not awaited.
        """
        topic = f"nodes/{node_id}/commands"
        if self._client and self._connected:
            self._client.publish(topic, json.dumps(payload), qos=1)
            logger.info(f"Command sent to {node_id}: {payload.get('action')}")
            return True
        logger.warning(f"MQTT not connected — could not send command to {node_id}")
        return False

    def push_config(self, node_id: str, system_config: dict):
        """Publish *system_config* to ``nodes/{node_id}/config`` at QoS 1 if connected."""
        topic = f"nodes/{node_id}/config"
        if self._client and self._connected:
            self._client.publish(topic, json.dumps(system_config), qos=1)
            logger.info(f"Config pushed to {node_id}")
        else:
            logger.warning(f"MQTT not connected — could not push config to {node_id}")

    def publish_node_key(self, node_id: str, api_key: str):
        """Publish the provisioned API key to the node (retained so it survives reconnects)."""
        topic = f"nodes/{node_id}/api_key"
        if self._client and self._connected:
            self._client.publish(topic, json.dumps({"api_key": api_key}), qos=2, retain=True)
            logger.info(f"API key provisioned to {node_id}")
        else:
            logger.warning(f"MQTT not connected — could not provision key to {node_id}")

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    async def connect(self):
        """Start the paho network thread and begin connecting in the background.

        Returns immediately; the connection itself is established by
        :meth:`_connect_with_retry`. Must be awaited from the event loop that
        should run the message handlers.
        """
        self._loop = asyncio.get_running_loop()
        self._client = self._build_client()
        # Start the paho network loop first so it drives reconnects automatically,
        # then keep attempting the initial TCP connect until it succeeds.
        self._client.loop_start()
        self._connect_task = asyncio.create_task(self._connect_with_retry())

    async def _connect_with_retry(self):
        """Attempt the initial broker connection until it succeeds.

        Waits 5 s after the first failure, doubling up to a 60 s cap.
        """
        delay = 5
        client = self._client
        loop = asyncio.get_running_loop()
        logger.info(f"MQTT connecting to {settings.mqtt_broker}:{settings.mqtt_port}")
        while True:
            try:
                # connect() performs blocking DNS/TCP work; run it on the default
                # executor so the event loop keeps serving other coroutines.
                await loop.run_in_executor(
                    None,
                    lambda: client.connect(
                        settings.mqtt_broker, settings.mqtt_port, keepalive=60
                    ),
                )
                return  # paho loop_start + reconnect_delay_set handles the rest
            except Exception as e:
                logger.warning(f"MQTT connect failed ({e}) — retrying in {delay}s")
                await asyncio.sleep(delay)
                delay = min(delay * 2, 60)

    async def disconnect(self):
        """Stop connecting, shut down the network thread and disconnect the client."""
        # The network thread is stopped before disconnect(), so _on_disconnect
        # does not run; clear the flag here so is_connected reflects reality.
        self._connected = False

        task, self._connect_task = self._connect_task, None
        if task is not None and not task.done():
            # Otherwise the retry loop could (re)connect after shutdown.
            task.cancel()

        if self._client:
            self._client.loop_stop()
            self._client.disconnect()

    @property
    def is_connected(self) -> bool:
        """``True`` between a successful connect and the next disconnect."""
        return self._connected
# Module-level handler instance, created at import time. Construction only
# initialises fields; no broker connection is made until connect() is awaited.
mqtt_handler = MQTTHandler()