8b660d8e10
Correlator
- Raise fast-path idle gate 30 → 90 min (tg_fast_path_idle_minutes)
- Fix disambiguate always-commits bug: run _call_fits_incident on winner
before committing; fall through to new-incident creation if it fails
- Add unit-continuity path (path 1.5): matches all_active by shared unit
IDs with a reassignment guard, bridges calls past the idle gate
- Add tag-based incident_type inference (_TAG_TYPE_HINTS) as GPT fallback,
rescuing tagged calls that would have been dropped (616 observed orphans)
- Add master/child incident model: _create_master_incident, _demote_to_child,
_add_child_to_master; new incidents stamped incident_type="master"
- Add cross-system parent detection (_find_cross_system_parent): two-signal
scoring (road overlap=0.4, embedding≥0.78=0.3, proximity=0.3, threshold=0.5)
wired into create-if-new path; creates master shell on first cross-system match
- Add maybe_resolve_parent: auto-resolves master when all children close;
called from upload pipeline (LLM closure) and summarizer stale sweep
- Add signal-based auto-resolve via units_active/units_cleared tracking:
GPT now extracts cleared_units per scene; _update_incident moves units
between active/cleared lists and resolves the incident when active empties;
stored on call doc for re-correlation sweep reuse
- Add _create_incident initialization of units_active/units_cleared fields
Re-correlation sweep
- Add corr_sweep_count + MAX_SWEEP_ATTEMPTS=3: orphans get 3 attempts
then are tombstoned as corr_path="unlinked", ending the re-sweep loop
(previously hammering each orphan 29-31 times per shift)
Intelligence extraction
- Add cleared_units to GPT prompt schema and rules
- Extract and propagate cleared_units per scene; merge across scenes;
store on call doc for re-correlation sweep
Token management
- Fix token release bug: remove release_token call on discord_connected=False
in MQTT checkin (transient Discord drops were orphaning bots mid-shift)
- Add PUT /tokens/{id}/prefer/{system_id} endpoint: lock a bot token to a
system; pass _none as system_id to clear; stored bidirectionally on both
token and system documents
- discord_join handler resolves preferred_token_id from system doc and passes
system_name in MQTT payload
129 lines
4.7 KiB
Python
129 lines
4.7 KiB
Python
"""
|
||
Re-correlation sweep.
|
||
|
||
Runs every summary_interval_minutes (same tick as the summarizer). Each pass
|
||
finds calls that are:
|
||
- recently ended (ended_at within the last recorrelation_scan_minutes)
|
||
- still orphaned (incident_id is null)
|
||
|
||
and re-runs the incident correlator against currently-active incidents, using
|
||
the call's own started_at as the time anchor so the window is correct regardless
|
||
of when the sweep fires.
|
||
|
||
Never creates new incidents — link-only. Zero LLM tokens (uses pre-computed
|
||
talkgroup strings, haversine math, and stored embeddings).
|
||
"""
|
||
import asyncio
|
||
from datetime import datetime, timezone, timedelta
|
||
from typing import Optional
|
||
from app.internal.logger import logger
|
||
from app.internal import firestore as fstore
|
||
from app.config import settings
|
||
|
||
|
||
async def recorrelation_loop() -> None:
    """Background task: run the orphan re-link sweep on the summarizer tick.

    Sleeps one interval first, then sweeps, forever. A failure in any single
    pass is logged and swallowed so the loop itself never dies.
    """
    sleep_seconds = settings.summary_interval_minutes * 60
    logger.info(
        f"Re-correlation sweep started — "
        f"interval: {settings.summary_interval_minutes}m, "
        f"scan window: {settings.recorrelation_scan_minutes}m"
    )
    while True:
        await asyncio.sleep(sleep_seconds)
        try:
            await _run_sweep_pass()
        except Exception as e:
            logger.error(f"Re-correlation sweep failed: {e}")
async def _run_sweep_pass() -> None:
    """One sweep: fetch recently-ended calls, pick the still-orphaned ones,
    and attempt to re-link each against currently-active incidents."""
    window_start = datetime.now(timezone.utc) - timedelta(
        minutes=settings.recorrelation_scan_minutes
    )

    # Server-side range query: only calls that ended within the scan window.
    # Filter incident_id=null client-side (Firestore can't query for missing fields).
    # This keeps the fetched set small regardless of total collection size.
    recent_ended = await fstore.collection_where("calls", [
        ("status", "==", "ended"),
        ("ended_at", ">=", window_start),
    ])

    # corr_path="unlinked" is written after MAX_SWEEP_ATTEMPTS failures.
    # Allows a few retries so a welfare-check call can link to an escalation
    # incident that is created a few minutes later, without sweeping 30× forever.
    MAX_SWEEP_ATTEMPTS = 3

    def _is_sweepable(doc: dict) -> bool:
        # Already linked (either field form)?
        if doc.get("incident_ids") or doc.get("incident_id"):
            return False
        # Already tombstoned after exhausting its retries?
        if doc.get("corr_path"):
            return False
        return doc.get("corr_sweep_count", 0) < MAX_SWEEP_ATTEMPTS

    orphans = [c for c in recent_ended if _is_sweepable(c)]
    if not orphans:
        return

    logger.info(f"Re-correlation sweep: {len(orphans)} orphaned call(s) to check")
    linked = 0
    for call in orphans:
        success = await _recorrelate_orphan(call)
        linked += int(success)

    if linked:
        logger.info(f"Re-correlation sweep: linked {linked}/{len(orphans)} orphaned call(s)")
async def _recorrelate_orphan(call: dict) -> bool:
    """
    Attempt to link a single orphaned call to an existing incident.

    Re-runs the correlator in link-only mode (create_if_new=False), anchored
    to the call's own started_at. On success the call doc gets
    incident_ids=[...]. On failure corr_sweep_count is incremented and, once
    the retry budget is exhausted, corr_path="unlinked" is written as a
    permanent tombstone so the sweep filter stops selecting this call.

    Returns True if a match was found and the call was linked.
    """
    from app.internal import incident_correlator

    # FIX: was a bare literal `3` below, silently duplicating the
    # MAX_SWEEP_ATTEMPTS constant in _run_sweep_pass. Name it here and keep
    # the two in sync — the sweep filter stops selecting the call at the same
    # threshold this tombstone is written at.
    MAX_SWEEP_ATTEMPTS = 3

    call_id = call.get("call_id")
    started_at = _parse_dt(call.get("started_at"))
    if not call_id or not started_at:
        return False

    # All data needed for correlation was stored by the first-pass extraction.
    incident_id = await incident_correlator.correlate_call(
        call_id        = call_id,
        node_id        = call.get("node_id", ""),
        system_id      = call.get("system_id"),
        talkgroup_id   = call.get("talkgroup_id"),
        talkgroup_name = call.get("talkgroup_name"),
        tags           = call.get("tags") or [],
        incident_type  = call.get("incident_type"),
        location       = call.get("location"),
        location_coords= call.get("location_coords"),
        cleared_units  = call.get("cleared_units") or [],
        reference_time = started_at,  # anchor window to when the call happened
        create_if_new  = False,       # never create — link-only
    )

    if incident_id:
        await fstore.doc_set("calls", call_id, {"incident_ids": [incident_id]})
        logger.info(
            f"Re-correlation: linked orphaned call {call_id} → incident {incident_id}"
        )
        return True

    # Increment the attempt counter. Once MAX_SWEEP_ATTEMPTS is reached the
    # orphan filter in _run_sweep_pass will stop picking this call up, and we
    # write corr_path="unlinked" as a permanent tombstone.
    attempts = call.get("corr_sweep_count", 0) + 1
    update: dict = {"corr_sweep_count": attempts}
    if attempts >= MAX_SWEEP_ATTEMPTS:
        update["corr_path"] = "unlinked"
    await fstore.doc_set("calls", call_id, update)
    return False
def _parse_dt(value) -> Optional[datetime]:
|
||
if not value:
|
||
return None
|
||
try:
|
||
dt = datetime.fromisoformat(str(value).replace("Z", "+00:00"))
|
||
if dt.tzinfo is None:
|
||
dt = dt.replace(tzinfo=timezone.utc)
|
||
return dt
|
||
except Exception:
|
||
return None
|