diff --git a/drb-c2-core/app/config.py b/drb-c2-core/app/config.py index 8d0d5d8..d74a1d2 100644 --- a/drb-c2-core/app/config.py +++ b/drb-c2-core/app/config.py @@ -28,7 +28,8 @@ class Settings(BaseSettings): embedding_no_location_threshold: float = 0.97 # slow-path: match without location (very high bar) location_proximity_km: float = 0.5 # radius for location-proximity matching incident_auto_resolve_minutes: int = 90 # auto-resolve after N minutes with no new calls - recorrelation_scan_minutes: int = 15 # re-examine orphaned calls ended within this window + recorrelation_scan_minutes: int = 60 # re-examine orphaned calls ended within this window + tg_fast_path_idle_minutes: int = 30 # fast path: max minutes since incident last updated # Vocabulary learning vocabulary_induction_interval_hours: int = 24 # how often the induction loop runs diff --git a/drb-c2-core/app/internal/incident_correlator.py b/drb-c2-core/app/internal/incident_correlator.py index bbc8899..b602c43 100644 --- a/drb-c2-core/app/internal/incident_correlator.py +++ b/drb-c2-core/app/internal/incident_correlator.py @@ -3,27 +3,42 @@ Hybrid incident correlation engine. Matching priority (in order): -1. Fast path — talkgroup + system match (any incident type, no time limit) - Active-status gate is sufficient. If multiple active incidents share the same - talkgroup (e.g. busy shared channel), disambiguate by: - a) Unit overlap — strongest signal, officer assigned to incident +1. Fast path — talkgroup + system match with recency gate + Only considers incidents updated within `tg_fast_path_idle_minutes` (default 30 min). + Rationale: a shared dispatch channel (e.g. "Yorktown PD – Dispatch") handles ALL + incidents for a department. Without the recency gate, every call on that channel + would pile into whatever incident was created first, for hours. + + If multiple recent incidents share the same talkgroup, disambiguate by: + a) Unit overlap — strongest signal b) Vehicle overlap — vehicle description shared across calls c) Location proximity — geocoded coords closer to which incident d) Embedding similarity against each candidate's centroid (tiebreaker) Falls back to most-recently-updated on tie. + Dispatch-channel strictness: when the talkgroup name contains "dispatch", the + fallback inside `_call_fits_incident` requires positive evidence (unit overlap + or location proximity) to link. Without this, every ungeocoded call on a + dispatch backbone would link to the one active incident on that channel. + + Thin calls (no units/vehicles/coords) skip scene verification and link to the + most recently updated incident on this TGID — but only if that incident is + within the recency window. + 2. Location path — geocoded coords within `location_proximity_km` (time-limited) Primary mutual-aid signal: EMS + police at the same scene. 3. Slow path — embedding cosine similarity (time-limited, same type only) Requires similarity >= threshold AND location within 4× proximity radius. - Never fires alone — location corroboration is mandatory. + High-confidence tier (>= embedding_no_location_threshold) can match without + location when geocoding failed on either side. Calls with no incident_type skip new-incident creation but still run paths 1–3, so unclassified calls (short transport end, "en route", etc.) can link to an existing incident via talkgroup match. """ import math +import re import uuid from datetime import datetime, timezone, timedelta from typing import Optional @@ -31,6 +46,27 @@ from app.internal.logger import logger from app.internal import firestore as fstore from app.config import settings +_DISPATCH_TG_RE = re.compile(r"\bdispatch\b|\bdisp\b", re.IGNORECASE) + + +def _is_dispatch_channel(talkgroup_name: Optional[str]) -> bool: + """True when the talkgroup is a shared dispatch backbone (not a tactical/working channel).""" + if not talkgroup_name: + return False + return bool(_DISPATCH_TG_RE.search(talkgroup_name)) + + +def _incident_idle_minutes(inc: dict, now: datetime) -> float: + """Minutes since the incident was last updated (or started). Returns 9999 on parse error.""" + try: + raw = inc.get("updated_at") or inc.get("started_at") or "" + dt = datetime.fromisoformat(str(raw).replace("Z", "+00:00")) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return (now - dt).total_seconds() / 60 + except Exception: + return 9999.0 + # ───────────────────────────────────────────────────────────────────────────── # Public entry point @@ -86,38 +122,49 @@ async def correlate_call( # was happening rather than running the full scene-verification logic. is_thin_call = not call_units and not call_vehicles and not coords - # ── 1. Fast path: talkgroup match (any type, no time limit) ────────────── + # ── 1. Fast path: talkgroup match with recency gate ────────────────────── # - # Two distinct behaviours depending on call substance: + # Only considers incidents updated within tg_fast_path_idle_minutes (default 30 min). + # A shared dispatch channel handles every incident for a department — without the + # recency gate, a 5-hour-old incident would absorb every new call on that channel. # - # • Thin call → link to the most-recently-updated active incident on this - # TGID (i.e. the last conversation in progress). - # - # • Substantive call (has location / units / vehicles) → verify the call - # actually belongs to the matched incident before linking. When a busy - # dispatch channel runs multiple concurrent scenes (different addresses, - # different units) we split them into separate incidents rather than - # merging everything because they share a talkgroup. + # Two behaviours depending on call substance: + # • Thin call → link to the most-recently-updated recent TGID incident. + # • Substantive → verify via _call_fits_incident before linking. if talkgroup_id is not None and system_id: tg_str = str(talkgroup_id) + is_dispatch = _is_dispatch_channel(talkgroup_name) + tg_matches = [ inc for inc in all_active if system_id in (inc.get("system_ids") or []) and tg_str in (inc.get("talkgroup_ids") or []) ] + # Apply recency gate — only incidents active within the rolling window. + tg_recent = [ + inc for inc in tg_matches + if _incident_idle_minutes(inc, now) <= settings.tg_fast_path_idle_minutes + ] - if tg_matches and is_thin_call: + if tg_matches and not tg_recent: + logger.info( + f"Correlator fast-path skipped: all {len(tg_matches)} TGID incident(s) idle " + f"> {settings.tg_fast_path_idle_minutes}min; falling through to location/slow path" + ) + + if tg_recent and is_thin_call: # Status/ack call — no scene data to reason about. - # Attach to whichever incident was most recently active on this TGID. - matched_incident = max(tg_matches, key=lambda inc: inc.get("updated_at", "")) + # Attach to whichever recent incident was most recently active on this TGID. + matched_incident = max(tg_recent, key=lambda inc: inc.get("updated_at", "")) logger.info( f"Correlator fast-path (thin→last TGID incident): " f"call {call_id} → {matched_incident['incident_id']}" ) - elif len(tg_matches) == 1: - candidate = tg_matches[0] + elif len(tg_recent) == 1: + candidate = tg_recent[0] if _call_fits_incident( - candidate, call_units, call_vehicles, coords, settings.location_proximity_km + candidate, call_units, call_vehicles, coords, + settings.location_proximity_km, is_dispatch=is_dispatch, ): matched_incident = candidate logger.info( @@ -126,15 +173,14 @@ async def correlate_call( else: logger.info( f"Correlator fast-path skipped: call {call_id} — different scene " - f"from {candidate['incident_id']} (no unit overlap + distant location); " - f"will attempt new incident" + f"from {candidate['incident_id']}; will attempt new incident" ) - elif len(tg_matches) > 1: + elif len(tg_recent) > 1: matched_incident = _disambiguate( - tg_matches, call_units, call_vehicles, coords, call_embedding + tg_recent, call_units, call_vehicles, coords, call_embedding ) logger.info( - f"Correlator fast-path (disambig {len(tg_matches)} candidates): " + f"Correlator fast-path (disambig {len(tg_recent)} candidates): " f"call {call_id} → {matched_incident['incident_id']}" ) @@ -309,24 +355,26 @@ def _call_fits_incident( call_vehicles: list[str], call_coords: Optional[dict], proximity_km: float, + is_dispatch: bool = False, ) -> bool: """ Return True if this call plausibly belongs to the given incident. - This guards the single-talkgroup-match fast path on busy dispatch channels - where multiple concurrent scenes share one talkgroup. We only return False - (→ create a new incident) when there is *positive evidence* of a different - scene: a geocoded location that is too far away AND no unit or vehicle - overlap. In all ambiguous cases we default to True (link) to avoid - fragmenting short status calls that carry no location or unit information. + Positive signals (unit/vehicle overlap, location proximity) are always + respected. The fallback — when there is no evidence either way — depends + on channel type: - Examples that correctly split: - - Police dispatch sends units to two separate MVAs miles apart - - EMS handles overlapping aided cases at different addresses + • Tactical / working channel (is_dispatch=False): default True (link). + A working channel is dedicated to one scene; no evidence of separation + means they're probably the same call. - Examples that correctly stay together (domestic with split parties): - - Units at 10 Main St and 12 Main St — within proximity radius → True - - Same unit mentioned in both the call and the incident → True + • Dispatch channel (is_dispatch=True): default False (create new). + A dispatch channel carries every incident for a department. Linking + without positive evidence would merge unrelated incidents whenever + geocoding fails (which is common for partial street addresses). + + Thin calls (no units/vehicles/coords) never reach this function — + they're intercepted by the is_thin_call branch above. """ # Unit overlap is the strongest positive signal: same officers = same call. inc_units = set(inc.get("units") or []) @@ -345,18 +393,17 @@ def _call_fits_incident( call_coords["lat"], call_coords["lng"], inc_coords["lat"], inc_coords["lng"], ) - # Within proximity radius → same scene (handles domestics with nearby split parties). if dist_km <= proximity_km: return True # Different location AND no unit/vehicle overlap → different incident. return False - # No geocoded location on one or both sides but the call IS substantive - # (has units or vehicles — otherwise is_thin_call would have caught it). - # Unit overlap already returned True above if present. If we reach here - # there is no overlap and no coords to compare — we cannot prove it is a - # different scene, so default to linking rather than fragmenting. - return True + # No geocoded location on one or both sides. + # On a tactical/working channel, default to linking (conservative — channel + # is dedicated to one scene so no evidence of separation ≈ same scene). + # On a dispatch channel, require positive evidence — without it we risk + # pulling every ungeocoded call in a shift into the same incident. + return not is_dispatch async def _update_incident( diff --git a/drb-c2-core/app/internal/transcription.py b/drb-c2-core/app/internal/transcription.py index e975345..7a20feb 100644 --- a/drb-c2-core/app/internal/transcription.py +++ b/drb-c2-core/app/internal/transcription.py @@ -121,12 +121,26 @@ def _sync_transcribe( language="en", prompt=prompt, response_format="verbose_json", + temperature=0, ) text = response.text.strip() or None + + # Filter hallucinated segments. Two sources of hallucination in P25 recordings: + # + # 1. Trailing silence / static — Whisper fills silence past real content with + # sequential radio codes (10-4, 10-5...). Clamped by audio duration. + # + # 2. Leading silence — OP25 recordings typically have a short silence at the + # start before the first PTT press. Whisper sometimes hallucinates filler + # words or codes over this silence. Detected via no_speech_prob > 0.8 + # (Whisper's own confidence that a segment contains no real speech). + audio_duration: float = getattr(response, "duration", None) or float("inf") segments = [ {"start": round(s.start, 2), "end": round(s.end, 2), "text": s.text.strip()} for s in (response.segments or []) if s.text.strip() + and s.start < audio_duration + and getattr(s, "no_speech_prob", 0.0) < 0.8 ] return text, segments finally: