Files
server-26/drb-c2-core/app/internal/recorrelation_sweep.py
T
2026-04-23 01:26:41 -04:00

113 lines
3.8 KiB
Python

"""
Re-correlation sweep.
Runs every summary_interval_minutes (same interval as the summarizer, on its own timer). Each pass
finds calls that are:
- recently ended (ended_at within the last recorrelation_scan_minutes)
- still orphaned (neither incident_ids nor incident_id is set)
and re-runs the incident correlator against currently-active incidents, using
the call's own started_at as the time anchor so the window is correct regardless
of when the sweep fires.
Never creates new incidents — link-only. Zero LLM tokens (uses pre-computed
talkgroup strings, haversine math, and stored embeddings).
"""
import asyncio
from datetime import datetime, timezone, timedelta
from typing import Optional
from app.internal.logger import logger
from app.internal import firestore as fstore
from app.config import settings
async def recorrelation_loop() -> None:
    """
    Background task: run one re-correlation pass per summary interval, forever.

    Each iteration sleeps for settings.summary_interval_minutes first and then
    runs a pass, so the first pass happens one full interval after startup.
    An exception raised by a pass is logged and the loop carries on with the
    next interval instead of terminating the task.
    """
    period_min = settings.summary_interval_minutes
    period_s = period_min * 60
    logger.info(
        "Re-correlation sweep started — "
        f"interval: {period_min}m, "
        f"scan window: {settings.recorrelation_scan_minutes}m"
    )
    while True:
        await asyncio.sleep(period_s)
        try:
            await _run_sweep_pass()
        except Exception as exc:
            # Keep the background task alive; the next interval retries.
            logger.error(f"Re-correlation sweep failed: {exc}")
async def _run_sweep_pass() -> None:
    """
    Run one re-correlation pass over recently ended, still-orphaned calls.

    Fetches calls with status "ended" whose ended_at is within the last
    settings.recorrelation_scan_minutes, keeps those that have neither an
    incident_ids nor an incident_id value, and asks _recorrelate_orphan to
    link each one to an existing incident.

    A failure while handling one call is logged and does not prevent the
    remaining orphans in the batch from being checked.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=settings.recorrelation_scan_minutes)
    # Server-side range query: only calls that ended within the scan window.
    # This keeps the fetched set small regardless of total collection size.
    # The orphan filter (no incident_ids / incident_id) runs client-side
    # because Firestore can't query for missing fields.
    recent_ended = await fstore.collection_where("calls", [
        ("status", "==", "ended"),
        ("ended_at", ">=", cutoff),
    ])
    orphans = [
        c for c in recent_ended
        if not c.get("incident_ids") and not c.get("incident_id")
    ]
    if not orphans:
        return
    logger.info(f"Re-correlation sweep: {len(orphans)} orphaned call(s) to check")
    linked = 0
    for call in orphans:
        try:
            if await _recorrelate_orphan(call):
                linked += 1
        except asyncio.CancelledError:
            # CancelledError subclasses Exception before Python 3.8; never
            # swallow it, or the task could not be shut down cleanly.
            raise
        except Exception as e:
            # Isolate per-call failures so one bad call doesn't skip the rest
            # of the batch.
            logger.error(
                f"Re-correlation sweep: failed to re-correlate call {call.get('call_id')}: {e}"
            )
    if linked:
        logger.info(f"Re-correlation sweep: linked {linked}/{len(orphans)} orphaned call(s)")
async def _recorrelate_orphan(call: dict) -> bool:
    """
    Try to attach one orphaned call to an incident that already exists.

    The correlator is called in link-only mode (create_if_new=False), with the
    call's own started_at as the reference time. When it returns an incident
    id, that id is written to the call's "incident_ids" field via
    fstore.doc_set.

    Returns:
        True if the call was linked. False if the call has no call_id, has an
        unparseable or missing started_at, or matched no incident.
    """
    # Kept as a function-level import (as in the original); presumably this
    # avoids an import cycle, so confirm before hoisting it to module level.
    from app.internal import incident_correlator

    call_id = call.get("call_id")
    started_at = _parse_dt(call.get("started_at"))
    if not (call_id and started_at):
        return False

    # Everything the correlator needs was persisted by the first-pass extraction.
    correlator_kwargs = {
        "call_id": call_id,
        "node_id": call.get("node_id", ""),
        "system_id": call.get("system_id"),
        "talkgroup_id": call.get("talkgroup_id"),
        "talkgroup_name": call.get("talkgroup_name"),
        "tags": call.get("tags") or [],
        "incident_type": call.get("incident_type"),
        "location": call.get("location"),
        "location_coords": call.get("location_coords"),
        "reference_time": started_at,  # anchor the window to when the call happened
        "create_if_new": False,        # link-only: never create a new incident
    }
    match_id = await incident_correlator.correlate_call(**correlator_kwargs)
    if not match_id:
        return False

    await fstore.doc_set("calls", call_id, {"incident_ids": [match_id]})
    logger.info(
        f"Re-correlation: linked orphaned call {call_id} → incident {match_id}"
    )
    return True
def _parse_dt(value) -> Optional[datetime]:
    """
    Best-effort conversion of a stored timestamp into a timezone-aware datetime.

    Accepts:
      - datetime instances, which are returned as-is (naive ones are tagged UTC);
      - anything whose str() is an ISO-8601 string, optionally with surrounding
        whitespace and a trailing "Z"/"z" UTC designator.

    Values without timezone information are assumed to be UTC.

    Returns:
        An aware datetime, or None if value is falsy or cannot be parsed.
    """
    if not value:
        return None
    if isinstance(value, datetime):
        # Already a datetime: no need to round-trip through str().
        dt = value
    else:
        try:
            text = str(value).strip()
            # datetime.fromisoformat() only understands a "Z" suffix natively
            # from Python 3.11, so normalise a trailing UTC designator (either
            # case) to an explicit offset. Only the suffix is touched.
            if text[-1:] in ("Z", "z"):
                text = text[:-1] + "+00:00"
            dt = datetime.fromisoformat(text)
        except (TypeError, ValueError):
            return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt