Updates, big updates

incident_correlator.py — full rewrite: always runs on every call, fetches all active incidents cross-type, fast path collects all talkgroup matches and disambiguates by unit/vehicle overlap → location proximity → embedding, new location proximity path, slow path requires location corroboration, "Auto:" stripped from titles, "auto-generated" tag added, units/vehicles now accumulated on update
intelligence.py — resolved field in GPT schema, returned as 5th value
upload.py — both pipelines unpack 5-tuple, always call correlate, auto-resolve on resolved=True
summarizer.py — stale sweep runs each tick, resolves incidents idle for 90+ minutes
config.py — correlation_window_hours=2, embedding_similarity_threshold=0.93, location_proximity_km=0.5, incident_auto_resolve_minutes=90
This commit is contained in:
Logan
2026-04-19 22:53:53 -04:00
parent f9d4fcbc39
commit ba43796c51
9 changed files with 539 additions and 293 deletions
+253 -143
View File
@@ -1,32 +1,29 @@
"""
Hybrid incident correlation engine.
Fast path — deterministic:
Same system_id + talkgroup_id + incident_type within the correlation window.
This is the primary signal for P25 — dispatch assigns one talkgroup per incident.
Matching priority (in order):
Slow path embedding similarity:
If fast path finds nothing, compare the new call's embedding against the
centroid embedding of each open incident (running average of its call embeddings).
Match if cosine similarity >= threshold.
1. Fast path — talkgroup + system match (any incident type, no time limit)
Active-status gate is sufficient. If multiple active incidents share the same
talkgroup (e.g. busy shared channel), disambiguate by:
a) Unit overlap — strongest signal, officer assigned to incident
b) Vehicle overlap — vehicle description shared across calls
c) Location proximity — geocoded coords closer to which incident
d) Embedding similarity against each candidate's centroid (tiebreaker)
Falls back to most-recently-updated on tie.
Background re-evaluation (driven by summarizer loop):
Unmatched calls are re-checked periodically, catching mutual-aid cases where
a second talkgroup gets pulled into an existing incident.
2. Location path — geocoded coords within `location_proximity_km` (time-limited)
Primary mutual-aid signal: EMS + police at the same scene.
Incident document schema additions:
talkgroup_ids: list[str] — all talkgroups that have contributed calls
location_mentions: list[str] — all location strings from calls (newest last)
location: str|None — best known location (newest non-null for now;
TODO: replace with Maps geocoding bbox comparison)
vehicles: list[str] — deduplicated vehicle list across all calls
units: list[str] — deduplicated unit list across all calls
severity: str — highest severity seen
summary_stale: bool — True when a new call is added
summary_last_run: str|None — ISO timestamp of last Gemini summary run
embedding: list[float] — running-average centroid of call embeddings
embedding_count: int — number of calls factored into the centroid
3. Slow path — embedding cosine similarity (time-limited, same type only)
Requires similarity >= threshold AND location within 4× proximity radius.
Never fires alone — location corroboration is mandatory.
Calls with no incident_type skip new-incident creation but still run paths 1–3,
so unclassified calls (short transport end, "en route", etc.) can link to an
existing incident via talkgroup match.
"""
import math
import uuid
from datetime import datetime, timezone, timedelta
from typing import Optional
@@ -35,6 +32,10 @@ from app.internal import firestore as fstore
from app.config import settings
# ─────────────────────────────────────────────────────────────────────────────
# Public entry point
# ─────────────────────────────────────────────────────────────────────────────
async def correlate_call(
call_id: str,
node_id: str,
@@ -48,83 +49,195 @@ async def correlate_call(
) -> Optional[str]:
"""
Link call_id to an existing incident or create a new one.
Returns the incident_id that was linked, or None if skipped.
Returns the incident_id, or None if skipped (no type and no talkgroup match).
"""
if not incident_type:
return None
now = datetime.now(timezone.utc)
now = datetime.now(timezone.utc)
window = timedelta(hours=settings.correlation_window_hours)
# Fetch all active incidents of the same type
candidates = await fstore.collection_list("incidents", status="active", type=incident_type)
# Fetch all active incidents cross-type (mutual aid needs this)
all_active = await fstore.collection_list("incidents", status="active")
recent = [inc for inc in all_active if _within_window(inc, now, window)]
# Filter to those within the correlation window
active = []
for inc in candidates:
updated_raw = inc.get("updated_at", "")
try:
updated_dt = datetime.fromisoformat(str(updated_raw).replace("Z", "+00:00"))
if updated_dt.tzinfo is None:
updated_dt = updated_dt.replace(tzinfo=timezone.utc)
if (now - updated_dt) <= window:
active.append(inc)
except Exception:
continue
# Fetch call doc once — reused for disambiguation, embedding merge, unit accumulation
call_doc = await fstore.doc_get("calls", call_id) or {}
call_embedding: Optional[list] = call_doc.get("embedding")
call_units: list[str] = call_doc.get("units") or []
call_vehicles: list[str] = call_doc.get("vehicles") or []
call_severity: str = call_doc.get("severity") or "unknown"
# Use passed coords first (freshly geocoded), fall back to what's on the call doc
coords: Optional[dict] = location_coords or call_doc.get("location_coords")
# ----------------------------------------------------------------
# Fast path — talkgroup match
# ----------------------------------------------------------------
matched_incident: Optional[dict] = None
# ── 1. Fast path: talkgroup match (any type, no time limit) ──────────────
if talkgroup_id is not None and system_id:
tg_str = str(talkgroup_id)
for inc in active:
if system_id in (inc.get("system_ids") or []) and tg_str in (inc.get("talkgroup_ids") or []):
tg_str = str(talkgroup_id)
tg_matches = [
inc for inc in all_active
if system_id in (inc.get("system_ids") or [])
and tg_str in (inc.get("talkgroup_ids") or [])
]
if len(tg_matches) == 1:
matched_incident = tg_matches[0]
logger.info(
f"Correlator fast-path: call {call_id}{tg_matches[0]['incident_id']}"
)
elif len(tg_matches) > 1:
matched_incident = _disambiguate(
tg_matches, call_units, call_vehicles, coords, call_embedding
)
logger.info(
f"Correlator fast-path (disambig {len(tg_matches)} candidates): "
f"call {call_id}{matched_incident['incident_id']}"
)
# ── 2. Location path: proximity match (time-limited, cross-type) ─────────
if not matched_incident and coords:
for inc in recent:
inc_coords = inc.get("location_coords")
if not inc_coords:
continue
dist_km = _haversine_km(
coords["lat"], coords["lng"],
inc_coords["lat"], inc_coords["lng"],
)
if dist_km <= settings.location_proximity_km:
matched_incident = inc
logger.info(f"Correlator fast-path: call {call_id} → incident {inc['incident_id']} (tg match)")
logger.info(
f"Correlator location-path: call {call_id}{inc['incident_id']} "
f"(dist={dist_km:.2f}km)"
)
break
# ----------------------------------------------------------------
# Slow path — embedding similarity
# ----------------------------------------------------------------
if not matched_incident and active:
call_doc = await fstore.doc_get("calls", call_id)
call_embedding = call_doc.get("embedding") if call_doc else None
if call_embedding:
best_score = 0.0
best_inc = None
for inc in active:
inc_embedding = inc.get("embedding")
if not inc_embedding:
continue
score = _cosine_similarity(call_embedding, inc_embedding)
if score > best_score:
best_score = score
best_inc = inc
if best_inc and best_score >= settings.embedding_similarity_threshold:
matched_incident = best_inc
logger.info(
f"Correlator slow-path: call {call_id} → incident {best_inc['incident_id']} "
f"(similarity={best_score:.3f})"
)
# ── 3. Slow path: embedding + location corroboration (time-limited, same type) ──
if not matched_incident and call_embedding and incident_type:
best_score = 0.0
best_inc: Optional[dict] = None
for inc in recent:
if inc.get("type") != incident_type:
continue
inc_embedding = inc.get("embedding")
if not inc_embedding:
continue
sim = _cosine_similarity(call_embedding, inc_embedding)
if sim > best_score:
best_score = sim
best_inc = inc
# ----------------------------------------------------------------
# Update existing or create new
# ----------------------------------------------------------------
if best_inc and best_score >= settings.embedding_similarity_threshold:
inc_coords = best_inc.get("location_coords")
if coords and inc_coords:
dist_km = _haversine_km(
coords["lat"], coords["lng"],
inc_coords["lat"], inc_coords["lng"],
)
if dist_km <= settings.location_proximity_km * 4:
matched_incident = best_inc
logger.info(
f"Correlator slow-path: call {call_id}{best_inc['incident_id']} "
f"(sim={best_score:.3f}, dist={dist_km:.2f}km)"
)
# No coords available → slow path alone is not enough; skip
# ── Update existing or create new ────────────────────────────────────────
if matched_incident:
incident_id = matched_incident["incident_id"]
await _update_incident(matched_incident, call_id, talkgroup_id, system_id, tags, location, location_coords, now)
else:
incident_id = await _create_incident(
call_id, incident_type, talkgroup_id, talkgroup_name, system_id, tags, location, location_coords, now
await _update_incident(
matched_incident, call_id, talkgroup_id, system_id, tags,
location, location_coords, call_units, call_vehicles, call_embedding, now,
)
elif incident_type:
incident_id = await _create_incident(
call_id, incident_type, talkgroup_id, talkgroup_name, system_id,
tags, location, location_coords,
call_units, call_vehicles, call_embedding, call_severity, now,
)
else:
# Unclassified call, no talkgroup match found — nothing to do
return None
# Back-link the call
await fstore.doc_set("calls", call_id, {"incident_id": incident_id})
return incident_id
# ─────────────────────────────────────────────────────────────────────────────
# Internal helpers
# ─────────────────────────────────────────────────────────────────────────────
def _within_window(inc: dict, now: datetime, window: timedelta) -> bool:
try:
dt = datetime.fromisoformat(
str(inc.get("updated_at", "")).replace("Z", "+00:00")
)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return (now - dt) <= window
except Exception:
return False
def _haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
R = 6371.0
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = (
math.sin(dlat / 2) ** 2
+ math.cos(math.radians(lat1))
* math.cos(math.radians(lat2))
* math.sin(dlon / 2) ** 2
)
return R * 2 * math.asin(math.sqrt(a))
def _disambiguate(
candidates: list[dict],
call_units: list[str],
call_vehicles: list[str],
call_coords: Optional[dict],
call_embedding: Optional[list],
) -> dict:
"""
Score each talkgroup-matched candidate and return the best.
Signals (descending weight): unit overlap, vehicle overlap,
location proximity, embedding similarity.
Ties broken by most-recently-updated.
"""
best = candidates[0]
best_score = -1.0
for inc in candidates:
score = 0.0
inc_units = set(inc.get("units") or [])
if inc_units and call_units and any(u in inc_units for u in call_units):
score += 10.0
inc_vehicles = set(inc.get("vehicles") or [])
if inc_vehicles and call_vehicles and any(v in inc_vehicles for v in call_vehicles):
score += 8.0
inc_coords = inc.get("location_coords")
if inc_coords and call_coords:
dist = _haversine_km(
call_coords["lat"], call_coords["lng"],
inc_coords["lat"], inc_coords["lng"],
)
score += 6.0 if dist < 1.0 else (2.0 if dist < 5.0 else 0.0)
inc_emb = inc.get("embedding")
if inc_emb and call_embedding:
score += _cosine_similarity(call_embedding, inc_emb) * 3.0
if score > best_score or (
score == best_score
and inc.get("updated_at", "") > best.get("updated_at", "")
):
best = inc
best_score = score
return best
async def _update_incident(
inc: dict,
call_id: str,
@@ -133,43 +246,48 @@ async def _update_incident(
tags: list[str],
location: Optional[str],
location_coords: Optional[dict],
call_units: list[str],
call_vehicles: list[str],
call_embedding: Optional[list],
now: datetime,
) -> None:
incident_id = inc["incident_id"]
call_ids = inc.get("call_ids", [])
call_ids = list(inc.get("call_ids") or [])
if call_id not in call_ids:
call_ids.append(call_id)
talkgroup_ids = inc.get("talkgroup_ids", [])
talkgroup_ids = list(inc.get("talkgroup_ids") or [])
if talkgroup_id is not None and str(talkgroup_id) not in talkgroup_ids:
talkgroup_ids.append(str(talkgroup_id))
system_ids = inc.get("system_ids", [])
system_ids = list(inc.get("system_ids") or [])
if system_id and system_id not in system_ids:
system_ids.append(system_id)
# Merge tags (deduplicated)
merged_tags = list(dict.fromkeys(inc.get("tags", []) + tags))
merged_tags = list(dict.fromkeys((inc.get("tags") or []) + tags))
merged_units = list(dict.fromkeys((inc.get("units") or []) + call_units))
merged_vehicles = list(dict.fromkeys((inc.get("vehicles") or []) + call_vehicles))
# Location — append to mentions; update display location if new one is non-null
location_mentions = inc.get("location_mentions", [])
location_mentions = list(inc.get("location_mentions") or [])
if location and location not in location_mentions:
location_mentions.append(location)
best_location = location if location else inc.get("location")
best_coords = location_coords if location_coords else inc.get("location_coords")
# Update centroid embedding
embedding_updates = await _merge_embedding(inc, call_id)
best_location = location or inc.get("location")
best_coords = location_coords or inc.get("location_coords")
updates = {
"call_ids": call_ids,
"talkgroup_ids": talkgroup_ids,
"system_ids": system_ids,
"tags": merged_tags,
embedding_updates = _merge_embedding_vecs(inc, call_embedding) if call_embedding else {}
updates: dict = {
"call_ids": call_ids,
"talkgroup_ids": talkgroup_ids,
"system_ids": system_ids,
"tags": merged_tags,
"units": merged_units,
"vehicles": merged_vehicles,
"location_mentions": location_mentions,
"updated_at": now.isoformat(),
"summary_stale": True,
"updated_at": now.isoformat(),
"summary_stale": True,
**embedding_updates,
}
if best_location:
@@ -178,7 +296,7 @@ async def _update_incident(
updates["location_coords"] = best_coords
await fstore.doc_set("incidents", incident_id, updates)
logger.info(f"Correlator: linked call {call_id} to existing incident {incident_id}")
logger.info(f"Correlator: linked call {call_id} to incident {incident_id}")
async def _create_incident(
@@ -190,72 +308,64 @@ async def _create_incident(
tags: list[str],
location: Optional[str],
location_coords: Optional[dict],
call_units: list[str],
call_vehicles: list[str],
call_embedding: Optional[list],
call_severity: str,
now: datetime,
) -> str:
incident_id = str(uuid.uuid4())
tg_label = talkgroup_name or (f"TGID {talkgroup_id}" if talkgroup_id else "Unknown Talkgroup")
call_doc = await fstore.doc_get("calls", call_id)
call_embedding = call_doc.get("embedding") if call_doc else None
call_vehicles = call_doc.get("vehicles", []) if call_doc else []
call_units = call_doc.get("units", []) if call_doc else []
call_severity = call_doc.get("severity", "unknown") if call_doc else "unknown"
tg_label = (
talkgroup_name
or (f"TGID {talkgroup_id}" if talkgroup_id else "Unknown Talkgroup")
)
doc = {
"incident_id": incident_id,
"title": f"Auto: {incident_type.title()}{tg_label}",
"type": incident_type,
"status": "active",
"location": location,
"location_coords": location_coords,
"incident_id": incident_id,
"title": f"{incident_type.title()}{tg_label}",
"type": incident_type,
"status": "active",
"location": location,
"location_coords": location_coords,
"location_mentions": [location] if location else [],
"call_ids": [call_id],
"talkgroup_ids": [str(talkgroup_id)] if talkgroup_id is not None else [],
"system_ids": [system_id] if system_id else [],
"tags": tags,
"vehicles": call_vehicles,
"units": call_units,
"severity": call_severity,
"summary": None,
"summary_stale": True,
"summary_last_run": None,
"embedding": call_embedding,
"embedding_count": 1 if call_embedding else 0,
"started_at": now.isoformat(),
"updated_at": now.isoformat(),
"call_ids": [call_id],
"talkgroup_ids": [str(talkgroup_id)] if talkgroup_id is not None else [],
"system_ids": [system_id] if system_id else [],
"tags": tags + ["auto-generated"],
"units": call_units,
"vehicles": call_vehicles,
"severity": call_severity,
"summary": None,
"summary_stale": True,
"summary_last_run": None,
"embedding": call_embedding,
"embedding_count": 1 if call_embedding else 0,
"started_at": now.isoformat(),
"updated_at": now.isoformat(),
}
await fstore.doc_set("incidents", incident_id, doc, merge=False)
logger.info(f"Correlator: created incident {incident_id} for call {call_id} ({incident_type})")
logger.info(
f"Correlator: created incident {incident_id} for call {call_id} ({incident_type})"
)
return incident_id
async def _merge_embedding(inc: dict, call_id: str) -> dict:
"""
Update the incident's centroid embedding with the new call's embedding.
Uses an online running-average: new_avg = (old_avg * n + new_vec) / (n + 1)
"""
def _merge_embedding_vecs(inc: dict, call_embedding: list[float]) -> dict:
"""Online running-average centroid: new_avg = (old_avg * n + new_vec) / (n+1)"""
import numpy as np
call_doc = await fstore.doc_get("calls", call_id)
call_embedding = call_doc.get("embedding") if call_doc else None
if not call_embedding:
return {}
n = inc.get("embedding_count", 0)
n = inc.get("embedding_count") or 0
old_embedding = inc.get("embedding")
if old_embedding and n > 0:
old_vec = np.array(old_embedding, dtype=float)
new_vec = np.array(call_embedding, dtype=float)
updated = ((old_vec * n) + new_vec) / (n + 1)
return {"embedding": updated.tolist(), "embedding_count": n + 1}
else:
return {"embedding": call_embedding, "embedding_count": 1}
return {"embedding": call_embedding, "embedding_count": 1}
def _cosine_similarity(a: list[float], b: list[float]) -> float:
import numpy as np
va, vb = np.array(a, dtype=float), np.array(b, dtype=float)
va, vb = np.array(a, dtype=float), np.array(b, dtype=float)
norm_a, norm_b = np.linalg.norm(va), np.linalg.norm(vb)
if norm_a == 0 or norm_b == 0:
return 0.0
+8 -5
View File
@@ -22,6 +22,7 @@ Schema:
"vehicles": [vehicle descriptions mentioned, e.g. "Hyundai Tucson", "black sedan"],
"units": [unit IDs or officer numbers mentioned, e.g. "Unit 511", "Car 4"],
"severity": one of "minor" | "moderate" | "major" | "unknown",
"resolved": true if this call explicitly signals the incident is over ("Code 4", "in custody", "all clear", "fire out", "patient transported", "GOA", "scene clear", "10-42", "negative contact", "clear the scene"), false otherwise,
"transcript_corrected": "corrected full transcript string, or null if no corrections needed"
}}
@@ -48,14 +49,15 @@ async def extract_tags(
system_id: Optional[str] = None,
segments: Optional[list[dict]] = None,
node_id: Optional[str] = None,
) -> tuple[list[str], Optional[str], Optional[str], Optional[dict]]:
) -> tuple[list[str], Optional[str], Optional[str], Optional[dict], bool]:
"""
Extract incident tags, type, location, and corrected transcript via GPT-4o mini.
Extract incident tags, type, location, corrected transcript, and closure signal via GPT-4o mini.
Geocodes the extracted location string via Nominatim using the node's position as bias.
Returns:
(tags, primary_type, location_str, location_coords)
where location_coords is {"lat": float, "lng": float} or None.
(tags, primary_type, location_str, location_coords, resolved)
where location_coords is {"lat": float, "lng": float} or None,
and resolved is True when the transcript signals incident closure.
Side-effect: updates calls/{call_id} in Firestore with tags, location,
location_coords, vehicles, units, severity, transcript_corrected; also stores embedding.
@@ -70,6 +72,7 @@ async def extract_tags(
vehicles: list[str] = result.get("vehicles") or []
units: list[str] = result.get("units") or []
severity: str = result.get("severity") or "unknown"
resolved: bool = bool(result.get("resolved", False))
transcript_corrected: Optional[str] = result.get("transcript_corrected") or None
if incident_type in ("unknown", "other", ""):
@@ -112,7 +115,7 @@ async def extract_tags(
f"tags={tags}, location={location!r}, coords={location_coords}, severity={severity}, "
f"corrected={transcript_corrected is not None}"
)
return tags, incident_type, location, location_coords
return tags, incident_type, location, location_coords, resolved
async def _geocode_location(
+42 -7
View File
@@ -1,15 +1,14 @@
"""
Background incident summary loop.
Runs every SUMMARY_INTERVAL_MINUTES. Finds all active incidents with
summary_stale=True, fetches all their call transcripts, and calls Gemini
once per incident to produce a concise factual summary.
By batching this way: Gemini is never called per-call — only periodically
and only for incidents that have actually changed since the last run.
Runs every SUMMARY_INTERVAL_MINUTES. Two passes per tick:
1. Summary pass — find stale incidents (summary_stale=True) and regenerate summaries.
2. Stale sweep — auto-resolve incidents with no new calls for incident_auto_resolve_minutes.
This is effectively "time since last call" because updated_at is stamped on every
new linked call.
"""
import asyncio
from datetime import datetime, timezone
from datetime import datetime, timezone, timedelta
from typing import Optional
from app.internal.logger import logger
from app.internal import firestore as fstore
@@ -23,6 +22,7 @@ async def summarizer_loop() -> None:
await asyncio.sleep(interval)
try:
await _run_summary_pass()
await _resolve_stale_incidents()
except Exception as e:
logger.error(f"Summarizer pass failed: {e}")
@@ -74,6 +74,41 @@ async def _summarize_incident(inc: dict) -> None:
await fstore.doc_set("incidents", incident_id, updates)
async def _resolve_stale_incidents() -> None:
    """Auto-resolve active incidents idle for more than incident_auto_resolve_minutes.

    Idle time is measured from the incident's `updated_at`, which is stamped on
    every newly linked call, so this is effectively "time since last call".
    NOTE(review): assumes the summary pass does not bump `updated_at` — verify,
    or stale incidents would never age out while summaries regenerate.
    Per-incident failures are logged and skipped so one bad document cannot
    abort the sweep.
    """
    all_active = await fstore.collection_list("incidents", status="active")
    if not all_active:
        return
    now = datetime.now(timezone.utc)
    # (fix: removed unused `cutoff` timedelta — the comparison below works
    # directly in minutes against the settings value)
    count = 0
    for inc in all_active:
        incident_id = inc.get("incident_id")
        if not incident_id:
            continue
        try:
            updated_dt = datetime.fromisoformat(
                str(inc.get("updated_at", "")).replace("Z", "+00:00")
            )
            if updated_dt.tzinfo is None:
                updated_dt = updated_dt.replace(tzinfo=timezone.utc)
            idle_minutes = (now - updated_dt).total_seconds() / 60
            if idle_minutes > settings.incident_auto_resolve_minutes:
                await fstore.doc_set("incidents", incident_id, {"status": "resolved"})
                logger.info(
                    f"Auto-resolved stale incident {incident_id} "
                    f"(idle {idle_minutes:.0f}m)"
                )
                count += 1
        except Exception as e:
            logger.warning(f"Stale sweep error for {incident_id}: {e}")
    if count:
        logger.info(f"Stale sweep: resolved {count} incident(s)")
def _sync_summarize(inc: dict, transcripts: list[str]) -> Optional[str]:
from app.config import settings
from openai import OpenAI