diff --git a/drb-c2-core/app/config.py b/drb-c2-core/app/config.py index a5b38cd..74942c2 100644 --- a/drb-c2-core/app/config.py +++ b/drb-c2-core/app/config.py @@ -26,6 +26,11 @@ class Settings(BaseSettings): # Gemini (intelligence extraction, embeddings, incident summaries) gemini_api_key: Optional[str] = None + # Correlation consensus models + # corr_cheap_model — first-pass LLM correlator (runs on every call) + # corr_smart_model — tiebreaker (only fires when rules and cheap LLM disagree) + corr_cheap_model: str = "gemini-2.0-flash" + corr_smart_model: str = "gemini-1.5-pro" summary_interval_minutes: int = 2 # how often the summary loop runs correlation_window_hours: int = 2 # slow/location path: max hours since last call embedding_similarity_threshold: float = 0.93 # slow-path: requires location corroboration diff --git a/drb-c2-core/app/internal/incident_correlator.py b/drb-c2-core/app/internal/incident_correlator.py index 84e9f99..d9b72c1 100644 --- a/drb-c2-core/app/internal/incident_correlator.py +++ b/drb-c2-core/app/internal/incident_correlator.py @@ -172,7 +172,7 @@ def _incident_idle_minutes(inc: dict, now: datetime) -> float: # ───────────────────────────────────────────────────────────────────────────── -# Public entry point +# Public API # ───────────────────────────────────────────────────────────────────────────── async def correlate_call( @@ -194,47 +194,155 @@ async def correlate_call( ) -> Optional[str]: """ Link call_id to an existing incident or create a new one. - - reference_time — time anchor for the time-limited paths (location + slow). - Defaults to now. Pass call.started_at when re-correlating - orphaned calls so the window is anchored to when the call - actually happened, not when the sweep runs. - create_if_new — when False, skip new-incident creation (re-correlation only - links to existing incidents; it never creates new ones). - - Returns the incident_id, or None if skipped. + Thin wrapper: builds context → runs rules decision → commits. """ + ctx = await _build_context( + call_id=call_id, units=units, vehicles=vehicles, cleared_units=cleared_units, + location_coords=location_coords, reference_time=reference_time, + system_id=system_id, talkgroup_id=talkgroup_id, talkgroup_name=talkgroup_name, + tags=tags, incident_type=incident_type, location=location, + reassignment=reassignment, create_if_new=create_if_new, + ) + decision = _run_decision(ctx) + return await _apply_and_log(decision, ctx) + + +async def preview_correlation( + call_id: str, + node_id: str, + system_id: Optional[str], + talkgroup_id: Optional[int], + talkgroup_name: Optional[str], + tags: list[str], + incident_type: Optional[str], + location: Optional[str] = None, + location_coords: Optional[dict] = None, + reference_time: Optional[datetime] = None, + create_if_new: bool = True, + units: Optional[list[str]] = None, + vehicles: Optional[list[str]] = None, + cleared_units: Optional[list[str]] = None, + reassignment: bool = False, +) -> dict: + """ + Run the rules engine and return the decision WITHOUT committing to Firestore. + + Returns {"decision": {...}, "ctx": {...}}. + Pass the result to apply_correlation() to commit, or use the decision + fields directly for comparison in the consensus correlator. + + decision keys: + action "link" | "new" | "orphan" + matched_incident the candidate incident doc (action == "link") + incident_type resolved type after tag inference (action == "new") + corr_debug fields to persist on the call doc + """ + ctx = await _build_context( + call_id=call_id, units=units, vehicles=vehicles, cleared_units=cleared_units, + location_coords=location_coords, reference_time=reference_time, + system_id=system_id, talkgroup_id=talkgroup_id, talkgroup_name=talkgroup_name, + tags=tags, incident_type=incident_type, location=location, + reassignment=reassignment, create_if_new=create_if_new, + ) + decision = _run_decision(ctx) + return {"decision": decision, "ctx": ctx} + + +async def apply_correlation(preview: dict) -> Optional[str]: + """ + Commit the decision returned by preview_correlation() to Firestore. + Returns the incident_id, or None if the call was orphaned. + """ + return await _apply_and_log(preview["decision"], preview["ctx"]) + + +# ───────────────────────────────────────────────────────────────────────────── +# Context builder — fetches all Firestore data needed for correlation +# ───────────────────────────────────────────────────────────────────────────── + +async def _build_context( + call_id: str, + units: Optional[list[str]], + vehicles: Optional[list[str]], + cleared_units: Optional[list[str]], + location_coords: Optional[dict], + reference_time: Optional[datetime], + system_id: Optional[str], + talkgroup_id: Optional[int], + talkgroup_name: Optional[str], + tags: list[str], + incident_type: Optional[str], + location: Optional[str], + reassignment: bool, + create_if_new: bool, +) -> dict: now = reference_time or datetime.now(timezone.utc) window = timedelta(hours=settings.correlation_window_hours) - # Fetch all active incidents cross-type (mutual aid needs this) all_active = await fstore.collection_list("incidents", status="active") recent = [inc for inc in all_active if _within_window_of(inc, now, window)] - # Fetch call doc once — reused for disambiguation, embedding merge, unit accumulation - call_doc = await fstore.doc_get("calls", call_id) or {} - call_embedding: Optional[list] = call_doc.get("embedding") - # Prefer explicitly passed units/vehicles (per-scene, from intelligence extraction) - # over the call doc, which merges units from ALL scenes in a multi-scene recording. - # Falling back to the call doc is correct for recorrelation sweeps where we have no - # scene-level breakdown. - call_units: list[str] = units if units is not None else (call_doc.get("units") or []) - call_vehicles: list[str] = vehicles if vehicles is not None else (call_doc.get("vehicles") or []) - call_cleared: list[str] = cleared_units if cleared_units is not None else (call_doc.get("cleared_units") or []) - call_severity: str = call_doc.get("severity") or "unknown" - # Use passed coords first (freshly geocoded), fall back to what's on the call doc - coords: Optional[dict] = location_coords or call_doc.get("location_coords") + call_doc = await fstore.doc_get("calls", call_id) or {} + call_embedding = call_doc.get("embedding") + call_units = units if units is not None else (call_doc.get("units") or []) + call_vehicles = vehicles if vehicles is not None else (call_doc.get("vehicles") or []) + call_cleared = cleared_units if cleared_units is not None else (call_doc.get("cleared_units") or []) + call_severity = call_doc.get("severity") or "unknown" + coords = location_coords or call_doc.get("location_coords") + is_thin_call = not call_units and not call_vehicles and not coords + + return { + "call_id": call_id, "all_active": all_active, "recent": recent, + "call_doc": call_doc, "call_embedding": call_embedding, + "call_units": call_units, "call_vehicles": call_vehicles, + "call_cleared": call_cleared, "call_severity": call_severity, + "coords": coords, "is_thin_call": is_thin_call, "now": now, + "system_id": system_id, "talkgroup_id": talkgroup_id, + "talkgroup_name": talkgroup_name, "tags": tags, + "incident_type": incident_type, "location": location, + "location_coords": location_coords, "reassignment": reassignment, + "create_if_new": create_if_new, + } + + +# ───────────────────────────────────────────────────────────────────────────── +# Rules decision engine — pure logic, no Firestore writes +# ───────────────────────────────────────────────────────────────────────────── + +def _run_decision(ctx: dict) -> dict: + """ + Run the hybrid rules correlation engine against a pre-built context. + No Firestore reads or writes — all data comes from ctx. + + Returns: + action "link" | "new" | "orphan" + matched_incident incident doc to update (action == "link") + incident_type resolved type (action == "new") + corr_debug fields to write to the call doc + """ + all_active = ctx["all_active"] + recent = ctx["recent"] + call_doc = ctx["call_doc"] + call_embedding = ctx["call_embedding"] + call_units = ctx["call_units"] + call_vehicles = ctx["call_vehicles"] + coords = ctx["coords"] + is_thin_call = ctx["is_thin_call"] + now = ctx["now"] + system_id = ctx["system_id"] + talkgroup_id = ctx["talkgroup_id"] + talkgroup_name = ctx["talkgroup_name"] + tags = ctx["tags"] + incident_type = ctx["incident_type"] + location = ctx["location"] + location_coords= ctx["location_coords"] + reassignment = ctx["reassignment"] + create_if_new = ctx["create_if_new"] + call_id = ctx["call_id"] matched_incident: Optional[dict] = None corr_debug: dict = {} - # A "thin" call carries no scene-identifying information — it is a pure - # status transmission (10-4, en route, acknowledgement). Detected by the - # absence of extracted units, vehicles, AND geocoded coordinates. Thin - # calls should link to wherever the last active conversation on this TGID - # was happening rather than running the full scene-verification logic. - is_thin_call = not call_units and not call_vehicles and not coords - # ── 1. Fast path: talkgroup match with recency gate ────────────────────── # # Only considers incidents updated within tg_fast_path_idle_minutes (default 30 min). @@ -589,103 +697,153 @@ async def correlate_call( f"call {call_id} → {best_inc['incident_id']} (sim={best_score:.3f})" ) - # ── Update existing or create new ──────────────────────────────────────── + # ── Decision output ─────────────────────────────────────────────────────── if matched_incident: - incident_id = matched_incident["incident_id"] + return { + "action": "link", + "matched_incident": matched_incident, + "incident_type": incident_type, + "corr_debug": corr_debug, + } + + if not create_if_new: + return {"action": "orphan", "matched_incident": None, "incident_type": None, "corr_debug": corr_debug} + + # Attempt type inference from tags before giving up on creation + resolved_type = incident_type + if not resolved_type and tags: + resolved_type = _infer_type_from_tags(tags) + if resolved_type: + logger.info( + f"Correlator: inferred incident_type={resolved_type!r} from tags {tags} for call {call_id}" + ) + + if not resolved_type: + return {"action": "orphan", "matched_incident": None, "incident_type": None, "corr_debug": corr_debug} + + return { + "action": "new", + "matched_incident": None, + "incident_type": resolved_type, + "corr_debug": corr_debug, + } + + +# ───────────────────────────────────────────────────────────────────────────── +# Commit layer — writes decisions to Firestore +# ───────────────────────────────────────────────────────────────────────────── + +async def _apply_and_log(decision: dict, ctx: dict) -> Optional[str]: + """Commit a rules decision and persist the corr_debug fields to the call doc.""" + incident_id = await _apply_decision(decision, ctx) + corr_debug = decision.get("corr_debug") or {} + if corr_debug: + try: + await fstore.doc_set("calls", ctx["call_id"], corr_debug) + except Exception as e: + logger.warning(f"Could not write corr_debug for call {ctx['call_id']}: {e}") + return incident_id + + +async def _apply_decision(decision: dict, ctx: dict) -> Optional[str]: + """ + Execute a correlation decision produced by _run_decision(). + Handles all Firestore writes; returns the incident_id or None. + """ + action = decision["action"] + if action == "orphan": + return None + + call_id = ctx["call_id"] + talkgroup_id = ctx["talkgroup_id"] + talkgroup_name = ctx["talkgroup_name"] + system_id = ctx["system_id"] + tags = ctx["tags"] + location = ctx["location"] + location_coords = ctx["location_coords"] + call_units = ctx["call_units"] + call_vehicles = ctx["call_vehicles"] + call_embedding = ctx["call_embedding"] + call_severity = ctx["call_severity"] + call_cleared = ctx["call_cleared"] + coords = ctx["coords"] + now = ctx["now"] + incident_type = decision["incident_type"] + + if action == "link": + matched_incident = decision["matched_incident"] await _update_incident( matched_incident, call_id, talkgroup_id, system_id, tags, location, location_coords, call_units, call_vehicles, call_embedding, now, talkgroup_name=talkgroup_name, incident_type=incident_type, cleared_units=call_cleared, ) - elif create_if_new: - # If GPT returned no type (missing talkgroup context is common), attempt - # to recover a type from the extracted tags before giving up on creation. - if not incident_type and tags: - incident_type = _infer_type_from_tags(tags) - if incident_type: - logger.info( - f"Correlator: inferred incident_type={incident_type!r} from tags " - f"{tags} for call {call_id} (no GPT type)" - ) - if not incident_type: - # No type and none inferred — nothing to create - return None + return matched_incident["incident_id"] - # ── Cross-system parent detection ───────────────────────────────────── - # Before creating a standalone incident, check whether this call belongs - # to an incident already opened by a different agency (multi-agency chase, - # mutual aid, etc.). If a parent candidate is found: - # • The existing candidate is demoted to a child (incident_type → "child") - # • A new master shell is created linking both children - # • The new call's incident is created as a second child of the master - cross_parent: Optional[dict] = None - if system_id: - cross_parent = await _find_cross_system_parent( - system_id=system_id, - incident_type=incident_type, - location=location, - location_coords=coords, - call_embedding=call_embedding, - recent=recent, + # action == "new" + # ── Cross-system parent detection ───────────────────────────────────────── + # Before creating a standalone incident, check whether this call belongs + # to an incident already opened by a different agency (multi-agency chase, + # mutual aid, etc.). If a parent candidate is found: + # • The existing candidate is demoted to a child (incident_type → "child") + # • A new master shell is created linking both children + # • The new call's incident is created as a second child of the master + cross_parent: Optional[dict] = None + if system_id: + cross_parent = await _find_cross_system_parent( + system_id=system_id, + incident_type=incident_type, + location=location, + location_coords=coords, + call_embedding=call_embedding, + recent=ctx["recent"], + ) + + if cross_parent: + existing_child_id = cross_parent["incident_id"] + existing_master_id = cross_parent.get("parent_incident_id") + + # Create the new agency's child incident first + incident_id = await _create_incident( + call_id, incident_type, talkgroup_id, talkgroup_name, system_id, + tags, location, location_coords, + call_units, call_vehicles, call_embedding, call_severity, now, + ) + + if existing_master_id: + # Candidate is already a child — link new child to the existing master + await _demote_to_child(incident_id, existing_master_id) + await _add_child_to_master(existing_master_id, incident_id, now) + decision["corr_debug"]["corr_path"] = "new/cross-system-child" + logger.info( + f"Correlator cross-system: call {call_id} → new child {incident_id} " + f"under existing master {existing_master_id}" ) - - if cross_parent: - existing_child_id = cross_parent["incident_id"] - existing_master_id = cross_parent.get("parent_incident_id") - - # Create the new agency's child incident first - incident_id = await _create_incident( - call_id, incident_type, talkgroup_id, talkgroup_name, system_id, - tags, location, location_coords, - call_units, call_vehicles, call_embedding, call_severity, now, - ) - - if existing_master_id: - # Candidate is already a child — link new child to the existing master - await _demote_to_child(incident_id, existing_master_id) - await _add_child_to_master(existing_master_id, incident_id, now) - corr_debug["corr_path"] = "new/cross-system-child" - logger.info( - f"Correlator cross-system: call {call_id} → new child {incident_id} " - f"under existing master {existing_master_id}" - ) - else: - # Candidate is a standalone master — create master shell, demote both - master_id = await _create_master_incident( - first_child_id=existing_child_id, - second_child_id=incident_id, - operational_type=incident_type, - location=cross_parent.get("location") or location, - location_coords=cross_parent.get("location_coords") or coords, - now=now, - ) - await _demote_to_child(existing_child_id, master_id) - await _demote_to_child(incident_id, master_id) - corr_debug["corr_path"] = "new/cross-system-master" - logger.info( - f"Correlator cross-system: created master {master_id}, " - f"demoted {existing_child_id} + new {incident_id} as children" - ) else: - # Normal single-agency incident creation - incident_id = await _create_incident( - call_id, incident_type, talkgroup_id, talkgroup_name, system_id, - tags, location, location_coords, - call_units, call_vehicles, call_embedding, call_severity, now, + # Candidate is a standalone — create master shell, demote both + master_id = await _create_master_incident( + first_child_id=existing_child_id, + second_child_id=incident_id, + operational_type=incident_type, + location=cross_parent.get("location") or location, + location_coords=cross_parent.get("location_coords") or coords, + now=now, + ) + await _demote_to_child(existing_child_id, master_id) + await _demote_to_child(incident_id, master_id) + decision["corr_debug"]["corr_path"] = "new/cross-system-master" + logger.info( + f"Correlator cross-system: created master {master_id}, " + f"demoted {existing_child_id} + new {incident_id} as children" ) - corr_debug["corr_path"] = "new" else: - # Creation suppressed (re-correlation sweep) — nothing to do - return None - - # Persist the correlation decision to the call document so it can be - # inspected in Firestore or the admin UI without log-scraping. - if corr_debug: - try: - await fstore.doc_set("calls", call_id, corr_debug) - except Exception as e: - logger.warning(f"Could not write corr_debug for call {call_id}: {e}") + # Normal single-agency incident creation + incident_id = await _create_incident( + call_id, incident_type, talkgroup_id, talkgroup_name, system_id, + tags, location, location_coords, + call_units, call_vehicles, call_embedding, call_severity, now, + ) + decision["corr_debug"]["corr_path"] = "new" return incident_id diff --git a/drb-c2-core/app/internal/llm_correlator.py b/drb-c2-core/app/internal/llm_correlator.py new file mode 100644 index 0000000..9dfb6e4 --- /dev/null +++ b/drb-c2-core/app/internal/llm_correlator.py @@ -0,0 +1,278 @@ +""" +LLM-based incident correlator using Gemini. + +Two functions are exposed: + decide(call_id, ctx) — cheap first-pass (corr_cheap_model) + tiebreak(rules_decision, llm_decision, ctx) — smart tiebreaker (corr_smart_model) + +Both return a decision dict compatible with _run_decision() in incident_correlator: + {"action": "link"|"new"|"orphan", + "matched_incident": dict|None, + "incident_type": str|None, + "corr_debug": dict, + "reasoning": str} ← extra field for logging/tiebreak comparison + +decide() is skipped for thin calls (no content to reason about) and when +GEMINI_API_KEY is not set — in those cases returns None so the caller knows +to fall back to the rules decision. + +Error handling: any Gemini failure returns None from decide() and the +rules_decision from tiebreak() so the pipeline never stalls. +""" +import asyncio +import json +from datetime import datetime, timezone +from typing import Optional +from app.internal.logger import logger +from app.config import settings + + +# ───────────────────────────────────────────────────────────────────────────── +# Prompt helpers +# ───────────────────────────────────────────────────────────────────────────── + +def _fmt_idle(inc: dict, now: datetime) -> str: + try: + raw = inc.get("updated_at") or inc.get("started_at") or "" + dt = datetime.fromisoformat(str(raw).replace("Z", "+00:00")) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + minutes = int((now - dt).total_seconds() / 60) + return f"{minutes}min ago" if minutes < 60 else f"{minutes // 60}h{minutes % 60:02d}m ago" + except Exception: + return "?" + + +def _inc_summary(inc: dict, now: datetime) -> str: + parts = [f"id:{inc['incident_id']}", f"type:{inc.get('type') or '?'}"] + if inc.get("location"): + parts.append(f"loc:{inc['location']}") + units = inc.get("units") or [] + if units: + parts.append(f"units:[{', '.join(units[:6])}]") + tags = inc.get("tags") or [] + if tags: + parts.append(f"tags:[{', '.join(tags[:4])}]") + parts.append(f"idle:{_fmt_idle(inc, now)}") + return " | ".join(parts) + + +def _call_block(ctx: dict) -> str: + lines = [] + call_doc = ctx["call_doc"] + transcript = call_doc.get("transcript_corrected") or call_doc.get("transcript") + if transcript: + lines.append(f"Transcript: {transcript[:700]}") + if ctx["tags"]: + lines.append(f"Tags: {ctx['tags']}") + if ctx["incident_type"]: + lines.append(f"Incident type: {ctx['incident_type']}") + if ctx["location"]: + lines.append(f"Location: {ctx['location']}") + if ctx["call_units"]: + lines.append(f"Units: {ctx['call_units']}") + if ctx["call_vehicles"]: + lines.append(f"Vehicles: {ctx['call_vehicles']}") + if ctx["talkgroup_name"]: + lines.append(f"Talkgroup: {ctx['talkgroup_name']}") + return "\n".join(lines) if lines else "(no details)" + + +_SCHEMA = '{"action": "link" | "new" | "orphan", "incident_id": "", "reasoning": ""}' + +_RULES = """ +Rules: +- "link" only with clear positive evidence: same units, same geocoded location, or semantically identical scene on the same talkgroup within the last few minutes. +- A call on a DIFFERENT talkgroup than an incident requires unit overlap or geocoded location match — topic similarity alone is not enough. +- "new" only if the call has a clear incident_type AND describes a distinct, identifiable scene. +- "orphan" when in doubt — conservative is always correct. +- Do NOT link just because both calls involve police or both mention a road. +""" + + +def _build_decide_prompt(ctx: dict) -> str: + now = ctx["now"] + recent = ctx["recent"] + inc_block = ( + "\n".join(_inc_summary(inc, now) for inc in recent[:20]) + if recent else "(none)" + ) + return ( + "You are an incident correlator for a public safety radio monitoring system.\n\n" + "A new radio call has arrived. Decide whether it belongs to an existing active incident, " + "represents a new incident, or should be orphaned (not enough information).\n\n" + f"NEW CALL:\n{_call_block(ctx)}\n\n" + f"ACTIVE INCIDENTS ({len(recent)} recent):\n{inc_block}\n" + f"{_RULES}\n" + f"Respond with JSON only (no markdown):\n{_SCHEMA}" + ) + + +def _build_tiebreak_prompt(rules_decision: dict, llm_decision: dict, ctx: dict) -> str: + now = ctx["now"] + recent = ctx["recent"] + inc_block = ( + "\n".join(_inc_summary(inc, now) for inc in recent[:20]) + if recent else "(none)" + ) + + def _fmt(d: dict, name: str) -> str: + action = d.get("action", "?") + inc = d.get("matched_incident") + inc_id = inc["incident_id"] if inc else (d.get("incident_id") or "null") + reason = d.get("reasoning") or (d.get("corr_debug") or {}).get("corr_fit_signal") or "—" + return f" {name}: action={action}, incident_id={inc_id}, reasoning={reason!r}" + + return ( + "You are a senior incident correlator for a public safety radio monitoring system.\n\n" + "Two correlation engines disagree. You must make the final decision.\n\n" + f"NEW CALL:\n{_call_block(ctx)}\n\n" + f"ACTIVE INCIDENTS ({len(recent)} recent):\n{inc_block}\n\n" + "DISAGREEMENT:\n" + f"{_fmt(rules_decision, 'Rules engine')}\n" + f"{_fmt(llm_decision, 'LLM correlator')}\n" + f"{_RULES}\n" + f"Respond with JSON only (no markdown):\n{_SCHEMA}" + ) + + +# ───────────────────────────────────────────────────────────────────────────── +# Gemini API call (sync, runs in thread pool) +# ───────────────────────────────────────────────────────────────────────────── + +def _sync_gemini(model_name: str, prompt: str) -> dict: + import google.generativeai as genai # lazy import — only when needed + + genai.configure(api_key=settings.gemini_api_key) + model = genai.GenerativeModel( + model_name, + generation_config={"response_mime_type": "application/json"}, + ) + response = model.generate_content(prompt) + return json.loads(response.text) + + +# ───────────────────────────────────────────────────────────────────────────── +# Decision parsing +# ───────────────────────────────────────────────────────────────────────────── + +def _parse_response(raw: dict, ctx: dict) -> dict: + """ + Convert raw Gemini JSON output to a decision dict compatible with _run_decision(). + Resolves incident_id → full incident doc from ctx["all_active"]. + Handles type inference for "new" actions the same way as the rules engine. + """ + from app.internal.incident_correlator import _infer_type_from_tags # same-package import + + action = raw.get("action", "orphan") + reasoning = raw.get("reasoning", "") + + if action not in ("link", "new", "orphan"): + action = "orphan" + + matched_incident: Optional[dict] = None + + if action == "link": + inc_id = raw.get("incident_id") + if inc_id: + matched_incident = next( + (i for i in ctx["all_active"] if i.get("incident_id") == inc_id), + None, + ) + if not matched_incident: + logger.warning( + f"LLM correlator: incident_id={inc_id!r} not in active incidents — orphaning" + ) + action = "orphan" + + incident_type: Optional[str] = None + if action in ("link", "new"): + incident_type = ctx["incident_type"] + if not incident_type: + incident_type = _infer_type_from_tags(ctx["tags"]) + if action == "new" and not incident_type: + # Can't create an incident without a type — demote to orphan + action = "orphan" + matched_incident = None + + return { + "action": action, + "matched_incident": matched_incident, + "incident_type": incident_type, + "corr_debug": {"corr_llm_reasoning": reasoning}, + "reasoning": reasoning, + } + + +# ───────────────────────────────────────────────────────────────────────────── +# Public API +# ───────────────────────────────────────────────────────────────────────────── + +async def decide(call_id: str, ctx: dict) -> Optional[dict]: + """ + Run the cheap LLM correlator (corr_cheap_model) on the call. + + Returns a decision dict or None if: + - GEMINI_API_KEY is not configured + - the call is thin (content-free — no value from LLM) + - there are no recent active incidents to reason about + - Gemini fails + + Callers should treat None as "fall back to rules decision". + """ + if not settings.gemini_api_key: + return None + + if ctx["is_thin_call"]: + return None # thin calls have no transcript/units/coords to reason about + + if not ctx["recent"]: + return None # no incidents to correlate against — rules handles new-only + + try: + prompt = _build_decide_prompt(ctx) + raw = await asyncio.to_thread(_sync_gemini, settings.corr_cheap_model, prompt) + decision = _parse_response(raw, ctx) + _id = (decision["matched_incident"] or {}).get("incident_id", "null") + logger.info( + f"LLM correlator ({settings.corr_cheap_model}): call {call_id} → " + f"action={decision['action']} incident={_id} " + f"reasoning={decision['reasoning']!r}" + ) + return decision + except Exception as e: + logger.warning(f"LLM correlator failed for call {call_id}: {e}") + return None + + +async def tiebreak(rules_decision: dict, llm_decision: dict, ctx: dict) -> dict: + """ + Run the smart tiebreaker (corr_smart_model) when rules and LLM disagree. + Falls back to rules_decision on any error. + """ + call_id = ctx["call_id"] + try: + prompt = _build_tiebreak_prompt(rules_decision, llm_decision, ctx) + raw = await asyncio.to_thread(_sync_gemini, settings.corr_smart_model, prompt) + decision = _parse_response(raw, ctx) + _id = (decision["matched_incident"] or {}).get("incident_id", "null") + logger.info( + f"LLM tiebreak ({settings.corr_smart_model}): call {call_id} → " + f"action={decision['action']} incident={_id} " + f"reasoning={decision['reasoning']!r}" + ) + return decision + except Exception as e: + logger.warning(f"LLM tiebreak failed for call {call_id}: {e} — using rules decision") + return rules_decision + + +def decisions_agree(rules: dict, llm: dict) -> bool: + """True if both decisions agree on action and (when action=="link") on the target incident.""" + if rules["action"] != llm["action"]: + return False + if rules["action"] == "link": + r_id = (rules.get("matched_incident") or {}).get("incident_id") + l_id = (llm.get("matched_incident") or {}).get("incident_id") + return r_id == l_id + return True diff --git a/drb-c2-core/app/routers/upload.py b/drb-c2-core/app/routers/upload.py index 47e36b7..e874e75 100644 --- a/drb-c2-core/app/routers/upload.py +++ b/drb-c2-core/app/routers/upload.py @@ -83,6 +83,65 @@ def _public_url_to_gcs_uri(url: str) -> Optional[str]: return None +async def _correlate_with_consensus( + call_id: str, + node_id: str, + system_id: Optional[str], + talkgroup_id: Optional[int], + talkgroup_name: Optional[str], + tags: list[str], + incident_type: Optional[str], + location: Optional[str], + location_coords: Optional[dict], + units: Optional[list] = None, + vehicles: Optional[list] = None, + cleared_units: Optional[list] = None, + reassignment: bool = False, +) -> Optional[str]: + """ + Consensus correlator: runs the rules engine and the cheap LLM in sequence. + If they agree the rules decision is committed directly. + If they disagree a smarter tiebreaker LLM makes the final call. + + Falls back to rules-only when GEMINI_API_KEY is absent, the call is + content-free (thin), or any LLM call fails. + """ + from app.internal import incident_correlator, llm_correlator + + preview = await incident_correlator.preview_correlation( + call_id=call_id, node_id=node_id, system_id=system_id, + talkgroup_id=talkgroup_id, talkgroup_name=talkgroup_name, + tags=tags, incident_type=incident_type, location=location, + location_coords=location_coords, units=units, vehicles=vehicles, + cleared_units=cleared_units, reassignment=reassignment, + ) + ctx = preview["ctx"] + rules_decision = preview["decision"] + + llm_decision = await llm_correlator.decide(call_id, ctx) + + if llm_decision is None: + # LLM unavailable, skipped (thin call), or errored — rules wins. + rules_decision["corr_debug"]["corr_consensus"] = "rules_only" + return await incident_correlator.apply_correlation(preview) + + if llm_correlator.decisions_agree(rules_decision, llm_decision): + rules_decision["corr_debug"]["corr_consensus"] = "agreed" + rules_decision["corr_debug"]["corr_llm_reasoning"] = llm_decision.get("reasoning", "") + return await incident_correlator.apply_correlation(preview) + + # Disagree — escalate to the smarter tiebreaker. + logger.info( + f"Consensus disagreement for call {call_id}: " + f"rules={rules_decision['action']} vs llm={llm_decision['action']} — tiebreak" + ) + final = await llm_correlator.tiebreak(rules_decision, llm_decision, ctx) + final["corr_debug"]["corr_consensus"] = "tiebreak" + final["corr_debug"]["corr_rules_action"] = rules_decision["action"] + final["corr_debug"]["corr_llm_action"] = llm_decision["action"] + return await incident_correlator.apply_correlation({"decision": final, "ctx": ctx}) + + async def _run_extraction_pipeline( call_id: str, node_id: str, @@ -114,7 +173,7 @@ async def _run_extraction_pipeline( # overlap so the new scene doesn't chain into the unit's previous incident. is_reassignment = bool(scene.get("reassignment")) corr_units = [] if is_reassignment else scene.get("units") - incident_id = await incident_correlator.correlate_call( + incident_id = await _correlate_with_consensus( call_id=call_id, node_id=node_id, system_id=system_id, @@ -217,7 +276,7 @@ async def _run_intelligence_pipeline( all_tags.extend(scene["tags"]) is_reassignment = bool(scene.get("reassignment")) corr_units = [] if is_reassignment else scene.get("units") - incident_id = await incident_correlator.correlate_call( + incident_id = await _correlate_with_consensus( call_id=call_id, node_id=node_id, system_id=system_id, @@ -246,7 +305,7 @@ async def _run_intelligence_pipeline( if not scenes: _call_doc = await fstore.doc_get("calls", call_id) if not (_call_doc or {}).get("skip_reason"): - incident_id = await incident_correlator.correlate_call( + incident_id = await _correlate_with_consensus( call_id=call_id, node_id=node_id, system_id=system_id,