Add consensus correlator: rules + Gemini LLM with smart tiebreaker

Refactor incident_correlator.py to a decision/commit split (preview_correlation
/ apply_correlation) so the rules engine and LLM can both produce decisions before
anything is written to Firestore.

Add llm_correlator.py: cheap Gemini Flash first-pass + Gemini Pro tiebreaker.
Wire _correlate_with_consensus in upload.py — rules-only fallback when key is
absent or call is thin; agreed/tiebreak consensus written to corr_debug.
This commit is contained in:
Logan
2026-06-01 00:56:11 -04:00
parent 6bf4333b72
commit cbcc85f7b1
4 changed files with 619 additions and 119 deletions
+5
View File
@@ -26,6 +26,11 @@ class Settings(BaseSettings):
# Gemini (intelligence extraction, embeddings, incident summaries)
gemini_api_key: Optional[str] = None
# Correlation consensus models
# corr_cheap_model — first-pass LLM correlator (runs on every call)
# corr_smart_model — tiebreaker (only fires when rules and cheap LLM disagree)
corr_cheap_model: str = "gemini-2.0-flash"
corr_smart_model: str = "gemini-1.5-pro"
summary_interval_minutes: int = 2 # how often the summary loop runs
correlation_window_hours: int = 2 # slow/location path: max hours since last call
embedding_similarity_threshold: float = 0.93 # slow-path: requires location corroboration
+274 -116
View File
@@ -172,7 +172,7 @@ def _incident_idle_minutes(inc: dict, now: datetime) -> float:
# ─────────────────────────────────────────────────────────────────────────────
# Public entry point
# Public API
# ─────────────────────────────────────────────────────────────────────────────
async def correlate_call(
@@ -194,47 +194,155 @@ async def correlate_call(
) -> Optional[str]:
"""
Link call_id to an existing incident or create a new one.
reference_time — time anchor for the time-limited paths (location + slow).
Defaults to now. Pass call.started_at when re-correlating
orphaned calls so the window is anchored to when the call
actually happened, not when the sweep runs.
create_if_new — when False, skip new-incident creation (re-correlation only
links to existing incidents; it never creates new ones).
Returns the incident_id, or None if skipped.
Thin wrapper: builds context → runs rules decision → commits.
"""
ctx = await _build_context(
call_id=call_id, units=units, vehicles=vehicles, cleared_units=cleared_units,
location_coords=location_coords, reference_time=reference_time,
system_id=system_id, talkgroup_id=talkgroup_id, talkgroup_name=talkgroup_name,
tags=tags, incident_type=incident_type, location=location,
reassignment=reassignment, create_if_new=create_if_new,
)
decision = _run_decision(ctx)
return await _apply_and_log(decision, ctx)
async def preview_correlation(
call_id: str,
node_id: str,
system_id: Optional[str],
talkgroup_id: Optional[int],
talkgroup_name: Optional[str],
tags: list[str],
incident_type: Optional[str],
location: Optional[str] = None,
location_coords: Optional[dict] = None,
reference_time: Optional[datetime] = None,
create_if_new: bool = True,
units: Optional[list[str]] = None,
vehicles: Optional[list[str]] = None,
cleared_units: Optional[list[str]] = None,
reassignment: bool = False,
) -> dict:
"""
Run the rules engine and return the decision WITHOUT committing to Firestore.
Returns {"decision": {...}, "ctx": {...}}.
Pass the result to apply_correlation() to commit, or use the decision
fields directly for comparison in the consensus correlator.
decision keys:
action "link" | "new" | "orphan"
matched_incident the candidate incident doc (action == "link")
incident_type resolved type after tag inference (action == "new")
corr_debug fields to persist on the call doc
"""
ctx = await _build_context(
call_id=call_id, units=units, vehicles=vehicles, cleared_units=cleared_units,
location_coords=location_coords, reference_time=reference_time,
system_id=system_id, talkgroup_id=talkgroup_id, talkgroup_name=talkgroup_name,
tags=tags, incident_type=incident_type, location=location,
reassignment=reassignment, create_if_new=create_if_new,
)
decision = _run_decision(ctx)
return {"decision": decision, "ctx": ctx}
async def apply_correlation(preview: dict) -> Optional[str]:
"""
Commit the decision returned by preview_correlation() to Firestore.
Returns the incident_id, or None if the call was orphaned.
"""
return await _apply_and_log(preview["decision"], preview["ctx"])
# ─────────────────────────────────────────────────────────────────────────────
# Context builder — fetches all Firestore data needed for correlation
# ─────────────────────────────────────────────────────────────────────────────
async def _build_context(
call_id: str,
units: Optional[list[str]],
vehicles: Optional[list[str]],
cleared_units: Optional[list[str]],
location_coords: Optional[dict],
reference_time: Optional[datetime],
system_id: Optional[str],
talkgroup_id: Optional[int],
talkgroup_name: Optional[str],
tags: list[str],
incident_type: Optional[str],
location: Optional[str],
reassignment: bool,
create_if_new: bool,
) -> dict:
now = reference_time or datetime.now(timezone.utc)
window = timedelta(hours=settings.correlation_window_hours)
# Fetch all active incidents cross-type (mutual aid needs this)
all_active = await fstore.collection_list("incidents", status="active")
recent = [inc for inc in all_active if _within_window_of(inc, now, window)]
# Fetch call doc once — reused for disambiguation, embedding merge, unit accumulation
call_doc = await fstore.doc_get("calls", call_id) or {}
call_embedding: Optional[list] = call_doc.get("embedding")
# Prefer explicitly passed units/vehicles (per-scene, from intelligence extraction)
# over the call doc, which merges units from ALL scenes in a multi-scene recording.
# Falling back to the call doc is correct for recorrelation sweeps where we have no
# scene-level breakdown.
call_units: list[str] = units if units is not None else (call_doc.get("units") or [])
call_vehicles: list[str] = vehicles if vehicles is not None else (call_doc.get("vehicles") or [])
call_cleared: list[str] = cleared_units if cleared_units is not None else (call_doc.get("cleared_units") or [])
call_severity: str = call_doc.get("severity") or "unknown"
# Use passed coords first (freshly geocoded), fall back to what's on the call doc
coords: Optional[dict] = location_coords or call_doc.get("location_coords")
call_doc = await fstore.doc_get("calls", call_id) or {}
call_embedding = call_doc.get("embedding")
call_units = units if units is not None else (call_doc.get("units") or [])
call_vehicles = vehicles if vehicles is not None else (call_doc.get("vehicles") or [])
call_cleared = cleared_units if cleared_units is not None else (call_doc.get("cleared_units") or [])
call_severity = call_doc.get("severity") or "unknown"
coords = location_coords or call_doc.get("location_coords")
is_thin_call = not call_units and not call_vehicles and not coords
return {
"call_id": call_id, "all_active": all_active, "recent": recent,
"call_doc": call_doc, "call_embedding": call_embedding,
"call_units": call_units, "call_vehicles": call_vehicles,
"call_cleared": call_cleared, "call_severity": call_severity,
"coords": coords, "is_thin_call": is_thin_call, "now": now,
"system_id": system_id, "talkgroup_id": talkgroup_id,
"talkgroup_name": talkgroup_name, "tags": tags,
"incident_type": incident_type, "location": location,
"location_coords": location_coords, "reassignment": reassignment,
"create_if_new": create_if_new,
}
# ─────────────────────────────────────────────────────────────────────────────
# Rules decision engine — pure logic, no Firestore writes
# ─────────────────────────────────────────────────────────────────────────────
def _run_decision(ctx: dict) -> dict:
"""
Run the hybrid rules correlation engine against a pre-built context.
No Firestore reads or writes — all data comes from ctx.
Returns:
action "link" | "new" | "orphan"
matched_incident incident doc to update (action == "link")
incident_type resolved type (action == "new")
corr_debug fields to write to the call doc
"""
all_active = ctx["all_active"]
recent = ctx["recent"]
call_doc = ctx["call_doc"]
call_embedding = ctx["call_embedding"]
call_units = ctx["call_units"]
call_vehicles = ctx["call_vehicles"]
coords = ctx["coords"]
is_thin_call = ctx["is_thin_call"]
now = ctx["now"]
system_id = ctx["system_id"]
talkgroup_id = ctx["talkgroup_id"]
talkgroup_name = ctx["talkgroup_name"]
tags = ctx["tags"]
incident_type = ctx["incident_type"]
location = ctx["location"]
location_coords= ctx["location_coords"]
reassignment = ctx["reassignment"]
create_if_new = ctx["create_if_new"]
call_id = ctx["call_id"]
matched_incident: Optional[dict] = None
corr_debug: dict = {}
# A "thin" call carries no scene-identifying information — it is a pure
# status transmission (10-4, en route, acknowledgement). Detected by the
# absence of extracted units, vehicles, AND geocoded coordinates. Thin
# calls should link to wherever the last active conversation on this TGID
# was happening rather than running the full scene-verification logic.
is_thin_call = not call_units and not call_vehicles and not coords
# ── 1. Fast path: talkgroup match with recency gate ──────────────────────
#
# Only considers incidents updated within tg_fast_path_idle_minutes (default 30 min).
@@ -589,103 +697,153 @@ async def correlate_call(
f"call {call_id}{best_inc['incident_id']} (sim={best_score:.3f})"
)
# ── Update existing or create new ────────────────────────────────────────
# ── Decision output ───────────────────────────────────────────────────────
if matched_incident:
incident_id = matched_incident["incident_id"]
return {
"action": "link",
"matched_incident": matched_incident,
"incident_type": incident_type,
"corr_debug": corr_debug,
}
if not create_if_new:
return {"action": "orphan", "matched_incident": None, "incident_type": None, "corr_debug": corr_debug}
# Attempt type inference from tags before giving up on creation
resolved_type = incident_type
if not resolved_type and tags:
resolved_type = _infer_type_from_tags(tags)
if resolved_type:
logger.info(
f"Correlator: inferred incident_type={resolved_type!r} from tags {tags} for call {call_id}"
)
if not resolved_type:
return {"action": "orphan", "matched_incident": None, "incident_type": None, "corr_debug": corr_debug}
return {
"action": "new",
"matched_incident": None,
"incident_type": resolved_type,
"corr_debug": corr_debug,
}
# ─────────────────────────────────────────────────────────────────────────────
# Commit layer — writes decisions to Firestore
# ─────────────────────────────────────────────────────────────────────────────
async def _apply_and_log(decision: dict, ctx: dict) -> Optional[str]:
"""Commit a rules decision and persist the corr_debug fields to the call doc."""
incident_id = await _apply_decision(decision, ctx)
corr_debug = decision.get("corr_debug") or {}
if corr_debug:
try:
await fstore.doc_set("calls", ctx["call_id"], corr_debug)
except Exception as e:
logger.warning(f"Could not write corr_debug for call {ctx['call_id']}: {e}")
return incident_id
async def _apply_decision(decision: dict, ctx: dict) -> Optional[str]:
"""
Execute a correlation decision produced by _run_decision().
Handles all Firestore writes; returns the incident_id or None.
"""
action = decision["action"]
if action == "orphan":
return None
call_id = ctx["call_id"]
talkgroup_id = ctx["talkgroup_id"]
talkgroup_name = ctx["talkgroup_name"]
system_id = ctx["system_id"]
tags = ctx["tags"]
location = ctx["location"]
location_coords = ctx["location_coords"]
call_units = ctx["call_units"]
call_vehicles = ctx["call_vehicles"]
call_embedding = ctx["call_embedding"]
call_severity = ctx["call_severity"]
call_cleared = ctx["call_cleared"]
coords = ctx["coords"]
now = ctx["now"]
incident_type = decision["incident_type"]
if action == "link":
matched_incident = decision["matched_incident"]
await _update_incident(
matched_incident, call_id, talkgroup_id, system_id, tags,
location, location_coords, call_units, call_vehicles, call_embedding, now,
talkgroup_name=talkgroup_name, incident_type=incident_type,
cleared_units=call_cleared,
)
elif create_if_new:
# If GPT returned no type (missing talkgroup context is common), attempt
# to recover a type from the extracted tags before giving up on creation.
if not incident_type and tags:
incident_type = _infer_type_from_tags(tags)
if incident_type:
logger.info(
f"Correlator: inferred incident_type={incident_type!r} from tags "
f"{tags} for call {call_id} (no GPT type)"
)
if not incident_type:
# No type and none inferred — nothing to create
return None
return matched_incident["incident_id"]
# ── Cross-system parent detection ─────────────────────────────────────
# Before creating a standalone incident, check whether this call belongs
# to an incident already opened by a different agency (multi-agency chase,
# mutual aid, etc.). If a parent candidate is found:
# • The existing candidate is demoted to a child (incident_type → "child")
# • A new master shell is created linking both children
# • The new call's incident is created as a second child of the master
cross_parent: Optional[dict] = None
if system_id:
cross_parent = await _find_cross_system_parent(
system_id=system_id,
incident_type=incident_type,
location=location,
location_coords=coords,
call_embedding=call_embedding,
recent=recent,
# action == "new"
# ── Cross-system parent detection ─────────────────────────────────────────
# Before creating a standalone incident, check whether this call belongs
# to an incident already opened by a different agency (multi-agency chase,
# mutual aid, etc.). If a parent candidate is found:
# • The existing candidate is demoted to a child (incident_type → "child")
# • A new master shell is created linking both children
# • The new call's incident is created as a second child of the master
cross_parent: Optional[dict] = None
if system_id:
cross_parent = await _find_cross_system_parent(
system_id=system_id,
incident_type=incident_type,
location=location,
location_coords=coords,
call_embedding=call_embedding,
recent=ctx["recent"],
)
if cross_parent:
existing_child_id = cross_parent["incident_id"]
existing_master_id = cross_parent.get("parent_incident_id")
# Create the new agency's child incident first
incident_id = await _create_incident(
call_id, incident_type, talkgroup_id, talkgroup_name, system_id,
tags, location, location_coords,
call_units, call_vehicles, call_embedding, call_severity, now,
)
if existing_master_id:
# Candidate is already a child — link new child to the existing master
await _demote_to_child(incident_id, existing_master_id)
await _add_child_to_master(existing_master_id, incident_id, now)
decision["corr_debug"]["corr_path"] = "new/cross-system-child"
logger.info(
f"Correlator cross-system: call {call_id} → new child {incident_id} "
f"under existing master {existing_master_id}"
)
if cross_parent:
existing_child_id = cross_parent["incident_id"]
existing_master_id = cross_parent.get("parent_incident_id")
# Create the new agency's child incident first
incident_id = await _create_incident(
call_id, incident_type, talkgroup_id, talkgroup_name, system_id,
tags, location, location_coords,
call_units, call_vehicles, call_embedding, call_severity, now,
)
if existing_master_id:
# Candidate is already a child — link new child to the existing master
await _demote_to_child(incident_id, existing_master_id)
await _add_child_to_master(existing_master_id, incident_id, now)
corr_debug["corr_path"] = "new/cross-system-child"
logger.info(
f"Correlator cross-system: call {call_id} → new child {incident_id} "
f"under existing master {existing_master_id}"
)
else:
# Candidate is a standalone master — create master shell, demote both
master_id = await _create_master_incident(
first_child_id=existing_child_id,
second_child_id=incident_id,
operational_type=incident_type,
location=cross_parent.get("location") or location,
location_coords=cross_parent.get("location_coords") or coords,
now=now,
)
await _demote_to_child(existing_child_id, master_id)
await _demote_to_child(incident_id, master_id)
corr_debug["corr_path"] = "new/cross-system-master"
logger.info(
f"Correlator cross-system: created master {master_id}, "
f"demoted {existing_child_id} + new {incident_id} as children"
)
else:
# Normal single-agency incident creation
incident_id = await _create_incident(
call_id, incident_type, talkgroup_id, talkgroup_name, system_id,
tags, location, location_coords,
call_units, call_vehicles, call_embedding, call_severity, now,
# Candidate is a standalone — create master shell, demote both
master_id = await _create_master_incident(
first_child_id=existing_child_id,
second_child_id=incident_id,
operational_type=incident_type,
location=cross_parent.get("location") or location,
location_coords=cross_parent.get("location_coords") or coords,
now=now,
)
await _demote_to_child(existing_child_id, master_id)
await _demote_to_child(incident_id, master_id)
decision["corr_debug"]["corr_path"] = "new/cross-system-master"
logger.info(
f"Correlator cross-system: created master {master_id}, "
f"demoted {existing_child_id} + new {incident_id} as children"
)
corr_debug["corr_path"] = "new"
else:
# Creation suppressed (re-correlation sweep) — nothing to do
return None
# Persist the correlation decision to the call document so it can be
# inspected in Firestore or the admin UI without log-scraping.
if corr_debug:
try:
await fstore.doc_set("calls", call_id, corr_debug)
except Exception as e:
logger.warning(f"Could not write corr_debug for call {call_id}: {e}")
# Normal single-agency incident creation
incident_id = await _create_incident(
call_id, incident_type, talkgroup_id, talkgroup_name, system_id,
tags, location, location_coords,
call_units, call_vehicles, call_embedding, call_severity, now,
)
decision["corr_debug"]["corr_path"] = "new"
return incident_id
+278
View File
@@ -0,0 +1,278 @@
"""
LLM-based incident correlator using Gemini.
Two functions are exposed:
decide(call_id, ctx) — cheap first-pass (corr_cheap_model)
tiebreak(rules_decision, llm_decision, ctx) — smart tiebreaker (corr_smart_model)
Both return a decision dict compatible with _run_decision() in incident_correlator:
{"action": "link"|"new"|"orphan",
"matched_incident": dict|None,
"incident_type": str|None,
"corr_debug": dict,
"reasoning": str} ← extra field for logging/tiebreak comparison
decide() is skipped for thin calls (no content to reason about) and when
GEMINI_API_KEY is not set — in those cases returns None so the caller knows
to fall back to the rules decision.
Error handling: any Gemini failure returns None from decide() and the
rules_decision from tiebreak() so the pipeline never stalls.
"""
import asyncio
import json
from datetime import datetime, timezone
from typing import Optional
from app.internal.logger import logger
from app.config import settings
# ─────────────────────────────────────────────────────────────────────────────
# Prompt helpers
# ─────────────────────────────────────────────────────────────────────────────
def _fmt_idle(inc: dict, now: datetime) -> str:
try:
raw = inc.get("updated_at") or inc.get("started_at") or ""
dt = datetime.fromisoformat(str(raw).replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
minutes = int((now - dt).total_seconds() / 60)
return f"{minutes}min ago" if minutes < 60 else f"{minutes // 60}h{minutes % 60:02d}m ago"
except Exception:
return "?"
def _inc_summary(inc: dict, now: datetime) -> str:
parts = [f"id:{inc['incident_id']}", f"type:{inc.get('type') or '?'}"]
if inc.get("location"):
parts.append(f"loc:{inc['location']}")
units = inc.get("units") or []
if units:
parts.append(f"units:[{', '.join(units[:6])}]")
tags = inc.get("tags") or []
if tags:
parts.append(f"tags:[{', '.join(tags[:4])}]")
parts.append(f"idle:{_fmt_idle(inc, now)}")
return " | ".join(parts)
def _call_block(ctx: dict) -> str:
lines = []
call_doc = ctx["call_doc"]
transcript = call_doc.get("transcript_corrected") or call_doc.get("transcript")
if transcript:
lines.append(f"Transcript: {transcript[:700]}")
if ctx["tags"]:
lines.append(f"Tags: {ctx['tags']}")
if ctx["incident_type"]:
lines.append(f"Incident type: {ctx['incident_type']}")
if ctx["location"]:
lines.append(f"Location: {ctx['location']}")
if ctx["call_units"]:
lines.append(f"Units: {ctx['call_units']}")
if ctx["call_vehicles"]:
lines.append(f"Vehicles: {ctx['call_vehicles']}")
if ctx["talkgroup_name"]:
lines.append(f"Talkgroup: {ctx['talkgroup_name']}")
return "\n".join(lines) if lines else "(no details)"
_SCHEMA = '{"action": "link" | "new" | "orphan", "incident_id": "<id_string or null>", "reasoning": "<one sentence>"}'
_RULES = """
Rules:
- "link" only with clear positive evidence: same units, same geocoded location, or semantically identical scene on the same talkgroup within the last few minutes.
- A call on a DIFFERENT talkgroup than an incident requires unit overlap or geocoded location match — topic similarity alone is not enough.
- "new" only if the call has a clear incident_type AND describes a distinct, identifiable scene.
- "orphan" when in doubt — conservative is always correct.
- Do NOT link just because both calls involve police or both mention a road.
"""
def _build_decide_prompt(ctx: dict) -> str:
now = ctx["now"]
recent = ctx["recent"]
inc_block = (
"\n".join(_inc_summary(inc, now) for inc in recent[:20])
if recent else "(none)"
)
return (
"You are an incident correlator for a public safety radio monitoring system.\n\n"
"A new radio call has arrived. Decide whether it belongs to an existing active incident, "
"represents a new incident, or should be orphaned (not enough information).\n\n"
f"NEW CALL:\n{_call_block(ctx)}\n\n"
f"ACTIVE INCIDENTS ({len(recent)} recent):\n{inc_block}\n"
f"{_RULES}\n"
f"Respond with JSON only (no markdown):\n{_SCHEMA}"
)
def _build_tiebreak_prompt(rules_decision: dict, llm_decision: dict, ctx: dict) -> str:
now = ctx["now"]
recent = ctx["recent"]
inc_block = (
"\n".join(_inc_summary(inc, now) for inc in recent[:20])
if recent else "(none)"
)
def _fmt(d: dict, name: str) -> str:
action = d.get("action", "?")
inc = d.get("matched_incident")
inc_id = inc["incident_id"] if inc else (d.get("incident_id") or "null")
reason = d.get("reasoning") or (d.get("corr_debug") or {}).get("corr_fit_signal") or ""
return f" {name}: action={action}, incident_id={inc_id}, reasoning={reason!r}"
return (
"You are a senior incident correlator for a public safety radio monitoring system.\n\n"
"Two correlation engines disagree. You must make the final decision.\n\n"
f"NEW CALL:\n{_call_block(ctx)}\n\n"
f"ACTIVE INCIDENTS ({len(recent)} recent):\n{inc_block}\n\n"
"DISAGREEMENT:\n"
f"{_fmt(rules_decision, 'Rules engine')}\n"
f"{_fmt(llm_decision, 'LLM correlator')}\n"
f"{_RULES}\n"
f"Respond with JSON only (no markdown):\n{_SCHEMA}"
)
# ─────────────────────────────────────────────────────────────────────────────
# Gemini API call (sync, runs in thread pool)
# ─────────────────────────────────────────────────────────────────────────────
def _sync_gemini(model_name: str, prompt: str) -> dict:
import google.generativeai as genai # lazy import — only when needed
genai.configure(api_key=settings.gemini_api_key)
model = genai.GenerativeModel(
model_name,
generation_config={"response_mime_type": "application/json"},
)
response = model.generate_content(prompt)
return json.loads(response.text)
# ─────────────────────────────────────────────────────────────────────────────
# Decision parsing
# ─────────────────────────────────────────────────────────────────────────────
def _parse_response(raw: dict, ctx: dict) -> dict:
"""
Convert raw Gemini JSON output to a decision dict compatible with _run_decision().
Resolves incident_id → full incident doc from ctx["all_active"].
Handles type inference for "new" actions the same way as the rules engine.
"""
from app.internal.incident_correlator import _infer_type_from_tags # same-package import
action = raw.get("action", "orphan")
reasoning = raw.get("reasoning", "")
if action not in ("link", "new", "orphan"):
action = "orphan"
matched_incident: Optional[dict] = None
if action == "link":
inc_id = raw.get("incident_id")
if inc_id:
matched_incident = next(
(i for i in ctx["all_active"] if i.get("incident_id") == inc_id),
None,
)
if not matched_incident:
logger.warning(
f"LLM correlator: incident_id={inc_id!r} not in active incidents — orphaning"
)
action = "orphan"
incident_type: Optional[str] = None
if action in ("link", "new"):
incident_type = ctx["incident_type"]
if not incident_type:
incident_type = _infer_type_from_tags(ctx["tags"])
if action == "new" and not incident_type:
# Can't create an incident without a type — demote to orphan
action = "orphan"
matched_incident = None
return {
"action": action,
"matched_incident": matched_incident,
"incident_type": incident_type,
"corr_debug": {"corr_llm_reasoning": reasoning},
"reasoning": reasoning,
}
# ─────────────────────────────────────────────────────────────────────────────
# Public API
# ─────────────────────────────────────────────────────────────────────────────
async def decide(call_id: str, ctx: dict) -> Optional[dict]:
"""
Run the cheap LLM correlator (corr_cheap_model) on the call.
Returns a decision dict or None if:
- GEMINI_API_KEY is not configured
- the call is thin (content-free — no value from LLM)
- there are no recent active incidents to reason about
- Gemini fails
Callers should treat None as "fall back to rules decision".
"""
if not settings.gemini_api_key:
return None
if ctx["is_thin_call"]:
return None # thin calls have no transcript/units/coords to reason about
if not ctx["recent"]:
return None # no incidents to correlate against — rules handles new-only
try:
prompt = _build_decide_prompt(ctx)
raw = await asyncio.to_thread(_sync_gemini, settings.corr_cheap_model, prompt)
decision = _parse_response(raw, ctx)
_id = (decision["matched_incident"] or {}).get("incident_id", "null")
logger.info(
f"LLM correlator ({settings.corr_cheap_model}): call {call_id}"
f"action={decision['action']} incident={_id} "
f"reasoning={decision['reasoning']!r}"
)
return decision
except Exception as e:
logger.warning(f"LLM correlator failed for call {call_id}: {e}")
return None
async def tiebreak(rules_decision: dict, llm_decision: dict, ctx: dict) -> dict:
"""
Run the smart tiebreaker (corr_smart_model) when rules and LLM disagree.
Falls back to rules_decision on any error.
"""
call_id = ctx["call_id"]
try:
prompt = _build_tiebreak_prompt(rules_decision, llm_decision, ctx)
raw = await asyncio.to_thread(_sync_gemini, settings.corr_smart_model, prompt)
decision = _parse_response(raw, ctx)
_id = (decision["matched_incident"] or {}).get("incident_id", "null")
logger.info(
f"LLM tiebreak ({settings.corr_smart_model}): call {call_id}"
f"action={decision['action']} incident={_id} "
f"reasoning={decision['reasoning']!r}"
)
return decision
except Exception as e:
logger.warning(f"LLM tiebreak failed for call {call_id}: {e} — using rules decision")
return rules_decision
def decisions_agree(rules: dict, llm: dict) -> bool:
"""True if both decisions agree on action and (when action=="link") on the target incident."""
if rules["action"] != llm["action"]:
return False
if rules["action"] == "link":
r_id = (rules.get("matched_incident") or {}).get("incident_id")
l_id = (llm.get("matched_incident") or {}).get("incident_id")
return r_id == l_id
return True
+62 -3
View File
@@ -83,6 +83,65 @@ def _public_url_to_gcs_uri(url: str) -> Optional[str]:
return None
async def _correlate_with_consensus(
call_id: str,
node_id: str,
system_id: Optional[str],
talkgroup_id: Optional[int],
talkgroup_name: Optional[str],
tags: list[str],
incident_type: Optional[str],
location: Optional[str],
location_coords: Optional[dict],
units: Optional[list] = None,
vehicles: Optional[list] = None,
cleared_units: Optional[list] = None,
reassignment: bool = False,
) -> Optional[str]:
"""
Consensus correlator: runs the rules engine and the cheap LLM in sequence.
If they agree the rules decision is committed directly.
If they disagree a smarter tiebreaker LLM makes the final call.
Falls back to rules-only when GEMINI_API_KEY is absent, the call is
content-free (thin), or any LLM call fails.
"""
from app.internal import incident_correlator, llm_correlator
preview = await incident_correlator.preview_correlation(
call_id=call_id, node_id=node_id, system_id=system_id,
talkgroup_id=talkgroup_id, talkgroup_name=talkgroup_name,
tags=tags, incident_type=incident_type, location=location,
location_coords=location_coords, units=units, vehicles=vehicles,
cleared_units=cleared_units, reassignment=reassignment,
)
ctx = preview["ctx"]
rules_decision = preview["decision"]
llm_decision = await llm_correlator.decide(call_id, ctx)
if llm_decision is None:
# LLM unavailable, skipped (thin call), or errored — rules wins.
rules_decision["corr_debug"]["corr_consensus"] = "rules_only"
return await incident_correlator.apply_correlation(preview)
if llm_correlator.decisions_agree(rules_decision, llm_decision):
rules_decision["corr_debug"]["corr_consensus"] = "agreed"
rules_decision["corr_debug"]["corr_llm_reasoning"] = llm_decision.get("reasoning", "")
return await incident_correlator.apply_correlation(preview)
# Disagree — escalate to the smarter tiebreaker.
logger.info(
f"Consensus disagreement for call {call_id}: "
f"rules={rules_decision['action']} vs llm={llm_decision['action']} — tiebreak"
)
final = await llm_correlator.tiebreak(rules_decision, llm_decision, ctx)
final["corr_debug"]["corr_consensus"] = "tiebreak"
final["corr_debug"]["corr_rules_action"] = rules_decision["action"]
final["corr_debug"]["corr_llm_action"] = llm_decision["action"]
return await incident_correlator.apply_correlation({"decision": final, "ctx": ctx})
async def _run_extraction_pipeline(
call_id: str,
node_id: str,
@@ -114,7 +173,7 @@ async def _run_extraction_pipeline(
# overlap so the new scene doesn't chain into the unit's previous incident.
is_reassignment = bool(scene.get("reassignment"))
corr_units = [] if is_reassignment else scene.get("units")
incident_id = await incident_correlator.correlate_call(
incident_id = await _correlate_with_consensus(
call_id=call_id,
node_id=node_id,
system_id=system_id,
@@ -217,7 +276,7 @@ async def _run_intelligence_pipeline(
all_tags.extend(scene["tags"])
is_reassignment = bool(scene.get("reassignment"))
corr_units = [] if is_reassignment else scene.get("units")
incident_id = await incident_correlator.correlate_call(
incident_id = await _correlate_with_consensus(
call_id=call_id,
node_id=node_id,
system_id=system_id,
@@ -246,7 +305,7 @@ async def _run_intelligence_pipeline(
if not scenes:
_call_doc = await fstore.doc_get("calls", call_id)
if not (_call_doc or {}).get("skip_reason"):
incident_id = await incident_correlator.correlate_call(
incident_id = await _correlate_with_consensus(
call_id=call_id,
node_id=node_id,
system_id=system_id,