import asyncio
from datetime import datetime, timezone, timedelta

from fastapi import APIRouter, Depends, Query

from app.internal.auth import require_admin_token, require_firebase_token
from app.internal.feature_flags import get_flags, set_flags
from app.internal import firestore as fstore

# Maximum number of orphaned call summaries included in the debug response.
_MAX_ORPHANS_RETURNED = 250

# Orphans whose corr_sweep_count reaches this value are tallied in
# "sweep_exhausted_count" of the per-talkgroup summary.
_SWEEP_EXHAUSTED_THRESHOLD = 3

# Call fields copied verbatim into a call summary, in output order.
_CALL_FIELDS = (
    "call_id",
    "started_at",
    "ended_at",
    "duration_s",
    "talkgroup_id",
    "talkgroup_name",
    "system_id",
    "node_id",
    "incident_type",
    "tags",
    "location",
    "location_coords",
    "units",
    "vehicles",
    "cleared_units",
    "severity",
)

# Correlation decision fields written back by incident_correlator.
_CORR_FIELDS = (
    "corr_path",
    "corr_incident_idle_min",
    "corr_distance_km",
    "corr_score",
    "corr_candidates",
    "corr_shared_units",
    "corr_fit_signal",
    "corr_matched_units",
    "corr_sweep_count",
    "skip_reason",
)


async def _get_ai_enabled_system_ids(global_flags: dict) -> set[str]:
    """Return system_ids where at least one AI function (STT or correlation) is effectively on.

    A system's own ``ai_flags`` entry takes precedence for each flag; when a
    flag is absent there, the value from *global_flags* is used (defaulting to
    ``True`` if the global flag is absent as well). Systems without a
    ``system_id`` are skipped.
    """
    default_stt = global_flags.get("stt_enabled", True)
    default_corr = global_flags.get("correlation_enabled", True)

    def _ai_active(system: dict) -> bool:
        overrides = system.get("ai_flags") or {}
        return bool(
            overrides.get("stt_enabled", default_stt)
            or overrides.get("correlation_enabled", default_corr)
        )

    systems = await fstore.collection_list("systems")
    return {
        system["system_id"]
        for system in systems
        if system.get("system_id") and _ai_active(system)
    }


def _strip_embedding(doc: dict) -> dict:
    """Return a shallow copy of *doc* without its ``embedding`` key."""
    return {key: value for key, value in doc.items() if key != "embedding"}


def _call_summary(call: dict) -> dict:
    """Project a call document onto the fields relevant for correlation debugging.

    Every key is always present in the result (``None`` when the call lacks
    it). ``transcript`` prefers ``transcript_corrected`` and falls back to the
    raw ``transcript``.
    """
    summary = {field: call.get(field) for field in _CALL_FIELDS}
    summary["transcript"] = call.get("transcript_corrected") or call.get("transcript")
    summary.update((field, call.get(field)) for field in _CORR_FIELDS)
    return summary


def _timestamp_sort_key(value: object) -> tuple:
    """Return a sort key that tolerates missing timestamps.

    Missing values (``None`` or ``""``) map to ``(0, "")`` and real values to
    ``(1, value)``, so a placeholder is never compared against a real
    timestamp. In a descending sort, records without a timestamp end up last.
    """
    if value is None or value == "":
        return (0, "")
    return (1, value)


def _summarise_orphans_by_talkgroup(orphans: list[dict]) -> list[dict]:
    """Aggregate orphan call summaries per talkgroup, largest group first.

    Each entry carries the talkgroup id/name of the first orphan seen for that
    talkgroup, the total ``count``, the number of orphans that have neither
    ``incident_type`` nor ``tags`` (``no_type_count``), and the number whose
    ``corr_sweep_count`` reached ``_SWEEP_EXHAUSTED_THRESHOLD``
    (``sweep_exhausted_count``).
    """
    by_talkgroup: dict[str, dict] = {}
    for orphan in orphans:
        tg_key = str(orphan.get("talkgroup_id") or "unknown")
        bucket = by_talkgroup.get(tg_key)
        if bucket is None:
            bucket = by_talkgroup[tg_key] = {
                "talkgroup_id": orphan.get("talkgroup_id"),
                "talkgroup_name": orphan.get("talkgroup_name") or "unknown",
                "count": 0,
                "no_type_count": 0,
                "sweep_exhausted_count": 0,
            }
        bucket["count"] += 1
        if not orphan.get("incident_type") and not orphan.get("tags"):
            bucket["no_type_count"] += 1
        if (orphan.get("corr_sweep_count") or 0) >= _SWEEP_EXHAUSTED_THRESHOLD:
            bucket["sweep_exhausted_count"] += 1
    return sorted(by_talkgroup.values(), key=lambda entry: entry["count"], reverse=True)


router = APIRouter(prefix="/admin", tags=["admin"])


@router.get("/features")
async def get_feature_flags(_=Depends(require_firebase_token)):
    """Return the current AI feature flag state. Any authenticated user can read."""
    return await get_flags()


@router.put("/features")
async def update_feature_flags(body: dict, _=Depends(require_admin_token)):
    """Update one or more AI feature flags. Admin only.

    The request body is passed to ``set_flags`` unchanged and its result is
    returned.
    """
    return await set_flags(body)


@router.get("/debug/correlation")
async def debug_correlation(
    limit: int = Query(20, ge=1, le=100),
    orphan_hours: int = Query(48, ge=1, le=168),
    _=Depends(require_admin_token),
):
    """
    Return the last N incidents with full correlation debug detail, plus recent orphaned calls.

    Each incident includes a calls_detail array with per-call corr_* fields so you can
    see exactly which correlation path fired (or didn't) for every call in the incident.
    Embeddings are stripped — they're large float arrays and unreadable.

    Only incidents and calls that belong to a system with at least one AI function
    enabled are reported. Records without a timestamp sort last.

    Query params:
      limit        — number of incidents to return, sorted by updated_at desc (default 20, max 100)
      orphan_hours — how far back to scan for orphaned calls (default 48h, max 168h / 1 week)
    """
    # ── Determine which systems have AI active ────────────────────────────────
    global_flags = await get_flags()
    ai_systems = await _get_ai_enabled_system_ids(global_flags)

    # ── Fetch recent incidents (AI-enabled systems only) ──────────────────────
    all_incidents = await fstore.collection_list("incidents")
    # A missing/None updated_at must not be compared against real timestamps,
    # otherwise a single incomplete incident makes the whole sort raise.
    all_incidents.sort(
        key=lambda inc: _timestamp_sort_key(inc.get("updated_at")),
        reverse=True,
    )
    ai_incidents = [
        inc
        for inc in all_incidents
        if any(sid in ai_systems for sid in (inc.get("system_ids") or []))
    ]
    incidents = ai_incidents[:limit]

    # ── Fetch all linked call docs in parallel ────────────────────────────────
    all_call_ids: list[str] = []
    for inc in incidents:
        all_call_ids.extend(inc.get("call_ids") or [])
    unique_call_ids = list(dict.fromkeys(all_call_ids))  # dedupe, preserve order

    call_docs = await asyncio.gather(
        *(fstore.doc_get("calls", cid) for cid in unique_call_ids)
    )
    # gather() returns results in argument order, so pair each doc with the id
    # it was requested under. This is the id used for the lookup below and does
    # not depend on the document carrying a "call_id" field.
    call_map: dict[str, dict] = {
        cid: doc for cid, doc in zip(unique_call_ids, call_docs) if doc
    }

    # ── Build incident debug records ──────────────────────────────────────────
    incident_records = []
    for inc in incidents:
        record = _strip_embedding(inc)
        record["calls_detail"] = [
            _call_summary(call_map[cid])
            for cid in (inc.get("call_ids") or [])
            if cid in call_map
        ]
        incident_records.append(record)

    # ── Recent orphaned calls (AI-enabled systems only) ───────────────────────
    # Use a single-field range query to avoid requiring a composite Firestore index;
    # filter status and system in Python.
    cutoff = datetime.now(timezone.utc) - timedelta(hours=orphan_hours)
    recent_calls = await fstore.collection_where("calls", [
        ("ended_at", ">=", cutoff),
    ])
    orphans = [
        _call_summary(call)
        for call in recent_calls
        if call.get("status") == "ended"
        and not call.get("incident_ids")
        and not call.get("incident_id")
        and call.get("system_id") in ai_systems
    ]
    # _call_summary always sets "started_at" (possibly to None), so a .get()
    # default would never apply here; use the None-tolerant key instead.
    orphans.sort(
        key=lambda summary: _timestamp_sort_key(summary.get("started_at")),
        reverse=True,
    )

    # Summarise orphans by talkgroup so the volume and source are immediately visible.
    return {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "incident_count": len(incident_records),
        "orphaned_call_count": len(orphans),
        "orphans_by_talkgroup": _summarise_orphans_by_talkgroup(orphans),
        "incidents": incident_records,
        "orphaned_calls": orphans[:_MAX_ORPHANS_RETURNED],
    }