Files
Logan 8b660d8e10 feat: incident correlation overhaul, signal-based auto-resolve, token fixes
Correlator
- Raise fast-path idle gate 30 → 90 min (tg_fast_path_idle_minutes)
- Fix disambiguate always-commits bug: run _call_fits_incident on winner
  before committing; fall through to new-incident creation if it fails
- Add unit-continuity path (path 1.5): matches all_active by shared unit
  IDs with a reassignment guard, bridges calls past the idle gate
- Add tag-based incident_type inference (_TAG_TYPE_HINTS) as GPT fallback,
  rescuing tagged calls that would have been dropped (616 observed orphans)
- Add master/child incident model: _create_master_incident, _demote_to_child,
  _add_child_to_master; new incidents stamped incident_type="master"
- Add cross-system parent detection (_find_cross_system_parent): two-signal
  scoring (road overlap=0.4, embedding≥0.78=0.3, proximity=0.3, threshold=0.5)
  wired into create-if-new path; creates master shell on first cross-system match
- Add maybe_resolve_parent: auto-resolves master when all children close;
  called from upload pipeline (LLM closure) and summarizer stale sweep
- Add signal-based auto-resolve via units_active/units_cleared tracking:
  GPT now extracts cleared_units per scene; _update_incident moves units
  between active/cleared lists and resolves the incident when active empties;
  stored on call doc for re-correlation sweep reuse
- Add _create_incident initialization of units_active/units_cleared fields

Re-correlation sweep
- Add corr_sweep_count + MAX_SWEEP_ATTEMPTS=3: orphans get 3 attempts
  then are tombstoned as corr_path="unlinked", ending the re-sweep loop
  (previously hammering each orphan 29-31 times per shift)

Intelligence extraction
- Add cleared_units to GPT prompt schema and rules
- Extract and propagate cleared_units per scene; merge across scenes;
  store on call doc for re-correlation sweep

Token management
- Fix token release bug: remove release_token call on discord_connected=False
  in MQTT checkin (transient Discord drops were orphaning bots mid-shift)
- Add PUT /tokens/{id}/prefer/{system_id} endpoint: lock a bot token to a
  system; pass _none as system_id to clear; stored bidirectionally on both
  token and system documents
- discord_join handler resolves preferred_token_id from system doc and passes
  system_name in MQTT payload
2026-05-10 19:49:05 -04:00

158 lines
5.5 KiB
Python

"""
Background incident summary loop.
Runs every SUMMARY_INTERVAL_MINUTES. Two passes per tick:
1. Summary pass — find stale incidents (summary_stale=True) and regenerate summaries.
2. Stale sweep — auto-resolve incidents with no new calls for incident_auto_resolve_minutes.
This is effectively "time since last call" because updated_at is stamped on every
new linked call.
"""
import asyncio
from datetime import datetime, timezone, timedelta
from typing import Optional
from app.internal.logger import logger
from app.internal import firestore as fstore
from app.config import settings
async def summarizer_loop() -> None:
    """Background loop: every summary_interval_minutes, run the summary pass
    and the stale-incident sweep.

    Both passes are gated on the ``summaries_enabled`` feature flag, re-read
    each tick so the flag can be flipped at runtime. The first tick happens
    one full interval after startup (sleep comes before the work).
    """
    # Imported lazily here (as in the original) — presumably to avoid an
    # import cycle at module load; TODO confirm.
    from app.internal.feature_flags import get_flags

    interval = settings.summary_interval_minutes * 60
    logger.info(f"Summarizer started — interval: {settings.summary_interval_minutes}m")
    while True:
        await asyncio.sleep(interval)
        try:
            flags = await get_flags()
            # .get() instead of [] — a missing flag key previously raised
            # KeyError and was logged as a generic "pass failed".
            if flags.get("summaries_enabled"):
                await _run_summary_pass()
                await _resolve_stale_incidents()
            else:
                logger.info("Summaries disabled — skipping summary pass and stale incident sweep")
        except Exception as e:
            # logger.exception keeps the traceback; the bare error string
            # made failures inside the passes hard to diagnose.
            logger.exception(f"Summarizer pass failed: {e}")
async def _run_summary_pass() -> None:
    """Find active incidents flagged summary_stale and regenerate each summary."""
    stale_incidents = await fstore.collection_list(
        "incidents", status="active", summary_stale=True
    )
    if not stale_incidents:
        return
    logger.info(f"Summarizer: processing {len(stale_incidents)} stale incident(s)")
    for incident in stale_incidents:
        await _summarize_incident(incident)
async def _summarize_incident(inc: dict) -> None:
    """Regenerate the summary for one incident document.

    Fetches the transcript of every linked call, runs the blocking LLM
    summarizer in a worker thread, then writes the summary (when one was
    produced) plus bookkeeping fields back to the incident, always clearing
    summary_stale so the incident isn't reprocessed next tick.
    """
    incident_id = inc.get("incident_id")
    if not incident_id:
        return
    call_ids: list[str] = inc.get("call_ids", [])
    if not call_ids:
        return

    # Fetch transcripts for all calls in this incident; calls without a
    # transcript yet are simply skipped.
    transcripts: list[str] = []
    for cid in call_ids:
        doc = await fstore.doc_get("calls", cid)
        if doc and doc.get("transcript"):
            transcripts.append(doc["transcript"])
    if not transcripts:
        # No transcripts yet — clear stale flag and wait for next pass
        # (a new linked call will re-flag the incident).
        await fstore.doc_set("incidents", incident_id, {"summary_stale": False})
        return

    # _sync_summarize is a blocking OpenAI call; keep the event loop free.
    summary = await asyncio.to_thread(_sync_summarize, inc, transcripts)
    now = datetime.now(timezone.utc).isoformat()
    updates: dict = {
        "summary_stale": False,
        "summary_last_run": now,
    }
    if summary:
        updates["summary"] = summary
        logger.info(f"Summarizer: updated summary for incident {incident_id}")
    else:
        # Fix: this path goes through _sync_summarize (OpenAI GPT-4o mini);
        # the previous message blamed "Gemini", which misled log triage.
        logger.warning(f"Summarizer: GPT-4o mini returned nothing for incident {incident_id}")
    await fstore.doc_set("incidents", incident_id, updates)
async def _resolve_stale_incidents() -> None:
    """Auto-resolve active incidents with no new calls for incident_auto_resolve_minutes.

    "Idle" is measured from updated_at, which is stamped on every new linked
    call. After resolving a child, maybe_resolve_parent lets the correlator
    close the master incident when all of its children are resolved.
    """
    # Hoisted out of the per-incident loop (it was re-imported on every
    # resolved incident). Kept function-local rather than module-level,
    # presumably to avoid a circular import with the correlator — TODO confirm.
    from app.internal.incident_correlator import maybe_resolve_parent

    all_active = await fstore.collection_list("incidents", status="active")
    if not all_active:
        return
    now = datetime.now(timezone.utc)
    count = 0
    for inc in all_active:
        incident_id = inc.get("incident_id")
        if not incident_id:
            continue
        try:
            raw_updated = str(inc.get("updated_at", ""))
            if not raw_updated:
                # No timestamp — skip quietly instead of raising ValueError
                # from fromisoformat("") into the warning path below.
                continue
            updated_dt = datetime.fromisoformat(raw_updated.replace("Z", "+00:00"))
            if updated_dt.tzinfo is None:
                # Treat naive timestamps as UTC so the subtraction is valid.
                updated_dt = updated_dt.replace(tzinfo=timezone.utc)
            idle_minutes = (now - updated_dt).total_seconds() / 60
            if idle_minutes > settings.incident_auto_resolve_minutes:
                await fstore.doc_set("incidents", incident_id, {"status": "resolved"})
                await maybe_resolve_parent(incident_id)
                logger.info(
                    f"Auto-resolved stale incident {incident_id} "
                    f"(idle {idle_minutes:.0f}m)"
                )
                count += 1
        except Exception as e:
            # Per-incident guard: one bad document must not abort the sweep.
            logger.warning(f"Stale sweep error for {incident_id}: {e}")
    if count:
        logger.info(f"Stale sweep: resolved {count} incident(s)")
def _sync_summarize(inc: dict, transcripts: list[str]) -> Optional[str]:
    """Blocking LLM call: build the incident prompt and ask GPT-4o mini.

    Runs in a worker thread via asyncio.to_thread (see _summarize_incident).
    Returns the summary text, or None when no API key is configured, the
    model returns an empty result, or the request fails.
    """
    from app.config import settings
    from openai import OpenAI

    if not settings.openai_api_key:
        return None

    inc_type = inc.get("type", "unknown")
    location = inc.get("location") or "unknown location"
    tg_ids = ", ".join(inc.get("talkgroup_ids", [])) or "unknown"
    numbered = "\n".join(f"{i+1}. {t}" for i, t in enumerate(transcripts))
    prompt = f"""You are analyzing P25 public safety radio communications for a single active incident.
Incident type: {inc_type}
Location: {location}
Talkgroup(s): {tg_ids}
Transcripts ({len(transcripts)} calls, chronological):
{numbered}
Write a concise factual summary of this incident in 2-4 sentences. Include:
- What happened
- Location (most specific mentioned)
- Units or resources involved if mentioned
- Current status if determinable
Be factual. Do not speculate beyond what the transcripts say. Do not use bullet points."""
    try:
        client = OpenAI(api_key=settings.openai_api_key)
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
        )
        # message.content is typed str | None in the OpenAI SDK; guard before
        # .strip() so a None content doesn't surface as a spurious
        # AttributeError through the generic failure warning below.
        content = response.choices[0].message.content or ""
        return content.strip() or None
    except Exception as e:
        logger.warning(f"GPT-4o mini summary failed: {e}")
        return None