From 4b7d9dd49a07c9e87f0f9426bbdb6ff773d893d7 Mon Sep 17 00:00:00 2001 From: Logan Date: Mon, 25 May 2026 12:54:34 -0400 Subject: [PATCH] feat: enrich correlation debug with fit_signal and orphan breakdown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit _call_fits_incident now returns (bool, signal_str) so each correlation decision records exactly what evidence fired: unit_overlap, vehicle_overlap, location_proximity, time_fallback, tactical_default, or the corresponding false-return variants (unit_loc_conflict, content_divergence, etc.). - corr_fit_signal and corr_matched_units written to call docs for fast/single and fast/disambig paths - Admin debug endpoint exposes the new fields in calls_detail - Orphan section adds orphans_by_talkgroup summary (count, no-type count, sweep-exhausted count per TGID) and raises orphan limit 100 → 250 - Admin page shows corr_path and fit_signal distribution panels above raw JSON; time_fallback highlighted in yellow as a diagnostic marker No correlation logic changed — diagnostic data only. --- .../app/internal/incident_correlator.py | 58 +++++++++++------ drb-c2-core/app/routers/admin.py | 31 +++++++-- drb-frontend/app/admin/page.tsx | 63 ++++++++++++++++++- 3 files changed, 128 insertions(+), 24 deletions(-) diff --git a/drb-c2-core/app/internal/incident_correlator.py b/drb-c2-core/app/internal/incident_correlator.py index 1ba09be..086ab05 100644 --- a/drb-c2-core/app/internal/incident_correlator.py +++ b/drb-c2-core/app/internal/incident_correlator.py @@ -283,22 +283,28 @@ async def correlate_call( ) elif len(tg_recent) == 1: candidate = tg_recent[0] - if _call_fits_incident( + fit, fit_signal = _call_fits_incident( candidate, call_units, call_vehicles, coords, settings.location_proximity_km, is_dispatch=is_dispatch, call_embedding=call_embedding, now=now, - ): + ) + if fit: matched_incident = candidate corr_debug = { "corr_path": "fast/single", "corr_incident_idle_min": round(_incident_idle_minutes(candidate, now), 1), + "corr_fit_signal": fit_signal, } + if fit_signal == "unit_overlap" and call_units: + inc_unit_set = set(candidate.get("units") or []) + corr_debug["corr_matched_units"] = [u for u in call_units if u in inc_unit_set] logger.info( - f"Correlator fast-path: call {call_id} → {candidate['incident_id']}" + f"Correlator fast-path: call {call_id} → {candidate['incident_id']} " + f"(signal={fit_signal})" ) else: logger.info( - f"Correlator fast-path skipped: call {call_id} — different scene " + f"Correlator fast-path skipped: call {call_id} — {fit_signal} " f"from {candidate['incident_id']}; will attempt new incident" ) elif len(tg_recent) > 1: @@ -308,25 +314,30 @@ async def correlate_call( # Disambiguate picks the best candidate, but still verify the call # actually fits before committing — a new unrelated call on a busy # dispatch channel should create its own incident, not be force-merged. - if _call_fits_incident( + fit, fit_signal = _call_fits_incident( candidate, call_units, call_vehicles, coords, settings.location_proximity_km, is_dispatch=is_dispatch, call_embedding=call_embedding, now=now, - ): + ) + if fit: matched_incident = candidate corr_debug = { "corr_path": "fast/disambig", "corr_incident_idle_min": round(_incident_idle_minutes(candidate, now), 1), "corr_candidates": len(tg_recent), + "corr_fit_signal": fit_signal, } + if fit_signal == "unit_overlap" and call_units: + inc_unit_set = set(candidate.get("units") or []) + corr_debug["corr_matched_units"] = [u for u in call_units if u in inc_unit_set] logger.info( f"Correlator fast-path (disambig {len(tg_recent)} candidates): " - f"call {call_id} → {candidate['incident_id']}" + f"call {call_id} → {candidate['incident_id']} (signal={fit_signal})" ) else: logger.info( f"Correlator fast-path disambig: no candidate fits call {call_id} " - f"across {len(tg_recent)} incidents — will attempt new incident" + f"across {len(tg_recent)} incidents ({fit_signal}) — will attempt new incident" ) # ── 1.5. Unit-continuity path: same officer, not reassigned ───────────────── @@ -706,8 +717,17 @@ def _call_fits_incident( is_dispatch: bool = False, call_embedding: Optional[list] = None, now: Optional[datetime] = None, -) -> bool: +) -> tuple[bool, str]: """ + Return (fits, signal) — fits is True when this call plausibly belongs to + the incident; signal names the specific evidence that drove the decision. + + fits=True signals: "unit_overlap" | "vehicle_overlap" | "location_proximity" + | "time_fallback" | "tactical_default" + fits=False signals: "unit_loc_conflict" | "content_divergence" + | "location_conflict" | "no_signal" | "tactical_idle" + + Original docstring (logic unchanged): Return True if this call plausibly belongs to the given incident. Evaluation order for dispatch channels (is_dispatch=True): @@ -764,7 +784,7 @@ def _call_fits_incident( inc_coords_u["lat"], inc_coords_u["lng"], ) if dist_km > proximity_km: - return False + return False, "unit_loc_conflict" elif call_embedding and idle_min >= 15: # No geocode available AND old incident: use content divergence as a # location-proxy veto. After 15+ minutes an officer at a completely @@ -776,13 +796,13 @@ def _call_fits_incident( if inc_emb_u: sim = _cosine_similarity(call_embedding, inc_emb_u) if sim < 0.82: - return False - return True + return False, "content_divergence" + return True, "unit_overlap" # ── 2. Vehicle overlap ──────────────────────────────────────────────────── inc_vehicles = set(inc.get("vehicles") or []) if inc_vehicles and call_vehicles and any(v in inc_vehicles for v in call_vehicles): - return True + return True, "vehicle_overlap" # ── 3. Location proximity ───────────────────────────────────────────────── inc_coords = inc.get("location_coords") @@ -792,25 +812,27 @@ def _call_fits_incident( inc_coords["lat"], inc_coords["lng"], ) if dist_km <= proximity_km: - return True + return True, "location_proximity" # Conflicting location, no other positive signal → different scene. - return False + return False, "location_conflict" # ── 4. No positive signals ──────────────────────────────────────────────── if is_dispatch: # Conversational continuity: the call arrived during the same conversation # thread (< 2 min since last incident activity) with no contradicting evidence. if idle_min < 2: - return True + return True, "time_fallback" # Shared dispatch channel — do not link without at least one positive signal. - return False + return False, "no_signal" # Tactical channel: one scene per channel. # Within 20 min of the last incident activity, link by default — same # working channel almost certainly means same scene. # After 20 min of silence, require at least one positive signal; the same # frequency can be reused for a new unrelated incident later in the shift. - return idle_min < 20.0 + if idle_min < 20.0: + return True, "tactical_default" + return False, "tactical_idle" async def _update_incident( diff --git a/drb-c2-core/app/routers/admin.py b/drb-c2-core/app/routers/admin.py index 966fe09..a3f11b1 100644 --- a/drb-c2-core/app/routers/admin.py +++ b/drb-c2-core/app/routers/admin.py @@ -67,6 +67,8 @@ async def debug_correlation( "corr_score": call.get("corr_score"), "corr_candidates": call.get("corr_candidates"), "corr_shared_units": call.get("corr_shared_units"), + "corr_fit_signal": call.get("corr_fit_signal"), + "corr_matched_units": call.get("corr_matched_units"), "corr_sweep_count": call.get("corr_sweep_count"), "skip_reason": call.get("skip_reason"), } @@ -108,10 +110,29 @@ async def debug_correlation( ] orphans.sort(key=lambda c: c.get("started_at", ""), reverse=True) + # Summarise orphans by talkgroup so the volume and source are immediately visible. + orphans_by_tg: dict[str, dict] = {} + for o in orphans: + tg_key = str(o.get("talkgroup_id") or "unknown") + if tg_key not in orphans_by_tg: + orphans_by_tg[tg_key] = { + "talkgroup_id": o.get("talkgroup_id"), + "talkgroup_name": o.get("talkgroup_name") or "unknown", + "count": 0, + "no_type_count": 0, + "sweep_exhausted_count": 0, + } + orphans_by_tg[tg_key]["count"] += 1 + if not o.get("incident_type") and not o.get("tags"): + orphans_by_tg[tg_key]["no_type_count"] += 1 + if (o.get("corr_sweep_count") or 0) >= 3: + orphans_by_tg[tg_key]["sweep_exhausted_count"] += 1 + return { - "generated_at": datetime.now(timezone.utc).isoformat(), - "incident_count": len(incident_records), - "orphaned_call_count": len(orphans), - "incidents": incident_records, - "orphaned_calls": orphans[:100], + "generated_at": datetime.now(timezone.utc).isoformat(), + "incident_count": len(incident_records), + "orphaned_call_count": len(orphans), + "orphans_by_talkgroup": sorted(orphans_by_tg.values(), key=lambda x: x["count"], reverse=True), + "incidents": incident_records, + "orphaned_calls": orphans[:250], } diff --git a/drb-frontend/app/admin/page.tsx b/drb-frontend/app/admin/page.tsx index c05f65e..9d10e1c 100644 --- a/drb-frontend/app/admin/page.tsx +++ b/drb-frontend/app/admin/page.tsx @@ -113,7 +113,28 @@ function CorrelationDebugTab() { } const json = data ? JSON.stringify(data, null, 2) : null; - const meta = data as { incident_count?: number; orphaned_call_count?: number; generated_at?: string } | null; + const meta = data as { + incident_count?: number; + orphaned_call_count?: number; + generated_at?: string; + incidents?: Array<{ calls_detail?: Array<{ corr_path?: string; corr_fit_signal?: string }> }>; + orphans_by_talkgroup?: Array<{ talkgroup_id?: number; talkgroup_name?: string; count: number; no_type_count: number; sweep_exhausted_count: number }>; + } | null; + + // Aggregate corr_path and corr_fit_signal counts across all incident calls. + const pathCounts: Record = {}; + const signalCounts: Record = {}; + if (meta?.incidents) { + for (const inc of meta.incidents) { + for (const call of inc.calls_detail ?? []) { + const p = call.corr_path ?? "unknown"; + pathCounts[p] = (pathCounts[p] ?? 0) + 1; + if (call.corr_fit_signal) { + signalCounts[call.corr_fit_signal] = (signalCounts[call.corr_fit_signal] ?? 0) + 1; + } + } + } + } return (
@@ -172,6 +193,46 @@ function CorrelationDebugTab() {
)} + {meta && Object.keys(pathCounts).length > 0 && ( +
+
+

corr_path distribution

+ {Object.entries(pathCounts).sort((a, b) => b[1] - a[1]).map(([path, n]) => ( +
+ {path} + {n} +
+ ))} +
+
+

fit_signal distribution

+ {Object.keys(signalCounts).length === 0 + ?

No signal data yet — deploy correlator update first

+ : Object.entries(signalCounts).sort((a, b) => b[1] - a[1]).map(([sig, n]) => ( +
+ {sig} + {n} +
+ )) + } +
+
+ )} + + {meta?.orphans_by_talkgroup && meta.orphans_by_talkgroup.length > 0 && ( +
+

orphans by talkgroup

+ {meta.orphans_by_talkgroup.map((tg) => ( +
+ {tg.talkgroup_name} ({tg.talkgroup_id}) + + {tg.count} total · {tg.no_type_count} no-type · {tg.sweep_exhausted_count} exhausted + +
+ ))} +
+ )} + {json && (