Fix correlation over-merge, thin-call hallucination, and geocoding accuracy

- Cap unit-continuity path at 20 min idle (unit_continuity_max_idle_minutes)
- Block time_fallback and unit-continuity matching on reassignment calls
- Expand reassignment detection to cover unit-initiated self-reassignment
- Skip GPT extraction entirely for transcripts ≤5 words (prevents hallucinated tags/units)
- Reduce geocode_max_km from 75 to 40 to reject far-out-of-area results
- Include county in geocoding query for tighter jurisdiction anchoring
This commit is contained in:
Logan
2026-05-26 02:20:15 -04:00
parent 5eed4e08ce
commit f774be12b8
4 changed files with 60 additions and 22 deletions
+2 -1
View File
@@ -32,8 +32,9 @@ class Settings(BaseSettings):
embedding_no_location_threshold: float = 0.97 # slow-path: match without location (very high bar)
embedding_cross_tg_threshold: float = 0.85 # cross-TG path: same dept + 2+ shared units
location_proximity_km: float = 0.5 # radius for location-proximity matching
geocode_max_km: float = 75.0 # reject geocode results farther than this from the node
geocode_max_km: float = 40.0 # reject geocode results farther than this from the node
incident_auto_resolve_minutes: int = 90 # auto-resolve after N minutes with no new calls
unit_continuity_max_idle_minutes: int = 20 # unit-continuity path: skip if incident idle > this
recorrelation_scan_minutes: int = 60 # re-examine orphaned calls ended within this window
tg_fast_path_idle_minutes: int = 90 # fast path: max minutes since incident last updated
tg_dispatch_thin_idle_minutes: int = 10 # dispatch channels only: thin calls only attach to incidents idle < this many minutes
@@ -178,6 +178,7 @@ async def correlate_call(
units: Optional[list[str]] = None,
vehicles: Optional[list[str]] = None,
cleared_units: Optional[list[str]] = None,
reassignment: bool = False,
) -> Optional[str]:
"""
Link call_id to an existing incident or create a new one.
@@ -303,6 +304,7 @@ async def correlate_call(
candidate, call_units, call_vehicles, coords,
settings.location_proximity_km, is_dispatch=is_dispatch,
call_embedding=call_embedding, now=now,
reassignment=reassignment,
)
if fit:
matched_incident = candidate
@@ -342,6 +344,7 @@ async def correlate_call(
candidate, call_units, call_vehicles, coords,
settings.location_proximity_km, is_dispatch=is_dispatch,
call_embedding=call_embedding, now=now,
reassignment=reassignment,
)
if fit:
matched_incident = candidate
@@ -368,19 +371,25 @@ async def correlate_call(
# ── 1.5. Unit-continuity path: same officer, not reassigned ─────────────────
#
# Handles long calls (bookings, transports, late scene clearance) where the
# 90-min idle gate has fired but the officer is still on the same call.
# Searches ALL active incidents — no idle gate, no time limit.
# fast-path idle gate has fired but the officer is still on the same call.
# Capped at unit_continuity_max_idle_minutes to prevent stale cross-shift links.
#
# Reassignment guard: if the same unit appears in a MORE recently updated
# incident, the officer has moved on and we don't link back to the old one.
# This correctly handles officers dispatched to a second call mid-shift.
if not matched_incident and call_units and system_id:
if not matched_incident and call_units and system_id and not reassignment:
call_unit_set = set(call_units)
unit_candidates = [
inc for inc in all_active
if system_id in (inc.get("system_ids") or [])
and call_unit_set & set(inc.get("units") or [])
]
# Apply idle cap: units get reassigned; a 20+ min gap means the officer
# has almost certainly moved on or the incident closed.
unit_candidates = [
inc for inc in unit_candidates
if _incident_idle_minutes(inc, now) <= settings.unit_continuity_max_idle_minutes
]
if unit_candidates:
best_unit_inc = max(unit_candidates, key=lambda i: i.get("updated_at", ""))
reassigned_away = any(
@@ -742,6 +751,7 @@ def _call_fits_incident(
is_dispatch: bool = False,
call_embedding: Optional[list] = None,
now: Optional[datetime] = None,
reassignment: bool = False,
) -> tuple[bool, str]:
"""
Return (fits, signal) — fits is True when this call plausibly belongs to
@@ -859,7 +869,9 @@ def _call_fits_incident(
if is_dispatch:
# Conversational continuity: the call arrived during the same conversation
# thread (< 2 min since last incident activity) with no contradicting evidence.
if idle_min < 2:
# Suppressed for reassignment calls — unit is breaking to a new scene and
# should not chain back to the current incident even if very recent.
if idle_min < 2 and not reassignment:
return True, "time_fallback"
# Shared dispatch channel — do not link without at least one positive signal.
return False, "no_signal"
+36 -15
View File
@@ -33,13 +33,13 @@ Response format — a JSON object with a "scenes" array. Each scene:
cleared_units: list of unit IDs that explicitly signal back-in-service or available in this recording
severity: one of "minor" | "moderate" | "major" | "unknown"
resolved: true if this scene explicitly signals incident closure, false otherwise
reassignment: true if dispatch is actively pulling a unit away from their current assignment to respond to a new, different call — e.g. "Baker, can you clear and respond to...", "Adam, break from that and go to...". False if the unit is simply reporting in, updating status, or continuing their current assignment.
reassignment: true if a unit is breaking from their current scene to respond to a completely different call — whether dispatch-initiated ("Baker, can you clear and respond to...", "Adam, break from that and go to...") OR unit-initiated ("Show me headed to the vehicle complaint", "Can you show me to that call", a unit going 10-8 and self-requesting a new assignment). False if the unit is reporting in on their current scene, giving a status update, or requesting information about their existing call.
transcript_corrected: corrected text for this scene's transmissions only, or null
Rules:
- location: prefer intersections > addresses > mile markers > route+town > route alone > town alone. Empty string if none.
- tags: describe WHAT happened, not WHERE. Specific, lowercase, hyphenated. Do not use location names, road names, talkgroup names, or place names as tags (wrong: "lower-macy's", "canvas-route-6", "route-202"; right: "suspect-search", "shoplifting", "vehicle-pursuit"). Do not repeat incident_type as a tag.
- units: only identifiers explicitly mentioned, not inferred.
- units: ONLY identifiers that appear verbatim in the transcript. If the word or number is not literally present in the text above, do not include it. Never infer or guess unit IDs.
- Do not invent details not present in the transcript.
- incident_type: let the talkgroup channel be your primary signal. Use "fire" ONLY if the talkgroup is clearly a fire/rescue channel OR the transcript explicitly describes active fire, smoke, flames, or structure fire activation. Police or EMS referencing a fire scene → use "police" or "ems". When uncertain, prefer "other" over "fire".
- ten_codes: interpret radio codes using the department reference provided below. Do not guess codes not listed.
@@ -55,8 +55,9 @@ Talkgroup: {talkgroup_name}
# Geographic bias radius for geocoding — half-width in degrees (~55 km)
_GEO_DELTA = 0.5
# Cache node state (e.g. "New York") so we only reverse-geocode once per node
_node_state_cache: dict[str, str] = {}
# Cache node state (e.g. "New York") and county (e.g. "Westchester County") per node
_node_state_cache: dict[str, str] = {}
_node_county_cache: dict[str, str] = {}
# Police/law-enforcement phonetic alphabet words (APCO + NATO).
# A run of 5+ of these in a transcript is a strong Whisper hallucination signal.
@@ -163,6 +164,15 @@ async def extract_scenes(
pass
return []
# Transcripts with ≤5 words carry no extractable intelligence — GPT hallucinates
# units and tags from thin context (e.g. "Main Lot", "10-4", "David").
if len(transcript.split()) <= 5:
logger.info(
f"Intelligence: call {call_id} — transcript too short for extraction "
f"({len(transcript.split())} words), skipping"
)
return []
raw_scenes: list[dict] = await asyncio.to_thread(
_sync_extract,
transcript, talkgroup_name, talkgroup_id, system_id, segments, vocabulary, ten_codes,
@@ -203,11 +213,14 @@ async def extract_scenes(
# This prevents generic street names from resolving to wrong-country results.
location_coords: Optional[dict] = None
if location and node_lat is not None and node_lon is not None:
muni = _municipality_from_tg(talkgroup_name)
state = await _get_node_state(node_id or "", node_lat, node_lon) if node_id else ""
parts = [location]
muni = _municipality_from_tg(talkgroup_name)
state = await _get_node_state(node_id or "", node_lat, node_lon) if node_id else ""
county = _node_county_cache.get(node_id or "") if node_id else ""
parts = [location]
if muni:
parts.append(muni)
if county:
parts.append(county)
if state:
parts.append(state)
query = ", ".join(parts)
@@ -288,6 +301,7 @@ def _geo_dist_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
async def _get_node_state(node_id: str, lat: float, lon: float) -> str:
"""
Return the US state name (e.g. "New York") for a node's position.
Also populates _node_county_cache as a side-effect (same API call).
Uses Google Maps Reverse Geocoding; cached for the process lifetime since nodes don't move.
"""
if node_id in _node_state_cache:
@@ -300,29 +314,36 @@ async def _get_node_state(node_id: str, lat: float, lon: float) -> str:
return ""
state = ""
county = ""
try:
async with httpx.AsyncClient(timeout=5.0) as client:
r = await client.get(
"https://maps.googleapis.com/maps/api/geocode/json",
params={
"latlng": f"{lat},{lon}",
"result_type": "administrative_area_level_1",
"key": settings.google_maps_api_key,
"latlng": f"{lat},{lon}",
"result_type": "administrative_area_level_1|administrative_area_level_2",
"key": settings.google_maps_api_key,
},
)
r.raise_for_status()
data = r.json()
if data.get("status") == "OK" and data.get("results"):
for comp in data["results"][0].get("address_components", []):
if "administrative_area_level_1" in comp.get("types", []):
state = comp.get("long_name", "")
break
for result in data["results"]:
for comp in result.get("address_components", []):
types = comp.get("types", [])
if "administrative_area_level_1" in types and not state:
state = comp.get("long_name", "")
if "administrative_area_level_2" in types and not county:
county = comp.get("long_name", "")
except Exception as e:
logger.warning(f"Node state lookup failed for {node_id}: {e}")
if state:
_node_state_cache[node_id] = state
logger.info(f"Node {node_id} state resolved: {state!r}")
if county:
_node_county_cache[node_id] = county
if state or county:
logger.info(f"Node {node_id} geo resolved: county={county!r} state={state!r}")
return state
+6 -2
View File
@@ -112,7 +112,8 @@ async def _run_extraction_pipeline(
all_tags.extend(scene["tags"])
# When dispatch is pulling a unit to a NEW call (reassignment), suppress unit
# overlap so the new scene doesn't chain into the unit's previous incident.
corr_units = [] if scene.get("reassignment") else scene.get("units")
is_reassignment = bool(scene.get("reassignment"))
corr_units = [] if is_reassignment else scene.get("units")
incident_id = await incident_correlator.correlate_call(
call_id=call_id,
node_id=node_id,
@@ -126,6 +127,7 @@ async def _run_extraction_pipeline(
units=corr_units,
vehicles=scene.get("vehicles"),
cleared_units=scene.get("cleared_units"),
reassignment=is_reassignment,
)
if incident_id and incident_id not in incident_ids:
incident_ids.append(incident_id)
@@ -213,7 +215,8 @@ async def _run_intelligence_pipeline(
if flags["correlation_enabled"]:
for scene in scenes:
all_tags.extend(scene["tags"])
corr_units = [] if scene.get("reassignment") else scene.get("units")
is_reassignment = bool(scene.get("reassignment"))
corr_units = [] if is_reassignment else scene.get("units")
incident_id = await incident_correlator.correlate_call(
call_id=call_id,
node_id=node_id,
@@ -227,6 +230,7 @@ async def _run_intelligence_pipeline(
units=corr_units,
vehicles=scene.get("vehicles"),
cleared_units=scene.get("cleared_units"),
reassignment=is_reassignment,
)
if incident_id and incident_id not in incident_ids:
incident_ids.append(incident_id)