Correlation updates

This commit is contained in:
Logan
2026-04-26 11:01:32 -04:00
parent 64232279ca
commit 92c8351864
3 changed files with 108 additions and 46 deletions
+2 -1
View File
@@ -28,7 +28,8 @@ class Settings(BaseSettings):
embedding_no_location_threshold: float = 0.97 # slow-path: match without location (very high bar)
location_proximity_km: float = 0.5 # radius for location-proximity matching
incident_auto_resolve_minutes: int = 90 # auto-resolve after N minutes with no new calls
recorrelation_scan_minutes: int = 15 # re-examine orphaned calls ended within this window
recorrelation_scan_minutes: int = 60 # re-examine orphaned calls ended within this window
tg_fast_path_idle_minutes: int = 30 # fast path: max minutes since incident last updated
# Vocabulary learning
vocabulary_induction_interval_hours: int = 24 # how often the induction loop runs
+92 -45
View File
@@ -3,27 +3,42 @@ Hybrid incident correlation engine.
Matching priority (in order):
1. Fast path — talkgroup + system match (any incident type, no time limit)
Active-status gate is sufficient. If multiple active incidents share the same
talkgroup (e.g. busy shared channel), disambiguate by:
a) Unit overlap — strongest signal, officer assigned to incident
1. Fast path — talkgroup + system match with recency gate
Only considers incidents updated within `tg_fast_path_idle_minutes` (default 30 min).
Rationale: a shared dispatch channel (e.g. "Yorktown PD Dispatch") handles ALL
incidents for a department. Without the recency gate, every call on that channel
would pile into whatever incident was created first, for hours.
If multiple recent incidents share the same talkgroup, disambiguate by:
a) Unit overlap — strongest signal
b) Vehicle overlap — vehicle description shared across calls
c) Location proximity — geocoded coords closer to which incident
d) Embedding similarity against each candidate's centroid (tiebreaker)
Falls back to most-recently-updated on tie.
Dispatch-channel strictness: when the talkgroup name contains "dispatch", the
fallback inside `_call_fits_incident` requires positive evidence (unit overlap
or location proximity) to link. Without this, every ungeocoded call on a
dispatch backbone would link to the one active incident on that channel.
Thin calls (no units/vehicles/coords) skip scene verification and link to the
most recently updated incident on this TGID — but only if that incident is
within the recency window.
2. Location path — geocoded coords within `location_proximity_km` (time-limited)
Primary mutual-aid signal: EMS + police at the same scene.
3. Slow path — embedding cosine similarity (time-limited, same type only)
Requires similarity >= threshold AND location within 4× proximity radius.
Never fires alone — location corroboration is mandatory.
High-confidence tier (>= embedding_no_location_threshold) can match without
location when geocoding failed on either side.
Calls with no incident_type skip new-incident creation but still run paths 13,
so unclassified calls (short transport end, "en route", etc.) can link to an
existing incident via talkgroup match.
"""
import math
import re
import uuid
from datetime import datetime, timezone, timedelta
from typing import Optional
@@ -31,6 +46,27 @@ from app.internal.logger import logger
from app.internal import firestore as fstore
from app.config import settings
_DISPATCH_TG_RE = re.compile(r"\bdispatch\b|\bdisp\b", re.IGNORECASE)
def _is_dispatch_channel(talkgroup_name: Optional[str]) -> bool:
"""True when the talkgroup is a shared dispatch backbone (not a tactical/working channel)."""
if not talkgroup_name:
return False
return bool(_DISPATCH_TG_RE.search(talkgroup_name))
def _incident_idle_minutes(inc: dict, now: datetime) -> float:
"""Minutes since the incident was last updated (or started). Returns 9999 on parse error."""
try:
raw = inc.get("updated_at") or inc.get("started_at") or ""
dt = datetime.fromisoformat(str(raw).replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return (now - dt).total_seconds() / 60
except Exception:
return 9999.0
# ─────────────────────────────────────────────────────────────────────────────
# Public entry point
@@ -86,38 +122,49 @@ async def correlate_call(
# was happening rather than running the full scene-verification logic.
is_thin_call = not call_units and not call_vehicles and not coords
# ── 1. Fast path: talkgroup match (any type, no time limit) ──────────────
# ── 1. Fast path: talkgroup match with recency gate ──────────────────────
#
# Two distinct behaviours depending on call substance:
# Only considers incidents updated within tg_fast_path_idle_minutes (default 30 min).
# A shared dispatch channel handles every incident for a department — without the
# recency gate, a 5-hour-old incident would absorb every new call on that channel.
#
# • Thin call → link to the most-recently-updated active incident on this
# TGID (i.e. the last conversation in progress).
#
# • Substantive call (has location / units / vehicles) → verify the call
# actually belongs to the matched incident before linking. When a busy
# dispatch channel runs multiple concurrent scenes (different addresses,
# different units) we split them into separate incidents rather than
# merging everything because they share a talkgroup.
# Two behaviours depending on call substance:
# • Thin call → link to the most-recently-updated recent TGID incident.
# • Substantive → verify via _call_fits_incident before linking.
if talkgroup_id is not None and system_id:
tg_str = str(talkgroup_id)
is_dispatch = _is_dispatch_channel(talkgroup_name)
tg_matches = [
inc for inc in all_active
if system_id in (inc.get("system_ids") or [])
and tg_str in (inc.get("talkgroup_ids") or [])
]
# Apply recency gate — only incidents active within the rolling window.
tg_recent = [
inc for inc in tg_matches
if _incident_idle_minutes(inc, now) <= settings.tg_fast_path_idle_minutes
]
if tg_matches and is_thin_call:
if tg_matches and not tg_recent:
logger.info(
f"Correlator fast-path skipped: all {len(tg_matches)} TGID incident(s) idle "
f"> {settings.tg_fast_path_idle_minutes}min; falling through to location/slow path"
)
if tg_recent and is_thin_call:
# Status/ack call — no scene data to reason about.
# Attach to whichever incident was most recently active on this TGID.
matched_incident = max(tg_matches, key=lambda inc: inc.get("updated_at", ""))
# Attach to whichever recent incident was most recently active on this TGID.
matched_incident = max(tg_recent, key=lambda inc: inc.get("updated_at", ""))
logger.info(
f"Correlator fast-path (thin→last TGID incident): "
f"call {call_id}{matched_incident['incident_id']}"
)
elif len(tg_matches) == 1:
candidate = tg_matches[0]
elif len(tg_recent) == 1:
candidate = tg_recent[0]
if _call_fits_incident(
candidate, call_units, call_vehicles, coords, settings.location_proximity_km
candidate, call_units, call_vehicles, coords,
settings.location_proximity_km, is_dispatch=is_dispatch,
):
matched_incident = candidate
logger.info(
@@ -126,15 +173,14 @@ async def correlate_call(
else:
logger.info(
f"Correlator fast-path skipped: call {call_id} — different scene "
f"from {candidate['incident_id']} (no unit overlap + distant location); "
f"will attempt new incident"
f"from {candidate['incident_id']}; will attempt new incident"
)
elif len(tg_matches) > 1:
elif len(tg_recent) > 1:
matched_incident = _disambiguate(
tg_matches, call_units, call_vehicles, coords, call_embedding
tg_recent, call_units, call_vehicles, coords, call_embedding
)
logger.info(
f"Correlator fast-path (disambig {len(tg_matches)} candidates): "
f"Correlator fast-path (disambig {len(tg_recent)} candidates): "
f"call {call_id}{matched_incident['incident_id']}"
)
@@ -309,24 +355,26 @@ def _call_fits_incident(
call_vehicles: list[str],
call_coords: Optional[dict],
proximity_km: float,
is_dispatch: bool = False,
) -> bool:
"""
Return True if this call plausibly belongs to the given incident.
This guards the single-talkgroup-match fast path on busy dispatch channels
where multiple concurrent scenes share one talkgroup. We only return False
(→ create a new incident) when there is *positive evidence* of a different
scene: a geocoded location that is too far away AND no unit or vehicle
overlap. In all ambiguous cases we default to True (link) to avoid
fragmenting short status calls that carry no location or unit information.
Positive signals (unit/vehicle overlap, location proximity) are always
respected. The fallback — when there is no evidence either way — depends
on channel type:
Examples that correctly split:
- Police dispatch sends units to two separate MVAs miles apart
- EMS handles overlapping aided cases at different addresses
• Tactical / working channel (is_dispatch=False): default True (link).
A working channel is dedicated to one scene; no evidence of separation
means they're probably the same call.
Examples that correctly stay together (domestic with split parties):
- Units at 10 Main St and 12 Main St — within proximity radius → True
- Same unit mentioned in both the call and the incident → True
• Dispatch channel (is_dispatch=True): default False (create new).
A dispatch channel carries every incident for a department. Linking
without positive evidence would merge unrelated incidents whenever
geocoding fails (which is common for partial street addresses).
Thin calls (no units/vehicles/coords) never reach this function —
they're intercepted by the is_thin_call branch above.
"""
# Unit overlap is the strongest positive signal: same officers = same call.
inc_units = set(inc.get("units") or [])
@@ -345,18 +393,17 @@ def _call_fits_incident(
call_coords["lat"], call_coords["lng"],
inc_coords["lat"], inc_coords["lng"],
)
# Within proximity radius → same scene (handles domestics with nearby split parties).
if dist_km <= proximity_km:
return True
# Different location AND no unit/vehicle overlap → different incident.
return False
# No geocoded location on one or both sides but the call IS substantive
# (has units or vehicles — otherwise is_thin_call would have caught it).
# Unit overlap already returned True above if present. If we reach here
# there is no overlap and no coords to compare — we cannot prove it is a
# different scene, so default to linking rather than fragmenting.
return True
# No geocoded location on one or both sides.
# On a tactical/working channel, default to linking (conservative — channel
# is dedicated to one scene so no evidence of separation ≈ same scene).
# On a dispatch channel, require positive evidence — without it we risk
# pulling every ungeocoded call in a shift into the same incident.
return not is_dispatch
async def _update_incident(
+14
View File
@@ -121,12 +121,26 @@ def _sync_transcribe(
language="en",
prompt=prompt,
response_format="verbose_json",
temperature=0,
)
text = response.text.strip() or None
# Filter hallucinated segments. Two sources of hallucination in P25 recordings:
#
# 1. Trailing silence / static — Whisper fills silence past real content with
# sequential radio codes (10-4, 10-5...). Clamped by audio duration.
#
# 2. Leading silence — OP25 recordings typically have a short silence at the
# start before the first PTT press. Whisper sometimes hallucinates filler
# words or codes over this silence. Detected via no_speech_prob > 0.8
# (Whisper's own confidence that a segment contains no real speech).
audio_duration: float = getattr(response, "duration", None) or float("inf")
segments = [
{"start": round(s.start, 2), "end": round(s.end, 2), "text": s.text.strip()}
for s in (response.segments or [])
if s.text.strip()
and s.start < audio_duration
and getattr(s, "no_speech_prob", 0.0) < 0.8
]
return text, segments
finally: