Files
server-26/drb-c2-core/app/routers/admin.py
T
Logan 4b7d9dd49a feat: enrich correlation debug with fit_signal and orphan breakdown
_call_fits_incident now returns (bool, signal_str) so each correlation
decision records exactly what evidence fired: unit_overlap, vehicle_overlap,
location_proximity, time_fallback, tactical_default, or the corresponding
false-return variants (unit_loc_conflict, content_divergence, etc.).

- corr_fit_signal and corr_matched_units written to call docs for
  fast/single and fast/disambig paths
- Admin debug endpoint exposes the new fields in calls_detail
- Orphan section adds orphans_by_talkgroup summary (count, no-type count,
  sweep-exhausted count per TGID) and raises orphan limit 100 → 250
- Admin page shows corr_path and fit_signal distribution panels above raw
  JSON; time_fallback highlighted in yellow as a diagnostic marker

No correlation logic changed — diagnostic data only.
2026-05-25 12:54:34 -04:00

139 lines
6.5 KiB
Python

import asyncio
from datetime import datetime, timezone, timedelta
from fastapi import APIRouter, Depends, Query
from app.internal.auth import require_admin_token, require_firebase_token
from app.internal.feature_flags import get_flags, set_flags
from app.internal import firestore as fstore
router = APIRouter(prefix="/admin", tags=["admin"])
@router.get("/features")
async def get_feature_flags(_=Depends(require_firebase_token)):
"""Return the current AI feature flag state. Any authenticated user can read."""
return await get_flags()
@router.put("/features")
async def update_feature_flags(body: dict, _=Depends(require_admin_token)):
"""Update one or more AI feature flags. Admin only."""
return await set_flags(body)
@router.get("/debug/correlation")
async def debug_correlation(
limit: int = Query(20, ge=1, le=100),
orphan_hours: int = Query(48, ge=1, le=168),
_=Depends(require_admin_token),
):
"""
Return the last N incidents with full correlation debug detail, plus recent orphaned calls.
Each incident includes a calls_detail array with per-call corr_* fields so you can see
exactly which correlation path fired (or didn't) for every call in the incident.
Embeddings are stripped — they're large float arrays and unreadable.
Query params:
limit — number of incidents to return, sorted by updated_at desc (default 20, max 100)
orphan_hours — how far back to scan for orphaned calls (default 48h, max 168h / 1 week)
"""
def _strip(doc: dict) -> dict:
return {k: v for k, v in doc.items() if k != "embedding"}
def _call_summary(call: dict) -> dict:
return {
"call_id": call.get("call_id"),
"started_at": call.get("started_at"),
"ended_at": call.get("ended_at"),
"duration_s": call.get("duration_s"),
"talkgroup_id": call.get("talkgroup_id"),
"talkgroup_name": call.get("talkgroup_name"),
"system_id": call.get("system_id"),
"node_id": call.get("node_id"),
"incident_type": call.get("incident_type"),
"tags": call.get("tags"),
"location": call.get("location"),
"location_coords": call.get("location_coords"),
"units": call.get("units"),
"vehicles": call.get("vehicles"),
"cleared_units": call.get("cleared_units"),
"severity": call.get("severity"),
"transcript": call.get("transcript_corrected") or call.get("transcript"),
# Correlation decision fields written back by incident_correlator
"corr_path": call.get("corr_path"),
"corr_incident_idle_min": call.get("corr_incident_idle_min"),
"corr_distance_km": call.get("corr_distance_km"),
"corr_score": call.get("corr_score"),
"corr_candidates": call.get("corr_candidates"),
"corr_shared_units": call.get("corr_shared_units"),
"corr_fit_signal": call.get("corr_fit_signal"),
"corr_matched_units": call.get("corr_matched_units"),
"corr_sweep_count": call.get("corr_sweep_count"),
"skip_reason": call.get("skip_reason"),
}
# ── Fetch recent incidents ────────────────────────────────────────────────
all_incidents = await fstore.collection_list("incidents")
all_incidents.sort(key=lambda i: i.get("updated_at", ""), reverse=True)
incidents = all_incidents[:limit]
# ── Fetch all linked call docs in parallel ────────────────────────────────
all_call_ids: list[str] = []
for inc in incidents:
all_call_ids.extend(inc.get("call_ids") or [])
unique_call_ids = list(dict.fromkeys(all_call_ids)) # dedupe, preserve order
call_docs = await asyncio.gather(*(fstore.doc_get("calls", cid) for cid in unique_call_ids))
call_map: dict[str, dict] = {doc["call_id"]: doc for doc in call_docs if doc}
# ── Build incident debug records ──────────────────────────────────────────
incident_records = []
for inc in incidents:
rec = _strip(inc)
rec["calls_detail"] = [
_call_summary(call_map[cid])
for cid in (inc.get("call_ids") or [])
if cid in call_map
]
incident_records.append(rec)
# ── Recent orphaned calls ─────────────────────────────────────────────────
cutoff = datetime.now(timezone.utc) - timedelta(hours=orphan_hours)
recent_ended = await fstore.collection_where("calls", [
("status", "==", "ended"),
("ended_at", ">=", cutoff),
])
orphans = [
_call_summary(c) for c in recent_ended
if not c.get("incident_ids") and not c.get("incident_id")
]
orphans.sort(key=lambda c: c.get("started_at", ""), reverse=True)
# Summarise orphans by talkgroup so the volume and source are immediately visible.
orphans_by_tg: dict[str, dict] = {}
for o in orphans:
tg_key = str(o.get("talkgroup_id") or "unknown")
if tg_key not in orphans_by_tg:
orphans_by_tg[tg_key] = {
"talkgroup_id": o.get("talkgroup_id"),
"talkgroup_name": o.get("talkgroup_name") or "unknown",
"count": 0,
"no_type_count": 0,
"sweep_exhausted_count": 0,
}
orphans_by_tg[tg_key]["count"] += 1
if not o.get("incident_type") and not o.get("tags"):
orphans_by_tg[tg_key]["no_type_count"] += 1
if (o.get("corr_sweep_count") or 0) >= 3:
orphans_by_tg[tg_key]["sweep_exhausted_count"] += 1
return {
"generated_at": datetime.now(timezone.utc).isoformat(),
"incident_count": len(incident_records),
"orphaned_call_count": len(orphans),
"orphans_by_talkgroup": sorted(orphans_by_tg.values(), key=lambda x: x["count"], reverse=True),
"incidents": incident_records,
"orphaned_calls": orphans[:250],
}