Implement recorrelation logic

This commit is contained in:
Logan
2026-04-21 22:19:57 -04:00
parent 338b946ba3
commit 65839a3191
5 changed files with 158 additions and 13 deletions
+1
View File
@@ -27,6 +27,7 @@ class Settings(BaseSettings):
embedding_similarity_threshold: float = 0.93 # slow-path cosine threshold (tiebreaker only)
location_proximity_km: float = 0.5 # radius for location-proximity matching
incident_auto_resolve_minutes: int = 90 # auto-resolve after N minutes with no new calls
recorrelation_scan_minutes: int = 15 # re-examine orphaned calls ended within this window
# Vocabulary learning
vocabulary_induction_interval_hours: int = 24 # how often the induction loop runs
+18
View File
@@ -57,6 +57,24 @@ async def collection_list(collection: str, **filters) -> list[dict]:
return await asyncio.to_thread(_query)
async def collection_where(
    collection: str,
    conditions: list[tuple[str, str, Any]],
) -> list[dict]:
    """
    Run a Firestore query assembled from arbitrary where-clauses.

    conditions — (field, op, value) triples, e.g. [("ended_at", ">=", cutoff_dt)].
    Any Firestore comparison operator is accepted: "==", "!=", "<", "<=", ">", ">=".
    The blocking SDK call runs in a worker thread so the event loop stays free.
    """
    def _run() -> list[dict]:
        query = db.collection(collection)
        for field, op, value in conditions:
            query = query.where(field, op, value)
        return [snapshot.to_dict() for snapshot in query.stream()]

    return await asyncio.to_thread(_run)
async def doc_delete(collection: str, doc_id: str) -> None:
    """Delete one document by id; the blocking delete runs off the event loop."""
    doc_ref = db.collection(collection).document(doc_id)
    await asyncio.to_thread(doc_ref.delete)
+25 -10
View File
@@ -46,17 +46,27 @@ async def correlate_call(
incident_type: Optional[str],
location: Optional[str] = None,
location_coords: Optional[dict] = None,
reference_time: Optional[datetime] = None,
create_if_new: bool = True,
) -> Optional[str]:
"""
Link call_id to an existing incident or create a new one.
Returns the incident_id, or None if skipped (no type and no talkgroup match).
reference_time — time anchor for the time-limited paths (location + slow).
Defaults to now. Pass call.started_at when re-correlating
orphaned calls so the window is anchored to when the call
actually happened, not when the sweep runs.
create_if_new — when False, skip new-incident creation (re-correlation only
links to existing incidents; it never creates new ones).
Returns the incident_id, or None if skipped.
"""
now = datetime.now(timezone.utc)
now = reference_time or datetime.now(timezone.utc)
window = timedelta(hours=settings.correlation_window_hours)
# Fetch all active incidents cross-type (mutual aid needs this)
all_active = await fstore.collection_list("incidents", status="active")
recent = [inc for inc in all_active if _within_window(inc, now, window)]
recent = [inc for inc in all_active if _within_window_of(inc, now, window)]
# Fetch call doc once — reused for disambiguation, embedding merge, unit accumulation
call_doc = await fstore.doc_get("calls", call_id) or {}
@@ -147,14 +157,14 @@ async def correlate_call(
location, location_coords, call_units, call_vehicles, call_embedding, now,
talkgroup_name=talkgroup_name, incident_type=incident_type,
)
elif incident_type:
elif incident_type and create_if_new:
incident_id = await _create_incident(
call_id, incident_type, talkgroup_id, talkgroup_name, system_id,
tags, location, location_coords,
call_units, call_vehicles, call_embedding, call_severity, now,
)
else:
# Unclassified call, no talkgroup match found — nothing to do
# No match and either no type or creation suppressed — nothing to do
return None
await fstore.doc_set("calls", call_id, {"incident_id": incident_id})
@@ -165,14 +175,19 @@ async def correlate_call(
# Internal helpers
# ─────────────────────────────────────────────────────────────────────────────
def _within_window(inc: dict, now: datetime, window: timedelta) -> bool:
def _within_window_of(inc: dict, anchor: datetime, window: timedelta) -> bool:
"""
True if the incident's started_at is within `window` of `anchor` in either
direction. Using started_at (not updated_at) means re-correlation anchored
to a call's started_at correctly matches incidents created shortly *after*
that call (e.g. a welfare check at T+0 vs. an incident created at T+15m).
"""
try:
dt = datetime.fromisoformat(
str(inc.get("updated_at", "")).replace("Z", "+00:00")
)
raw = inc.get("started_at") or inc.get("updated_at") or ""
dt = datetime.fromisoformat(str(raw).replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return (now - dt) <= window
return abs((anchor - dt).total_seconds()) <= window.total_seconds()
except Exception:
return False
@@ -0,0 +1,108 @@
"""
Re-correlation sweep.
Runs every summary_interval_minutes (same tick as the summarizer). Each pass
finds calls that are:
- recently ended (ended_at within the last recorrelation_scan_minutes)
- still orphaned (incident_id is null)
and re-runs the incident correlator against currently-active incidents, using
the call's own started_at as the time anchor so the window is correct regardless
of when the sweep fires.
Never creates new incidents — link-only. Zero LLM tokens (uses pre-computed
talkgroup strings, haversine math, and stored embeddings).
"""
import asyncio
from datetime import datetime, timezone, timedelta
from typing import Optional
from app.internal.logger import logger
from app.internal import firestore as fstore
from app.config import settings
async def recorrelation_loop() -> None:
    """
    Background task: periodically re-examine orphaned calls.

    Ticks at the summarizer's cadence (summary_interval_minutes). The sleep
    comes *before* the first pass, so startup work settles before the sweep
    touches Firestore. Failures are logged and the loop keeps running; it
    only exits via task cancellation.
    """
    interval = settings.summary_interval_minutes * 60
    logger.info(
        f"Re-correlation sweep started — "
        f"interval: {settings.summary_interval_minutes}m, "
        f"scan window: {settings.recorrelation_scan_minutes}m"
    )
    while True:
        await asyncio.sleep(interval)
        try:
            await _run_sweep_pass()
        except Exception:
            # logger.exception captures the traceback; the previous
            # logger.error(f"... {e}") dropped it, making sweep failures
            # hard to diagnose.
            logger.exception("Re-correlation sweep failed")
async def _run_sweep_pass() -> None:
    """One sweep: fetch recently-ended calls, keep the orphans, try to link each."""
    window_start = datetime.now(timezone.utc) - timedelta(
        minutes=settings.recorrelation_scan_minutes
    )
    # Server-side range query: only calls that ended within the scan window.
    # incident_id=null must be filtered client-side (Firestore can't query for
    # missing fields), but the fetched set stays small either way.
    ended_recently = await fstore.collection_where("calls", [
        ("status", "==", "ended"),
        ("ended_at", ">=", window_start),
    ])
    orphans = [call for call in ended_recently if not call.get("incident_id")]
    if not orphans:
        return
    logger.info(f"Re-correlation sweep: {len(orphans)} orphaned call(s) to check")
    # Sequential awaits, exactly as before — just folded into a comprehension
    outcomes = [await _recorrelate_orphan(call) for call in orphans]
    linked = sum(outcomes)
    if linked:
        logger.info(f"Re-correlation sweep: linked {linked}/{len(orphans)} orphaned call(s)")
async def _recorrelate_orphan(call: dict) -> bool:
    """
    Try to link a single orphaned call to an existing incident.

    Returns True when the correlator found a match and the call was linked.
    Uses only data stored by the first-pass extraction; never creates new
    incidents (create_if_new=False).
    """
    # Local import — mirrors the original; presumably avoids an import cycle
    # with the correlator module (TODO confirm).
    from app.internal import incident_correlator

    call_id = call.get("call_id")
    anchor = _parse_dt(call.get("started_at"))
    if not call_id or anchor is None:
        return False

    incident_id = await incident_correlator.correlate_call(
        call_id=call_id,
        node_id=call.get("node_id", ""),
        system_id=call.get("system_id"),
        talkgroup_id=call.get("talkgroup_id"),
        talkgroup_name=call.get("talkgroup_name"),
        tags=call.get("tags") or [],
        incident_type=call.get("incident_type"),
        location=call.get("location"),
        location_coords=call.get("location_coords"),
        reference_time=anchor,    # anchor window to when the call happened
        create_if_new=False,      # never create — link-only
    )
    if not incident_id:
        return False
    logger.info(
        f"Re-correlation: linked orphaned call {call_id} → incident {incident_id}"
    )
    return True
def _parse_dt(value) -> Optional[datetime]:
if not value:
return None
try:
dt = datetime.fromisoformat(str(value).replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except Exception:
return None
+6 -3
View File
@@ -7,6 +7,7 @@ from app.internal.mqtt_handler import mqtt_handler
from app.internal.node_sweeper import sweeper_loop
from app.internal.summarizer import summarizer_loop
from app.internal.vocabulary_learner import vocabulary_induction_loop
from app.internal.recorrelation_sweep import recorrelation_loop
from app.config import settings
from app.internal.auth import require_firebase_token, require_service_or_firebase_token
from app.routers import nodes, systems, calls, upload, tokens, incidents, alerts
@@ -36,9 +37,10 @@ async def lifespan(app: FastAPI):
await _release_orphaned_tokens()
await mqtt_handler.connect()
sweeper_task = asyncio.create_task(sweeper_loop())
summarizer_task = asyncio.create_task(summarizer_loop())
induction_task = asyncio.create_task(vocabulary_induction_loop())
sweeper_task = asyncio.create_task(sweeper_loop())
summarizer_task = asyncio.create_task(summarizer_loop())
induction_task = asyncio.create_task(vocabulary_induction_loop())
recorrelation_task = asyncio.create_task(recorrelation_loop())
yield # --- app running ---
@@ -46,6 +48,7 @@ async def lifespan(app: FastAPI):
sweeper_task.cancel()
summarizer_task.cancel()
induction_task.cancel()
recorrelation_task.cancel()
await mqtt_handler.disconnect()