""" Background incident summary loop. Runs every SUMMARY_INTERVAL_MINUTES. Two passes per tick: 1. Summary pass — find stale incidents (summary_stale=True) and regenerate summaries. 2. Stale sweep — auto-resolve incidents with no new calls for incident_auto_resolve_minutes. This is effectively "time since last call" because updated_at is stamped on every new linked call. """ import asyncio from datetime import datetime, timezone, timedelta from typing import Optional from app.internal.logger import logger from app.internal import firestore as fstore from app.config import settings async def summarizer_loop() -> None: from app.internal.feature_flags import get_flags interval = settings.summary_interval_minutes * 60 logger.info(f"Summarizer started — interval: {settings.summary_interval_minutes}m") while True: await asyncio.sleep(interval) try: flags = await get_flags() if flags["summaries_enabled"]: await _run_summary_pass() await _resolve_stale_incidents() else: logger.info("Summaries disabled — skipping summary pass and stale incident sweep") except Exception as e: logger.error(f"Summarizer pass failed: {e}") async def _run_summary_pass() -> None: stale = await fstore.collection_list("incidents", status="active", summary_stale=True) if not stale: return logger.info(f"Summarizer: processing {len(stale)} stale incident(s)") for inc in stale: await _summarize_incident(inc) async def _summarize_incident(inc: dict) -> None: incident_id = inc.get("incident_id") if not incident_id: return call_ids: list[str] = inc.get("call_ids", []) if not call_ids: return # Fetch transcripts for all calls in this incident transcripts: list[str] = [] for cid in call_ids: doc = await fstore.doc_get("calls", cid) if doc and doc.get("transcript"): transcripts.append(doc["transcript"]) if not transcripts: # No transcripts yet — clear stale flag and wait for next pass await fstore.doc_set("incidents", incident_id, {"summary_stale": False}) return summary = await asyncio.to_thread(_sync_summarize, inc, transcripts) now = datetime.now(timezone.utc).isoformat() updates: dict = { "summary_stale": False, "summary_last_run": now, } if summary: updates["summary"] = summary logger.info(f"Summarizer: updated summary for incident {incident_id}") else: logger.warning(f"Summarizer: Gemini returned nothing for incident {incident_id}") await fstore.doc_set("incidents", incident_id, updates) async def _resolve_stale_incidents() -> None: """Auto-resolve active incidents that have had no new calls for incident_auto_resolve_minutes.""" all_active = await fstore.collection_list("incidents", status="active") if not all_active: return now = datetime.now(timezone.utc) cutoff = timedelta(minutes=settings.incident_auto_resolve_minutes) count = 0 for inc in all_active: incident_id = inc.get("incident_id") if not incident_id: continue try: updated_dt = datetime.fromisoformat( str(inc.get("updated_at", "")).replace("Z", "+00:00") ) if updated_dt.tzinfo is None: updated_dt = updated_dt.replace(tzinfo=timezone.utc) idle_minutes = (now - updated_dt).total_seconds() / 60 if idle_minutes > settings.incident_auto_resolve_minutes: await fstore.doc_set("incidents", incident_id, {"status": "resolved"}) logger.info( f"Auto-resolved stale incident {incident_id} " f"(idle {idle_minutes:.0f}m)" ) count += 1 except Exception as e: logger.warning(f"Stale sweep error for {incident_id}: {e}") if count: logger.info(f"Stale sweep: resolved {count} incident(s)") def _sync_summarize(inc: dict, transcripts: list[str]) -> Optional[str]: from app.config import settings from openai import OpenAI if not settings.openai_api_key: return None inc_type = inc.get("type", "unknown") location = inc.get("location") or "unknown location" tg_ids = ", ".join(inc.get("talkgroup_ids", [])) or "unknown" numbered = "\n".join(f"{i+1}. {t}" for i, t in enumerate(transcripts)) prompt = f"""You are analyzing P25 public safety radio communications for a single active incident. Incident type: {inc_type} Location: {location} Talkgroup(s): {tg_ids} Transcripts ({len(transcripts)} calls, chronological): {numbered} Write a concise factual summary of this incident in 2-4 sentences. Include: - What happened - Location (most specific mentioned) - Units or resources involved if mentioned - Current status if determinable Be factual. Do not speculate beyond what the transcripts say. Do not use bullet points.""" try: client = OpenAI(api_key=settings.openai_api_key) response = client.chat.completions.create( model="gpt-4o-mini", messages=[{"role": "user", "content": prompt}], ) return response.choices[0].message.content.strip() or None except Exception as e: logger.warning(f"GPT-4o mini summary failed: {e}") return None