diff --git a/drb-c2-core/app/config.py b/drb-c2-core/app/config.py index 841dafd..1bd5ada 100644 --- a/drb-c2-core/app/config.py +++ b/drb-c2-core/app/config.py @@ -27,6 +27,7 @@ class Settings(BaseSettings): embedding_similarity_threshold: float = 0.93 # slow-path cosine threshold (tiebreaker only) location_proximity_km: float = 0.5 # radius for location-proximity matching incident_auto_resolve_minutes: int = 90 # auto-resolve after N minutes with no new calls + recorrelation_scan_minutes: int = 15 # re-examine orphaned calls ended within this window # Vocabulary learning vocabulary_induction_interval_hours: int = 24 # how often the induction loop runs diff --git a/drb-c2-core/app/internal/firestore.py b/drb-c2-core/app/internal/firestore.py index 66e26ee..7279b11 100644 --- a/drb-c2-core/app/internal/firestore.py +++ b/drb-c2-core/app/internal/firestore.py @@ -57,6 +57,24 @@ async def collection_list(collection: str, **filters) -> list[dict]: return await asyncio.to_thread(_query) +async def collection_where( + collection: str, + conditions: list[tuple[str, str, Any]], +) -> list[dict]: + """ + Query a collection with arbitrary where-clauses. + conditions: list of (field, op, value) — e.g. [("ended_at", ">=", cutoff_dt)] + Supports any Firestore operator: "==", "!=", "<", "<=", ">", ">=". + """ + def _query(): + ref = db.collection(collection) + for field, op, value in conditions: + ref = ref.where(field, op, value) + return [doc.to_dict() for doc in ref.stream()] + + return await asyncio.to_thread(_query) + + async def doc_delete(collection: str, doc_id: str) -> None: ref = db.collection(collection).document(doc_id) await asyncio.to_thread(ref.delete) diff --git a/drb-c2-core/app/internal/incident_correlator.py b/drb-c2-core/app/internal/incident_correlator.py index bef09f7..6164ed7 100644 --- a/drb-c2-core/app/internal/incident_correlator.py +++ b/drb-c2-core/app/internal/incident_correlator.py @@ -46,17 +46,27 @@ async def correlate_call( incident_type: Optional[str], location: Optional[str] = None, location_coords: Optional[dict] = None, + reference_time: Optional[datetime] = None, + create_if_new: bool = True, ) -> Optional[str]: """ Link call_id to an existing incident or create a new one. - Returns the incident_id, or None if skipped (no type and no talkgroup match). + + reference_time — time anchor for the time-limited paths (location + slow). + Defaults to now. Pass call.started_at when re-correlating + orphaned calls so the window is anchored to when the call + actually happened, not when the sweep runs. + create_if_new — when False, skip new-incident creation (re-correlation only + links to existing incidents; it never creates new ones). + + Returns the incident_id, or None if skipped. """ - now = datetime.now(timezone.utc) + now = reference_time or datetime.now(timezone.utc) window = timedelta(hours=settings.correlation_window_hours) # Fetch all active incidents cross-type (mutual aid needs this) all_active = await fstore.collection_list("incidents", status="active") - recent = [inc for inc in all_active if _within_window(inc, now, window)] + recent = [inc for inc in all_active if _within_window_of(inc, now, window)] # Fetch call doc once — reused for disambiguation, embedding merge, unit accumulation call_doc = await fstore.doc_get("calls", call_id) or {} @@ -147,14 +157,14 @@ async def correlate_call( location, location_coords, call_units, call_vehicles, call_embedding, now, talkgroup_name=talkgroup_name, incident_type=incident_type, ) - elif incident_type: + elif incident_type and create_if_new: incident_id = await _create_incident( call_id, incident_type, talkgroup_id, talkgroup_name, system_id, tags, location, location_coords, call_units, call_vehicles, call_embedding, call_severity, now, ) else: - # Unclassified call, no talkgroup match found — nothing to do + # No match and either no type or creation suppressed — nothing to do return None await fstore.doc_set("calls", call_id, {"incident_id": incident_id}) @@ -165,14 +175,19 @@ async def correlate_call( # Internal helpers # ───────────────────────────────────────────────────────────────────────────── -def _within_window(inc: dict, now: datetime, window: timedelta) -> bool: +def _within_window_of(inc: dict, anchor: datetime, window: timedelta) -> bool: + """ + True if the incident's started_at is within `window` of `anchor` in either + direction. Using started_at (not updated_at) means re-correlation anchored + to a call's started_at correctly matches incidents created shortly *after* + that call (e.g. a welfare check at T+0 vs. an incident created at T+15m). + """ try: - dt = datetime.fromisoformat( - str(inc.get("updated_at", "")).replace("Z", "+00:00") - ) + raw = inc.get("started_at") or inc.get("updated_at") or "" + dt = datetime.fromisoformat(str(raw).replace("Z", "+00:00")) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) - return (now - dt) <= window + return abs((anchor - dt).total_seconds()) <= window.total_seconds() except Exception: return False diff --git a/drb-c2-core/app/internal/recorrelation_sweep.py b/drb-c2-core/app/internal/recorrelation_sweep.py new file mode 100644 index 0000000..25ad8e9 --- /dev/null +++ b/drb-c2-core/app/internal/recorrelation_sweep.py @@ -0,0 +1,108 @@ +""" +Re-correlation sweep. + +Runs every summary_interval_minutes (same tick as the summarizer). Each pass +finds calls that are: + - recently ended (ended_at within the last recorrelation_scan_minutes) + - still orphaned (incident_id is null) + +and re-runs the incident correlator against currently-active incidents, using +the call's own started_at as the time anchor so the window is correct regardless +of when the sweep fires. + +Never creates new incidents — link-only. Zero LLM tokens (uses pre-computed +talkgroup strings, haversine math, and stored embeddings). +""" +import asyncio +from datetime import datetime, timezone, timedelta +from typing import Optional +from app.internal.logger import logger +from app.internal import firestore as fstore +from app.config import settings + + +async def recorrelation_loop() -> None: + interval = settings.summary_interval_minutes * 60 + logger.info( + f"Re-correlation sweep started — " + f"interval: {settings.summary_interval_minutes}m, " + f"scan window: {settings.recorrelation_scan_minutes}m" + ) + while True: + await asyncio.sleep(interval) + try: + await _run_sweep_pass() + except Exception as e: + logger.error(f"Re-correlation sweep failed: {e}") + + +async def _run_sweep_pass() -> None: + cutoff = datetime.now(timezone.utc) - timedelta(minutes=settings.recorrelation_scan_minutes) + + # Server-side range query: only calls that ended within the scan window. + # Filter incident_id=null client-side (Firestore can't query for missing fields). + # This keeps the fetched set small regardless of total collection size. + recent_ended = await fstore.collection_where("calls", [ + ("status", "==", "ended"), + ("ended_at", ">=", cutoff), + ]) + orphans = [c for c in recent_ended if not c.get("incident_id")] + + if not orphans: + return + + logger.info(f"Re-correlation sweep: {len(orphans)} orphaned call(s) to check") + linked = 0 + for call in orphans: + if await _recorrelate_orphan(call): + linked += 1 + + if linked: + logger.info(f"Re-correlation sweep: linked {linked}/{len(orphans)} orphaned call(s)") + + +async def _recorrelate_orphan(call: dict) -> bool: + """ + Attempt to link a single orphaned call to an existing incident. + Returns True if a match was found and the call was linked. + """ + from app.internal import incident_correlator + + call_id = call.get("call_id") + started_at = _parse_dt(call.get("started_at")) + if not call_id or not started_at: + return False + + # All data needed for correlation was stored by the first-pass extraction. + incident_id = await incident_correlator.correlate_call( + call_id = call_id, + node_id = call.get("node_id", ""), + system_id = call.get("system_id"), + talkgroup_id = call.get("talkgroup_id"), + talkgroup_name = call.get("talkgroup_name"), + tags = call.get("tags") or [], + incident_type = call.get("incident_type"), + location = call.get("location"), + location_coords= call.get("location_coords"), + reference_time = started_at, # anchor window to when the call happened + create_if_new = False, # never create — link-only + ) + + if incident_id: + logger.info( + f"Re-correlation: linked orphaned call {call_id} → incident {incident_id}" + ) + return True + return False + + +def _parse_dt(value) -> Optional[datetime]: + if not value: + return None + try: + dt = datetime.fromisoformat(str(value).replace("Z", "+00:00")) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + except Exception: + return None diff --git a/drb-c2-core/app/main.py b/drb-c2-core/app/main.py index acd164a..37471d6 100644 --- a/drb-c2-core/app/main.py +++ b/drb-c2-core/app/main.py @@ -7,6 +7,7 @@ from app.internal.mqtt_handler import mqtt_handler from app.internal.node_sweeper import sweeper_loop from app.internal.summarizer import summarizer_loop from app.internal.vocabulary_learner import vocabulary_induction_loop +from app.internal.recorrelation_sweep import recorrelation_loop from app.config import settings from app.internal.auth import require_firebase_token, require_service_or_firebase_token from app.routers import nodes, systems, calls, upload, tokens, incidents, alerts @@ -36,9 +37,10 @@ async def lifespan(app: FastAPI): await _release_orphaned_tokens() await mqtt_handler.connect() - sweeper_task = asyncio.create_task(sweeper_loop()) - summarizer_task = asyncio.create_task(summarizer_loop()) - induction_task = asyncio.create_task(vocabulary_induction_loop()) + sweeper_task = asyncio.create_task(sweeper_loop()) + summarizer_task = asyncio.create_task(summarizer_loop()) + induction_task = asyncio.create_task(vocabulary_induction_loop()) + recorrelation_task = asyncio.create_task(recorrelation_loop()) yield # --- app running --- @@ -46,6 +48,7 @@ async def lifespan(app: FastAPI): sweeper_task.cancel() summarizer_task.cancel() induction_task.cancel() + recorrelation_task.cancel() await mqtt_handler.disconnect()