diff --git a/drb-c2-core/app/internal/incident_correlator.py b/drb-c2-core/app/internal/incident_correlator.py index 86ce904..84e9f99 100644 --- a/drb-c2-core/app/internal/incident_correlator.py +++ b/drb-c2-core/app/internal/incident_correlator.py @@ -46,6 +46,18 @@ from app.internal.logger import logger from app.internal import firestore as fstore from app.config import settings +_PURSUIT_TAGS = frozenset({ + "vehicle-pursuit", "pursuit", "foot-pursuit", "chase", + "fleeing-vehicle", "suspect-vehicle", "eluding", +}) + +# Maximum plausible ground speed for a moving incident (pursuit/transport). +# ~3 miles/min ≈ 180 mph — well above real pursuit speeds, but generous enough +# to tolerate GPS drift and call-timing jitter. Anything faster is a bad geocode +# or an unrelated call being falsely proximity-matched. +_MAX_PURSUIT_SPEED_KM_PER_MIN = 8.0 # ~300 km/h, intentionally generous +_PURSUIT_PROXIMITY_KM = 20.0 # expanded radius for moving incidents + _DISPATCH_TG_RE = re.compile( r"\bdispatch\b|\bdisp\b" r"|\bpatched\b" # patched channels aggregate multiple call streams @@ -263,31 +275,47 @@ async def correlate_call( ) if tg_recent and is_thin_call: - # Status/ack call — no scene data to reason about. - # On dispatch channels (shared backbone), apply a much tighter idle gate so - # a "10-4" or "Dispatch." doesn't re-activate an incident that's been quiet - # for an hour and then absorb the next unrelated dispatch on the same TGID. + # Content-free status calls ("10-4", "Copy", "En route") — two tiers: + # + # Tier 1 — ≤30 seconds idle: this is a direct conversational reply to + # whatever was just transmitted. Attach to the most recently updated + # incident regardless of how many are active; within 30 seconds, the + # "most recently updated" IS the active thread. + # + # Tier 2 — 30 seconds to tg_dispatch_thin_idle_minutes: channel context + # is less clear. Only attach when there is exactly ONE candidate to + # avoid guessing on a busy multi-incident channel. if is_dispatch: - thin_pool = [ + THIN_CONVERSATIONAL_SECS = 30 + very_recent = [ inc for inc in tg_recent - if _incident_idle_minutes(inc, now) <= settings.tg_dispatch_thin_idle_minutes + if _incident_idle_minutes(inc, now) * 60 <= THIN_CONVERSATIONAL_SECS ] - # A shared dispatch channel may have multiple concurrent incidents. - # If more than one is active in the thin window, we cannot know which - # incident this "10-4" or "Copy" belongs to — skip rather than guess. - if len(thin_pool) > 1: + if very_recent: + # Tier 1: direct conversational reply — most recent wins. + thin_pool = [max(very_recent, key=lambda inc: inc.get("updated_at", ""))] logger.info( - f"Correlator fast-path thin: {len(thin_pool)} active incidents on " - f"dispatch channel — ambiguous, skipping thin call {call_id}" + f"Correlator fast-path thin (tier-1, ≤{THIN_CONVERSATIONAL_SECS}s): " + f"using most-recent of {len(very_recent)} candidate(s) for call {call_id}" ) - thin_pool = [] + else: + # Tier 2: less certain — require single candidate. + thin_pool = [ + inc for inc in tg_recent + if _incident_idle_minutes(inc, now) <= settings.tg_dispatch_thin_idle_minutes + ] + if len(thin_pool) > 1: + logger.info( + f"Correlator fast-path thin (tier-2): {len(thin_pool)} active incidents " + f"on dispatch channel — ambiguous, skipping thin call {call_id}" + ) + thin_pool = [] else: thin_pool = tg_recent if not thin_pool: logger.info( - f"Correlator fast-path thin: dispatch channel idle > " - f"{settings.tg_dispatch_thin_idle_minutes}min, skipping thin call {call_id}" + f"Correlator fast-path thin: no suitable candidate for call {call_id}" ) else: # Attach to whichever pool incident was most recently active on this TGID. @@ -447,12 +475,24 @@ async def correlate_call( coords["lat"], coords["lng"], inc_coords["lat"], inc_coords["lng"], ) - if dist_km <= settings.location_proximity_km: + inc_tags_set = set(inc.get("tags") or []) + is_pursuit_inc = bool(inc_tags_set & _PURSUIT_TAGS) + radius = _PURSUIT_PROXIMITY_KM if is_pursuit_inc else settings.location_proximity_km + # For pursuit incidents, additionally validate movement speed. + if is_pursuit_inc and dist_km > settings.location_proximity_km: + elapsed_min = max(_incident_idle_minutes(inc, now), 0.1) + if (dist_km / elapsed_min) > _MAX_PURSUIT_SPEED_KM_PER_MIN: + continue # implausible speed — skip this candidate + if dist_km <= radius: matched_incident = inc - corr_debug = {"corr_path": "location", "corr_distance_km": round(dist_km, 3)} + corr_debug = { + "corr_path": "location", + "corr_distance_km": round(dist_km, 3), + "corr_pursuit_mode": is_pursuit_inc, + } logger.info( f"Correlator location-path: call {call_id} → {inc['incident_id']} " - f"(dist={dist_km:.2f}km)" + f"(dist={dist_km:.2f}km, pursuit={is_pursuit_inc})" ) break @@ -855,18 +895,48 @@ def _call_fits_incident( return True, "vehicle_overlap" # ── 3. Location proximity ───────────────────────────────────────────────── + # For pursuit-type incidents, a location CHANGE is expected — the suspect is + # moving. Use an expanded radius and validate with a speed-sanity check so + # we don't absorb a call from a genuinely different scene. + # For all other incident types, a location mismatch means a different scene. inc_coords = inc.get("location_coords") if call_coords and inc_coords: dist_km = _haversine_km( call_coords["lat"], call_coords["lng"], inc_coords["lat"], inc_coords["lng"], ) - if dist_km <= proximity_km: - logger.info(f" fits[{inc_id}]: location_proximity dist={dist_km:.2f}km → location_proximity") - return True, "location_proximity" - # Conflicting location, no other positive signal → different scene. - logger.info(f" fits[{inc_id}]: location_conflict dist={dist_km:.2f}km → location_conflict") - return False, "location_conflict" + inc_tags = set(inc.get("tags") or []) + is_pursuit = bool(inc_tags & _PURSUIT_TAGS) + if is_pursuit: + # Speed sanity: distance travelled ÷ time since last update must be + # plausible for a vehicle. Protects against a distant unrelated call + # that happens to share pursuit tags accidentally matching. + speed_ok = False + if now is not None and dist_km > proximity_km: + elapsed_min = max(idle_min, 0.1) # avoid div-by-zero + speed_km_per_min = dist_km / elapsed_min + speed_ok = speed_km_per_min <= _MAX_PURSUIT_SPEED_KM_PER_MIN + logger.info( + f" fits[{inc_id}]: pursuit speed check — " + f"dist={dist_km:.2f}km elapsed={elapsed_min:.1f}min " + f"speed={speed_km_per_min:.1f}km/min ok={speed_ok}" + ) + effective_radius = _PURSUIT_PROXIMITY_KM if is_pursuit else proximity_km + if dist_km <= proximity_km or (is_pursuit and speed_ok and dist_km <= effective_radius): + logger.info(f" fits[{inc_id}]: location_proximity dist={dist_km:.2f}km (pursuit={is_pursuit}) → location_proximity") + return True, "location_proximity" + if is_pursuit: + logger.info(f" fits[{inc_id}]: pursuit location rejected — dist={dist_km:.2f}km exceeds radius or speed sanity failed → location_conflict") + else: + logger.info(f" fits[{inc_id}]: location_conflict dist={dist_km:.2f}km → location_conflict") + return False, "location_conflict" + else: + if dist_km <= proximity_km: + logger.info(f" fits[{inc_id}]: location_proximity dist={dist_km:.2f}km → location_proximity") + return True, "location_proximity" + # Conflicting location, no other positive signal → different scene. + logger.info(f" fits[{inc_id}]: location_conflict dist={dist_km:.2f}km → location_conflict") + return False, "location_conflict" # ── 4. No positive signals ──────────────────────────────────────────────── logger.info( @@ -876,13 +946,12 @@ def _call_fits_incident( f"call_coords={call_coords is not None} inc_coords={inc_coords is not None}" ) if is_dispatch: - # Conversational continuity: the call arrived during the same conversation - # thread (< 2 min since last incident activity) with no contradicting evidence. - # Suppressed for reassignment calls — unit is breaking to a new scene and - # should not chain back to the current incident even if very recent. - if idle_min < 2 and not reassignment: - return True, "time_fallback" - # Shared dispatch channel — do not link without at least one positive signal. + # Dispatch channels require at least one positive signal (unit, vehicle, + # or location match). A substantive call with no matching signals is more + # likely a separate incident than a follow-up to the current one — two + # dispatches can arrive within seconds of each other on a busy channel. + # Content-free thin calls are handled before this function via the thin + # path in correlate_call, with a tighter 30-second recency window. return False, "no_signal" # Tactical channel: one scene per channel.