Compare commits

..

3 Commits

Author SHA1 Message Date
Logan cbcc85f7b1 Add consensus correlator: rules + Gemini LLM with smart tiebreaker
Refactor incident_correlator.py to a decision/commit split (preview_correlation
/ apply_correlation) so the rules engine and LLM can both produce decisions before
anything is written to Firestore.

Add llm_correlator.py: cheap Gemini Flash first-pass + Gemini Pro tiebreaker.
Wire _correlate_with_consensus in upload.py — rules-only fallback when key is
absent or call is thin; agreed/tiebreak consensus written to corr_debug.
2026-06-01 00:56:11 -04:00
Logan 6bf4333b72 Make correlation conservative: no time_fallback, pursuit-aware proximity, tiered thin path
- Remove time_fallback from _call_fits_incident: a substantive call with no
  matching signals (unit/vehicle/location) is now always orphaned on dispatch
  channels rather than attached by recency alone
- Pursuit-mode location: incidents tagged as vehicle-pursuit/pursuit/chase use
  a 20km expanded radius with speed-sanity validation (distance ÷ elapsed time
  must be ≤ 8 km/min) — location change is a positive signal for moving incidents
- Non-pursuit incidents: strict 0.5km proximity unchanged — location change = reject
- Thin path two-tier: ≤30s → attach to most-recent regardless of candidate count
  (direct conversational reply); 30s–10min → single candidate required
2026-06-01 00:08:19 -04:00
Logan b77d2cce36 Fix over-correlation: geocoding precision, thin path ambiguity, skip_reason propagation
- Geocoding: reject GEOMETRIC_CENTER/APPROXIMATE results — vague location strings
  (regions, city centroids) were resolving to node-area coords and creating false
  proximity matches that merged unrelated incidents
- Thin path: on dispatch channels with multiple active incidents, skip attachment
  rather than guessing — "10-4" with 3 active incidents is genuinely ambiguous
- Short transcripts (≤5 words) now write skip_reason="transcript_too_short" to
  the call doc, matching garbage transcript behavior
- upload.py no-scenes fallback now checks skip_reason before running correlation —
  flagged calls (garbage, too short) no longer attach via thin path
- Update Server README to reflect current project purpose, goals, and pipeline
2026-05-31 23:51:46 -04:00
6 changed files with 822 additions and 294 deletions
+58 -128
View File
@@ -1,10 +1,21 @@
# DRB Server
Full-stack backend for the Discord Radio Bot (DRB) system. Receives telemetry from SDR edge nodes over MQTT, stores data in Firestore, orchestrates Discord voice bots through a token pool, and serves a real-time admin dashboard.
Full-stack backend for the DRB (Distributed Radio Bot) platform — a community-powered, real-time situational awareness system for public safety radio monitoring. Receives telemetry and audio from SDR edge nodes, runs an AI intelligence pipeline on every call, correlates calls into incidents, dispatches alerts, and serves a live operational dashboard.
## What DRB Is
DRB turns radio scanners into a shared geographic intelligence picture. Anyone can contribute coverage by running an edge node (an RTL-SDR dongle on a small Linux machine). All nodes feed into a single platform where operators can:
- **Monitor in real time** — see active incidents on a map as they unfold, with calls grouped by incident as they come in
- **Listen live** — stream audio to Discord voice channels or directly in the browser
- **Get alerted** — receive push notifications when significant events are detected
- **Investigate after the fact** — search transcribed call history, replay audio, and review incident timelines
The intended deployment is a Tactical Operations Center (TOC) display: a map showing active incidents across all covered jurisdictions, with live audio and alert feeds for operators monitoring an area during an event (severe weather, major incident, etc.).
## System Overview
DRB is a distributed SDR (Software Defined Radio) monitoring platform. Edge nodes (small Linux machines with RTL-SDR dongles) decode radio systems and stream audio. The server coordinates those nodes, manages which Discord voice channels receive audio, and stores call history.
DRB is a distributed SDR (Software Defined Radio) monitoring platform. Edge nodes (small Linux machines with RTL-SDR dongles) decode radio systems and stream audio. The server coordinates those nodes, runs the intelligence pipeline on each call, manages Discord voice bot assignments, and serves the operational dashboard.
```
Edge Node (client machine)
@@ -28,9 +39,10 @@ Browser (admin)
| Service | Container | Description | Port |
|---|---|---|---|
| `mosquitto` | `eclipse-mosquitto` | MQTT broker — receives telemetry from edge nodes, dispatches commands | 1883 |
| `c2-core` | Python 3.11 / FastAPI | Command & control API — MQTT subscriber, Firestore writes, node/system/call/token management | 8000 |
| `mosquitto` | `eclipse-mosquitto` | MQTT broker — receives telemetry from edge nodes, dispatches commands | 1883 |
| `c2-core` | Python 3.11 / FastAPI | Command & control API — MQTT handler, intelligence pipeline, incident correlator, alert dispatch, node/system/token management | 8000 |
| `discord-bot` | Python 3.11 / discord.py | Server-side Discord slash command bot — `/join`, `/leave`, `/status`, `/help` | — |
| `frontend` | Node.js / Next.js 14 | Admin web dashboard — real-time node map, call logs, system & token management | 3000 |
| `frontend` | Node.js / Next.js 14 | Operational web dashboard — live map, incident feed, call history, node and system management | 3000 |
## Directory Structure
@@ -49,19 +61,19 @@ Server/
│ │ │ ├── systems.py # Radio system CRUD (P25/DMR/NBFM configs)
│ │ │ ├── calls.py # Call log retrieval (read-only — calls created by MQTT handler)
│ │ │ ├── tokens.py # Discord bot token pool — add/delete/list; assign_token/release_token helpers
│ │ │ ├── upload.py # Audio file upload endpoint (called by edge nodes)
│ │ │ ├── incidents.py # [PLANNED] Incident CRUD, manual link/unlink calls, resolve
│ │ │ └── alerts.py # [PLANNED] Alert rule CRUD
│ │ │ ├── upload.py # Audio file upload endpoint (called by edge nodes); triggers intelligence pipeline
│ │ │ ├── incidents.py # Incident CRUD, manual link/unlink calls, resolve
│ │ │ └── alerts.py # Alert rule CRUD
│ │ └── internal/
│ │ ├── auth.py # Auth — Firebase ID token OR service key (Bearer)
│ │ ├── firestore.py # Async Firestore wrappers (doc_get, doc_set, doc_update, collection_list)
│ │ ├── mqtt_handler.py # MQTT subscriber — call_start/call_end handlers, node checkin/status
│ │ ├── node_sweeper.py # Background task — marks nodes offline after 90s without heartbeat
│ │ ├── storage.py # GCS audio file uploads
│ │ ├── transcription.py # [PLANNED] STT pipeline — Whisper or Google Speech API
│ │ ├── intelligence.py # [PLANNED] Keyword/entity extraction, severity scoring
│ │ ├── incident_correlator.py # [PLANNED] Match calls to incidents or create new ones
│ │ └── alerter.py # [PLANNED] Alert dispatch (Discord webhook, push, etc.)
│ │ ├── transcription.py # STT pipeline — OpenAI Whisper / GPT-4o transcribe
│ │ ├── intelligence.py # GPT scene extraction: tags, incident type, location, units, vehicles; geocoding
│ │ ├── incident_correlator.py # Hybrid correlator — matches calls to incidents or creates new ones
│ │ └── alerter.py # Alert dispatch — evaluates rules, sends notifications
│ ├── scripts/
│ │ └── set_admin.py # CLI to grant/revoke Firebase admin custom claim
│ ├── gcp-key.json # GCP service account key — place here, NOT committed to git
@@ -80,13 +92,13 @@ Server/
└── drb-frontend/ # Next.js 14 admin dashboard
├── app/
│ ├── dashboard/ # Overview: active node grid + live call feed
│ ├── map/ # Leaflet map — node locations + [PLANNED] incident pins
│ ├── calls/ # Call history — talkgroup, duration, audio, transcript (when populated)
│ ├── map/ # Leaflet map — node locations, active incident pins
│ ├── calls/ # Call history — talkgroup, duration, audio playback, transcript
│ ├── nodes/ # Node list + per-node detail (approve, reject, assign system)
│ ├── systems/ # Radio system CRUD (P25 form with talkgroup editor, DMR/NBFM JSON)
│ ├── tokens/ # Discord bot token pool management
│ ├── incidents/ # [PLANNED] Incident list/detail — linked calls, location, resolve
│ ├── alerts/ # [PLANNED] Alert rule configuration
│ ├── incidents/ # Incident list/detail — linked calls, location, summary, resolve
│ ├── alerts/ # Alert rule configuration
│ └── login/ # Firebase Auth login page
├── components/
│ ├── MapView.tsx # react-leaflet map with status-colored markers
@@ -218,138 +230,56 @@ Edge nodes join Discord voice channels using bot tokens managed by the server. A
## Call & Intelligence Pipeline
This is the full intended data lifecycle for every radio call — from raw RF to searchable, cross-referenced intelligence. The data models and Firestore schema are already designed for this; several pipeline stages are stubs awaiting implementation.
### Call lifecycle (current — fully working)
Every radio call flows through a four-stage pipeline triggered when the edge node uploads its recording:
```
Edge node ──► MQTT call_start ──► c2-core creates CallRecord (status: active)
│ talkgroup_id, talkgroup_name, freq,
│ node_id, system_id, started_at
Firestore "calls" collection
Edge node ──► MQTT call_start ──► CallRecord created (active)
Frontend live call feed / map popups
Edge node ──► MQTT call_end ──► c2-core updates CallRecord (status: ended)
│ ended_at, audio_url (GCS link)
Frontend call history / audio playback
```
### Intelligence pipeline (designed — implementation pending)
After a call ends, the following stages should fire in order:
```
CallRecord (ended, audio_url set)
Edge node ──► audio upload ──► GCS storage
[1] TRANSCRIPTION
Speech-to-text on the GCS audio file (Whisper or Google Speech-to-Text)
→ writes CallRecord.transcript
OpenAI Whisper / GPT-4o transcribe
CallRecord.transcript
[2] INTELLIGENCE EXTRACTION
Analyze transcript for:
- Named entities: unit IDs, street addresses, location references
- Keywords / keyword sets: fire/EMS/police/hazmat/pursuit/shots fired/etc.
- Radio codes (10-codes, signals) mapped to plain English
- Severity scoring
→ writes CallRecord.tags[], CallRecord.location (geocoded if address found)
[2] INTELLIGENCE EXTRACTION (GPT-4o-mini)
Scene detection, entity extraction:
tags, incident_type, location, units,
vehicles, severity, resolved flag
+ geocoding (Google Maps)
+ embedding (text-embedding-3-small)
→ CallRecord.tags, .location, .units, etc.
[3] INCIDENT CORRELATION
Given the extracted entities + tags, decide:
- Does this call match an existing active IncidentRecord?
(same location ± radius, overlapping tags, recent time window)
→ link: append CallRecord.call_id to IncidentRecord.call_ids,
set CallRecord.incident_id, update IncidentRecord.summary
- Or does this call describe a new event?
→ create IncidentRecord (type, location, title, tags, status: active)
link the call, set CallRecord.incident_id
[3] INCIDENT CORRELATION (hybrid engine)
Fast path — talkgroup + recency
Unit path — same officer continuity
Location — proximity match
Cross-TG — multi-agency / channel hop
Slow path — embedding similarity
→ IncidentRecord created or updated
[4] ALERTS
If incident is new OR severity exceeds threshold:
- Trigger configured alert channels (Discord webhook, push notification, etc.)
- Include: incident type, location, talkgroup, transcript excerpt
[4] ALERT DISPATCH
Evaluate alert rules (keywords, talkgroups)
notifications sent
```
### Data model fields involved
**`CallRecord`** (Firestore collection: `calls`):
| Field | Type | Populated by |
|---|---|---|
| `call_id` | string | MQTT handler on call_start |
| `node_id` | string | MQTT handler |
| `system_id` | string | MQTT handler (from node's assigned system) |
| `talkgroup_id` | number | MQTT handler |
| `talkgroup_name` | string | MQTT handler |
| `freq` | number | MQTT handler |
| `started_at` | timestamp | MQTT handler |
| `ended_at` | timestamp | MQTT handler on call_end |
| `status` | `active` \| `ended` | MQTT handler |
| `audio_url` | string | Edge node upload → GCS |
| `transcript` | string \| null | **[stub]** Transcription pipeline |
| `incident_id` | string \| null | **[stub]** Incident correlation |
| `location` | `{lat, lng}` \| null | **[stub]** Geocoding from transcript |
| `tags` | string[] | **[stub]** Intelligence extraction |
**`IncidentRecord`** (Firestore collection: `incidents`):
| Field | Type | Description |
|---|---|---|
| `incident_id` | string | UUID |
| `title` | string | Auto-generated or manually set |
| `type` | string | `fire`, `ems`, `police`, `hazmat`, `pursuit`, etc. |
| `status` | `active` \| `resolved` | Updated as calls accumulate or manually resolved |
| `location` | `{lat, lng}` | Geocoded from first call with a location |
| `call_ids` | string[] | All linked CallRecord IDs |
| `started_at` | timestamp | Timestamp of first linked call |
| `updated_at` | timestamp | Updated on each new linked call |
| `summary` | string \| null | Auto-generated from transcripts or manually written |
| `tags` | string[] | Union of tags from all linked calls |
### Backend — what needs to be built
The following do **not exist yet** and need to be created:
- `drb-c2-core/app/routers/incidents.py` — CRUD + manual incident management
- `drb-c2-core/app/routers/alerts.py` — Alert rule configuration
- `drb-c2-core/app/internal/transcription.py` — STT integration (Whisper local or Google Speech API)
- `drb-c2-core/app/internal/intelligence.py` — Keyword/entity extraction, severity scoring
- `drb-c2-core/app/internal/incident_correlator.py` — Match calls to incidents or create new ones
- `drb-c2-core/app/internal/alerter.py` — Dispatch alerts (Discord webhook, etc.)
The `_on_call_end()` handler in `mqtt_handler.py` is the natural trigger point — after updating the CallRecord, it should enqueue the transcription + intelligence pipeline.
### Frontend — what needs to be built
The following pages and nav items are not yet implemented:
- `/incidents` — Incident list and detail view (linked calls, map pin, transcript summary, tags, resolve button)
- `/alerts` — Alert rule configuration (keyword sets, talkgroup filters, notification channels)
- `Nav.tsx` should add **Calls** and **Incidents** as primary nav items; the current Calls page exists but isn't in the design's intended nav hierarchy
The **Map** page should eventually show both node markers and incident pins — incidents with `location` set should appear as color-coded markers (by type/severity) alongside the node status markers.
## Frontend Pages
| Page | URL | Status | Description |
|---|---|---|---|
| Dashboard | `/dashboard` | Working | Live node grid + active call stream |
| Map | `/map` | Working | Leaflet map — nodes color-coded by status, active call popups |
| Calls | `/calls` | Working | Full call history — talkgroup, duration, audio playback, transcript (when populated) |
| Nodes | `/nodes` | Working | Node list; per-node detail for approve/reject/assign system |
| Systems | `/systems` | Working | Create and manage P25/DMR/NBFM radio system configurations |
| Tokens | `/tokens` | Working | Discord bot token pool management |
| Incidents | `/incidents` | **Not built** | Incident list/detail — linked calls, location, summary, tags, resolve |
| Page | URL | Description |
|---|---|---|
| Dashboard | `/dashboard` | Live node grid + active call stream |
| Map | `/map` | Leaflet map — nodes color-coded by status, active call popups |
| Calls | `/calls` | Full call history — talkgroup, duration, audio playback, transcript |
| Nodes | `/nodes` | Node list; per-node detail for approve/reject/assign system |
| Systems | `/systems` | Create and manage P25/DMR/NBFM radio system configurations |
| Tokens | `/tokens` | Discord bot token pool management |
| Incidents | `/incidents` | Incident list/detail — linked calls, location, summary, tags, resolve |
| Alerts | `/alerts` | **Not built** | Alert rule configuration — keywords, talkgroups, notification channels |
## Makefile Targets
+5
View File
@@ -26,6 +26,11 @@ class Settings(BaseSettings):
# Gemini (intelligence extraction, embeddings, incident summaries)
gemini_api_key: Optional[str] = None
# Correlation consensus models
# corr_cheap_model — first-pass LLM correlator (runs on every call)
# corr_smart_model — tiebreaker (only fires when rules and cheap LLM disagree)
corr_cheap_model: str = "gemini-2.0-flash"
corr_smart_model: str = "gemini-1.5-pro"
summary_interval_minutes: int = 2 # how often the summary loop runs
correlation_window_hours: int = 2 # slow/location path: max hours since last call
embedding_similarity_threshold: float = 0.93 # slow-path: requires location corroboration
+314 -78
View File
@@ -46,6 +46,18 @@ from app.internal.logger import logger
from app.internal import firestore as fstore
from app.config import settings
_PURSUIT_TAGS = frozenset({
"vehicle-pursuit", "pursuit", "foot-pursuit", "chase",
"fleeing-vehicle", "suspect-vehicle", "eluding",
})
# Maximum plausible ground speed for a moving incident (pursuit/transport).
# ~3 miles/min ≈ 180 mph — well above real pursuit speeds, but generous enough
# to tolerate GPS drift and call-timing jitter. Anything faster is a bad geocode
# or an unrelated call being falsely proximity-matched.
_MAX_PURSUIT_SPEED_KM_PER_MIN = 8.0 # ~300 km/h, intentionally generous
_PURSUIT_PROXIMITY_KM = 20.0 # expanded radius for moving incidents
_DISPATCH_TG_RE = re.compile(
r"\bdispatch\b|\bdisp\b"
r"|\bpatched\b" # patched channels aggregate multiple call streams
@@ -160,7 +172,7 @@ def _incident_idle_minutes(inc: dict, now: datetime) -> float:
# ─────────────────────────────────────────────────────────────────────────────
# Public entry point
# Public API
# ─────────────────────────────────────────────────────────────────────────────
async def correlate_call(
@@ -182,47 +194,155 @@ async def correlate_call(
) -> Optional[str]:
"""
Link call_id to an existing incident or create a new one.
reference_time — time anchor for the time-limited paths (location + slow).
Defaults to now. Pass call.started_at when re-correlating
orphaned calls so the window is anchored to when the call
actually happened, not when the sweep runs.
create_if_new — when False, skip new-incident creation (re-correlation only
links to existing incidents; it never creates new ones).
Returns the incident_id, or None if skipped.
Thin wrapper: builds context → runs rules decision → commits.
"""
ctx = await _build_context(
call_id=call_id, units=units, vehicles=vehicles, cleared_units=cleared_units,
location_coords=location_coords, reference_time=reference_time,
system_id=system_id, talkgroup_id=talkgroup_id, talkgroup_name=talkgroup_name,
tags=tags, incident_type=incident_type, location=location,
reassignment=reassignment, create_if_new=create_if_new,
)
decision = _run_decision(ctx)
return await _apply_and_log(decision, ctx)
async def preview_correlation(
call_id: str,
node_id: str,
system_id: Optional[str],
talkgroup_id: Optional[int],
talkgroup_name: Optional[str],
tags: list[str],
incident_type: Optional[str],
location: Optional[str] = None,
location_coords: Optional[dict] = None,
reference_time: Optional[datetime] = None,
create_if_new: bool = True,
units: Optional[list[str]] = None,
vehicles: Optional[list[str]] = None,
cleared_units: Optional[list[str]] = None,
reassignment: bool = False,
) -> dict:
"""
Run the rules engine and return the decision WITHOUT committing to Firestore.
Returns {"decision": {...}, "ctx": {...}}.
Pass the result to apply_correlation() to commit, or use the decision
fields directly for comparison in the consensus correlator.
decision keys:
action "link" | "new" | "orphan"
matched_incident the candidate incident doc (action == "link")
incident_type resolved type after tag inference (action == "new")
corr_debug fields to persist on the call doc
"""
ctx = await _build_context(
call_id=call_id, units=units, vehicles=vehicles, cleared_units=cleared_units,
location_coords=location_coords, reference_time=reference_time,
system_id=system_id, talkgroup_id=talkgroup_id, talkgroup_name=talkgroup_name,
tags=tags, incident_type=incident_type, location=location,
reassignment=reassignment, create_if_new=create_if_new,
)
decision = _run_decision(ctx)
return {"decision": decision, "ctx": ctx}
async def apply_correlation(preview: dict) -> Optional[str]:
"""
Commit the decision returned by preview_correlation() to Firestore.
Returns the incident_id, or None if the call was orphaned.
"""
return await _apply_and_log(preview["decision"], preview["ctx"])
# ─────────────────────────────────────────────────────────────────────────────
# Context builder — fetches all Firestore data needed for correlation
# ─────────────────────────────────────────────────────────────────────────────
async def _build_context(
call_id: str,
units: Optional[list[str]],
vehicles: Optional[list[str]],
cleared_units: Optional[list[str]],
location_coords: Optional[dict],
reference_time: Optional[datetime],
system_id: Optional[str],
talkgroup_id: Optional[int],
talkgroup_name: Optional[str],
tags: list[str],
incident_type: Optional[str],
location: Optional[str],
reassignment: bool,
create_if_new: bool,
) -> dict:
now = reference_time or datetime.now(timezone.utc)
window = timedelta(hours=settings.correlation_window_hours)
# Fetch all active incidents cross-type (mutual aid needs this)
all_active = await fstore.collection_list("incidents", status="active")
recent = [inc for inc in all_active if _within_window_of(inc, now, window)]
# Fetch call doc once — reused for disambiguation, embedding merge, unit accumulation
call_doc = await fstore.doc_get("calls", call_id) or {}
call_embedding: Optional[list] = call_doc.get("embedding")
# Prefer explicitly passed units/vehicles (per-scene, from intelligence extraction)
# over the call doc, which merges units from ALL scenes in a multi-scene recording.
# Falling back to the call doc is correct for recorrelation sweeps where we have no
# scene-level breakdown.
call_units: list[str] = units if units is not None else (call_doc.get("units") or [])
call_vehicles: list[str] = vehicles if vehicles is not None else (call_doc.get("vehicles") or [])
call_cleared: list[str] = cleared_units if cleared_units is not None else (call_doc.get("cleared_units") or [])
call_severity: str = call_doc.get("severity") or "unknown"
# Use passed coords first (freshly geocoded), fall back to what's on the call doc
coords: Optional[dict] = location_coords or call_doc.get("location_coords")
call_embedding = call_doc.get("embedding")
call_units = units if units is not None else (call_doc.get("units") or [])
call_vehicles = vehicles if vehicles is not None else (call_doc.get("vehicles") or [])
call_cleared = cleared_units if cleared_units is not None else (call_doc.get("cleared_units") or [])
call_severity = call_doc.get("severity") or "unknown"
coords = location_coords or call_doc.get("location_coords")
is_thin_call = not call_units and not call_vehicles and not coords
return {
"call_id": call_id, "all_active": all_active, "recent": recent,
"call_doc": call_doc, "call_embedding": call_embedding,
"call_units": call_units, "call_vehicles": call_vehicles,
"call_cleared": call_cleared, "call_severity": call_severity,
"coords": coords, "is_thin_call": is_thin_call, "now": now,
"system_id": system_id, "talkgroup_id": talkgroup_id,
"talkgroup_name": talkgroup_name, "tags": tags,
"incident_type": incident_type, "location": location,
"location_coords": location_coords, "reassignment": reassignment,
"create_if_new": create_if_new,
}
# ─────────────────────────────────────────────────────────────────────────────
# Rules decision engine — pure logic, no Firestore writes
# ─────────────────────────────────────────────────────────────────────────────
def _run_decision(ctx: dict) -> dict:
"""
Run the hybrid rules correlation engine against a pre-built context.
No Firestore reads or writes — all data comes from ctx.
Returns:
action "link" | "new" | "orphan"
matched_incident incident doc to update (action == "link")
incident_type resolved type (action == "new")
corr_debug fields to write to the call doc
"""
all_active = ctx["all_active"]
recent = ctx["recent"]
call_doc = ctx["call_doc"]
call_embedding = ctx["call_embedding"]
call_units = ctx["call_units"]
call_vehicles = ctx["call_vehicles"]
coords = ctx["coords"]
is_thin_call = ctx["is_thin_call"]
now = ctx["now"]
system_id = ctx["system_id"]
talkgroup_id = ctx["talkgroup_id"]
talkgroup_name = ctx["talkgroup_name"]
tags = ctx["tags"]
incident_type = ctx["incident_type"]
location = ctx["location"]
location_coords= ctx["location_coords"]
reassignment = ctx["reassignment"]
create_if_new = ctx["create_if_new"]
call_id = ctx["call_id"]
matched_incident: Optional[dict] = None
corr_debug: dict = {}
# A "thin" call carries no scene-identifying information — it is a pure
# status transmission (10-4, en route, acknowledgement). Detected by the
# absence of extracted units, vehicles, AND geocoded coordinates. Thin
# calls should link to wherever the last active conversation on this TGID
# was happening rather than running the full scene-verification logic.
is_thin_call = not call_units and not call_vehicles and not coords
# ── 1. Fast path: talkgroup match with recency gate ──────────────────────
#
# Only considers incidents updated within tg_fast_path_idle_minutes (default 30 min).
@@ -263,22 +383,47 @@ async def correlate_call(
)
if tg_recent and is_thin_call:
# Status/ack call — no scene data to reason about.
# On dispatch channels (shared backbone), apply a much tighter idle gate so
# a "10-4" or "Dispatch." doesn't re-activate an incident that's been quiet
# for an hour and then absorb the next unrelated dispatch on the same TGID.
# Content-free status calls ("10-4", "Copy", "En route") — two tiers:
#
# Tier 1 — ≤30 seconds idle: this is a direct conversational reply to
# whatever was just transmitted. Attach to the most recently updated
# incident regardless of how many are active; within 30 seconds, the
# "most recently updated" IS the active thread.
#
# Tier 2 — 30 seconds to tg_dispatch_thin_idle_minutes: channel context
# is less clear. Only attach when there is exactly ONE candidate to
# avoid guessing on a busy multi-incident channel.
if is_dispatch:
THIN_CONVERSATIONAL_SECS = 30
very_recent = [
inc for inc in tg_recent
if _incident_idle_minutes(inc, now) * 60 <= THIN_CONVERSATIONAL_SECS
]
if very_recent:
# Tier 1: direct conversational reply — most recent wins.
thin_pool = [max(very_recent, key=lambda inc: inc.get("updated_at", ""))]
logger.info(
f"Correlator fast-path thin (tier-1, ≤{THIN_CONVERSATIONAL_SECS}s): "
f"using most-recent of {len(very_recent)} candidate(s) for call {call_id}"
)
else:
# Tier 2: less certain — require single candidate.
thin_pool = [
inc for inc in tg_recent
if _incident_idle_minutes(inc, now) <= settings.tg_dispatch_thin_idle_minutes
]
if len(thin_pool) > 1:
logger.info(
f"Correlator fast-path thin (tier-2): {len(thin_pool)} active incidents "
f"on dispatch channel — ambiguous, skipping thin call {call_id}"
)
thin_pool = []
else:
thin_pool = tg_recent
if not thin_pool:
logger.info(
f"Correlator fast-path thin: dispatch channel idle > "
f"{settings.tg_dispatch_thin_idle_minutes}min, skipping thin call {call_id}"
f"Correlator fast-path thin: no suitable candidate for call {call_id}"
)
else:
# Attach to whichever pool incident was most recently active on this TGID.
@@ -438,12 +583,24 @@ async def correlate_call(
coords["lat"], coords["lng"],
inc_coords["lat"], inc_coords["lng"],
)
if dist_km <= settings.location_proximity_km:
inc_tags_set = set(inc.get("tags") or [])
is_pursuit_inc = bool(inc_tags_set & _PURSUIT_TAGS)
radius = _PURSUIT_PROXIMITY_KM if is_pursuit_inc else settings.location_proximity_km
# For pursuit incidents, additionally validate movement speed.
if is_pursuit_inc and dist_km > settings.location_proximity_km:
elapsed_min = max(_incident_idle_minutes(inc, now), 0.1)
if (dist_km / elapsed_min) > _MAX_PURSUIT_SPEED_KM_PER_MIN:
continue # implausible speed — skip this candidate
if dist_km <= radius:
matched_incident = inc
corr_debug = {"corr_path": "location", "corr_distance_km": round(dist_km, 3)}
corr_debug = {
"corr_path": "location",
"corr_distance_km": round(dist_km, 3),
"corr_pursuit_mode": is_pursuit_inc,
}
logger.info(
f"Correlator location-path: call {call_id}{inc['incident_id']} "
f"(dist={dist_km:.2f}km)"
f"(dist={dist_km:.2f}km, pursuit={is_pursuit_inc})"
)
break
@@ -540,30 +697,91 @@ async def correlate_call(
f"call {call_id}{best_inc['incident_id']} (sim={best_score:.3f})"
)
# ── Update existing or create new ────────────────────────────────────────
# ── Decision output ───────────────────────────────────────────────────────
if matched_incident:
incident_id = matched_incident["incident_id"]
return {
"action": "link",
"matched_incident": matched_incident,
"incident_type": incident_type,
"corr_debug": corr_debug,
}
if not create_if_new:
return {"action": "orphan", "matched_incident": None, "incident_type": None, "corr_debug": corr_debug}
# Attempt type inference from tags before giving up on creation
resolved_type = incident_type
if not resolved_type and tags:
resolved_type = _infer_type_from_tags(tags)
if resolved_type:
logger.info(
f"Correlator: inferred incident_type={resolved_type!r} from tags {tags} for call {call_id}"
)
if not resolved_type:
return {"action": "orphan", "matched_incident": None, "incident_type": None, "corr_debug": corr_debug}
return {
"action": "new",
"matched_incident": None,
"incident_type": resolved_type,
"corr_debug": corr_debug,
}
# ─────────────────────────────────────────────────────────────────────────────
# Commit layer — writes decisions to Firestore
# ─────────────────────────────────────────────────────────────────────────────
async def _apply_and_log(decision: dict, ctx: dict) -> Optional[str]:
"""Commit a rules decision and persist the corr_debug fields to the call doc."""
incident_id = await _apply_decision(decision, ctx)
corr_debug = decision.get("corr_debug") or {}
if corr_debug:
try:
await fstore.doc_set("calls", ctx["call_id"], corr_debug)
except Exception as e:
logger.warning(f"Could not write corr_debug for call {ctx['call_id']}: {e}")
return incident_id
async def _apply_decision(decision: dict, ctx: dict) -> Optional[str]:
"""
Execute a correlation decision produced by _run_decision().
Handles all Firestore writes; returns the incident_id or None.
"""
action = decision["action"]
if action == "orphan":
return None
call_id = ctx["call_id"]
talkgroup_id = ctx["talkgroup_id"]
talkgroup_name = ctx["talkgroup_name"]
system_id = ctx["system_id"]
tags = ctx["tags"]
location = ctx["location"]
location_coords = ctx["location_coords"]
call_units = ctx["call_units"]
call_vehicles = ctx["call_vehicles"]
call_embedding = ctx["call_embedding"]
call_severity = ctx["call_severity"]
call_cleared = ctx["call_cleared"]
coords = ctx["coords"]
now = ctx["now"]
incident_type = decision["incident_type"]
if action == "link":
matched_incident = decision["matched_incident"]
await _update_incident(
matched_incident, call_id, talkgroup_id, system_id, tags,
location, location_coords, call_units, call_vehicles, call_embedding, now,
talkgroup_name=talkgroup_name, incident_type=incident_type,
cleared_units=call_cleared,
)
elif create_if_new:
# If GPT returned no type (missing talkgroup context is common), attempt
# to recover a type from the extracted tags before giving up on creation.
if not incident_type and tags:
incident_type = _infer_type_from_tags(tags)
if incident_type:
logger.info(
f"Correlator: inferred incident_type={incident_type!r} from tags "
f"{tags} for call {call_id} (no GPT type)"
)
if not incident_type:
# No type and none inferred — nothing to create
return None
return matched_incident["incident_id"]
# ── Cross-system parent detection ─────────────────────────────────────
# action == "new"
# ── Cross-system parent detection ─────────────────────────────────────────
# Before creating a standalone incident, check whether this call belongs
# to an incident already opened by a different agency (multi-agency chase,
# mutual aid, etc.). If a parent candidate is found:
@@ -578,7 +796,7 @@ async def correlate_call(
location=location,
location_coords=coords,
call_embedding=call_embedding,
recent=recent,
recent=ctx["recent"],
)
if cross_parent:
@@ -596,13 +814,13 @@ async def correlate_call(
# Candidate is already a child — link new child to the existing master
await _demote_to_child(incident_id, existing_master_id)
await _add_child_to_master(existing_master_id, incident_id, now)
corr_debug["corr_path"] = "new/cross-system-child"
decision["corr_debug"]["corr_path"] = "new/cross-system-child"
logger.info(
f"Correlator cross-system: call {call_id} → new child {incident_id} "
f"under existing master {existing_master_id}"
)
else:
# Candidate is a standalone master — create master shell, demote both
# Candidate is a standalone — create master shell, demote both
master_id = await _create_master_incident(
first_child_id=existing_child_id,
second_child_id=incident_id,
@@ -613,7 +831,7 @@ async def correlate_call(
)
await _demote_to_child(existing_child_id, master_id)
await _demote_to_child(incident_id, master_id)
corr_debug["corr_path"] = "new/cross-system-master"
decision["corr_debug"]["corr_path"] = "new/cross-system-master"
logger.info(
f"Correlator cross-system: created master {master_id}, "
f"demoted {existing_child_id} + new {incident_id} as children"
@@ -625,18 +843,7 @@ async def correlate_call(
tags, location, location_coords,
call_units, call_vehicles, call_embedding, call_severity, now,
)
corr_debug["corr_path"] = "new"
else:
# Creation suppressed (re-correlation sweep) — nothing to do
return None
# Persist the correlation decision to the call document so it can be
# inspected in Firestore or the admin UI without log-scraping.
if corr_debug:
try:
await fstore.doc_set("calls", call_id, corr_debug)
except Exception as e:
logger.warning(f"Could not write corr_debug for call {call_id}: {e}")
decision["corr_debug"]["corr_path"] = "new"
return incident_id
@@ -846,12 +1053,42 @@ def _call_fits_incident(
return True, "vehicle_overlap"
# ── 3. Location proximity ─────────────────────────────────────────────────
# For pursuit-type incidents, a location CHANGE is expected — the suspect is
# moving. Use an expanded radius and validate with a speed-sanity check so
# we don't absorb a call from a genuinely different scene.
# For all other incident types, a location mismatch means a different scene.
inc_coords = inc.get("location_coords")
if call_coords and inc_coords:
dist_km = _haversine_km(
call_coords["lat"], call_coords["lng"],
inc_coords["lat"], inc_coords["lng"],
)
inc_tags = set(inc.get("tags") or [])
is_pursuit = bool(inc_tags & _PURSUIT_TAGS)
if is_pursuit:
# Speed sanity: distance travelled ÷ time since last update must be
# plausible for a vehicle. Protects against a distant unrelated call
# that happens to share pursuit tags accidentally matching.
speed_ok = False
if now is not None and dist_km > proximity_km:
elapsed_min = max(idle_min, 0.1) # avoid div-by-zero
speed_km_per_min = dist_km / elapsed_min
speed_ok = speed_km_per_min <= _MAX_PURSUIT_SPEED_KM_PER_MIN
logger.info(
f" fits[{inc_id}]: pursuit speed check — "
f"dist={dist_km:.2f}km elapsed={elapsed_min:.1f}min "
f"speed={speed_km_per_min:.1f}km/min ok={speed_ok}"
)
effective_radius = _PURSUIT_PROXIMITY_KM if is_pursuit else proximity_km
if dist_km <= proximity_km or (is_pursuit and speed_ok and dist_km <= effective_radius):
logger.info(f" fits[{inc_id}]: location_proximity dist={dist_km:.2f}km (pursuit={is_pursuit}) → location_proximity")
return True, "location_proximity"
if is_pursuit:
logger.info(f" fits[{inc_id}]: pursuit location rejected — dist={dist_km:.2f}km exceeds radius or speed sanity failed → location_conflict")
else:
logger.info(f" fits[{inc_id}]: location_conflict dist={dist_km:.2f}km → location_conflict")
return False, "location_conflict"
else:
if dist_km <= proximity_km:
logger.info(f" fits[{inc_id}]: location_proximity dist={dist_km:.2f}km → location_proximity")
return True, "location_proximity"
@@ -867,13 +1104,12 @@ def _call_fits_incident(
f"call_coords={call_coords is not None} inc_coords={inc_coords is not None}"
)
if is_dispatch:
# Conversational continuity: the call arrived during the same conversation
# thread (< 2 min since last incident activity) with no contradicting evidence.
# Suppressed for reassignment calls — unit is breaking to a new scene and
# should not chain back to the current incident even if very recent.
if idle_min < 2 and not reassignment:
return True, "time_fallback"
# Shared dispatch channel — do not link without at least one positive signal.
# Dispatch channels require at least one positive signal (unit, vehicle,
# or location match). A substantive call with no matching signals is more
# likely a separate incident than a follow-up to the current one — two
# dispatches can arrive within seconds of each other on a busy channel.
# Content-free thin calls are handled before this function via the thin
# path in correlate_call, with a tighter 30-second recency window.
return False, "no_signal"
# Tactical channel: one scene per channel.
+18 -2
View File
@@ -171,6 +171,10 @@ async def extract_scenes(
f"Intelligence: call {call_id} — transcript too short for extraction "
f"({len(transcript.split())} words), skipping"
)
try:
await fstore.doc_set("calls", call_id, {"skip_reason": "transcript_too_short"})
except Exception:
pass
return []
raw_scenes: list[dict] = await asyncio.to_thread(
@@ -382,7 +386,19 @@ async def _geocode_location(
data = r.json()
if data.get("status") != "OK" or not data.get("results"):
return None
loc = data["results"][0]["geometry"]["location"]
result = data["results"][0]
location_type = result.get("geometry", {}).get("location_type", "")
# Only accept address-level precision. GEOMETRIC_CENTER (city/neighborhood
# centroid) and APPROXIMATE (region boundary) produce coordinates that look
# valid but are too vague for 0.5km proximity matching — they often resolve
# to the same point as the node's position and create false proximity matches.
if location_type not in ("ROOFTOP", "RANGE_INTERPOLATED"):
logger.info(
f"Geocoding rejected '{location_str}' — imprecise result "
f"(location_type={location_type!r}), returning None"
)
return None
loc = result["geometry"]["location"]
lat, lng = float(loc["lat"]), float(loc["lng"])
dist_km = _geo_dist_km(node_lat, node_lon, lat, lng)
if dist_km > settings.geocode_max_km:
@@ -392,7 +408,7 @@ async def _geocode_location(
)
return None
coords = {"lat": lat, "lng": lng}
logger.info(f"Geocoded '{location_str}'{coords} ({dist_km:.1f}km from node)")
logger.info(f"Geocoded '{location_str}'{coords} ({dist_km:.1f}km from node) [{location_type}]")
return coords
except Exception as e:
logger.warning(f"Geocoding failed for '{location_str}': {e}")
+278
View File
@@ -0,0 +1,278 @@
"""
LLM-based incident correlator using Gemini.
Two functions are exposed:
decide(call_id, ctx) — cheap first-pass (corr_cheap_model)
tiebreak(rules_decision, llm_decision, ctx) — smart tiebreaker (corr_smart_model)
Both return a decision dict compatible with _run_decision() in incident_correlator:
{"action": "link"|"new"|"orphan",
"matched_incident": dict|None,
"incident_type": str|None,
"corr_debug": dict,
"reasoning": str} ← extra field for logging/tiebreak comparison
decide() is skipped for thin calls (no content to reason about) and when
GEMINI_API_KEY is not set — in those cases returns None so the caller knows
to fall back to the rules decision.
Error handling: any Gemini failure returns None from decide() and the
rules_decision from tiebreak() so the pipeline never stalls.
"""
import asyncio
import json
from datetime import datetime, timezone
from typing import Optional
from app.internal.logger import logger
from app.config import settings
# ─────────────────────────────────────────────────────────────────────────────
# Prompt helpers
# ─────────────────────────────────────────────────────────────────────────────
def _fmt_idle(inc: dict, now: datetime) -> str:
try:
raw = inc.get("updated_at") or inc.get("started_at") or ""
dt = datetime.fromisoformat(str(raw).replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
minutes = int((now - dt).total_seconds() / 60)
return f"{minutes}min ago" if minutes < 60 else f"{minutes // 60}h{minutes % 60:02d}m ago"
except Exception:
return "?"
def _inc_summary(inc: dict, now: datetime) -> str:
parts = [f"id:{inc['incident_id']}", f"type:{inc.get('type') or '?'}"]
if inc.get("location"):
parts.append(f"loc:{inc['location']}")
units = inc.get("units") or []
if units:
parts.append(f"units:[{', '.join(units[:6])}]")
tags = inc.get("tags") or []
if tags:
parts.append(f"tags:[{', '.join(tags[:4])}]")
parts.append(f"idle:{_fmt_idle(inc, now)}")
return " | ".join(parts)
def _call_block(ctx: dict) -> str:
lines = []
call_doc = ctx["call_doc"]
transcript = call_doc.get("transcript_corrected") or call_doc.get("transcript")
if transcript:
lines.append(f"Transcript: {transcript[:700]}")
if ctx["tags"]:
lines.append(f"Tags: {ctx['tags']}")
if ctx["incident_type"]:
lines.append(f"Incident type: {ctx['incident_type']}")
if ctx["location"]:
lines.append(f"Location: {ctx['location']}")
if ctx["call_units"]:
lines.append(f"Units: {ctx['call_units']}")
if ctx["call_vehicles"]:
lines.append(f"Vehicles: {ctx['call_vehicles']}")
if ctx["talkgroup_name"]:
lines.append(f"Talkgroup: {ctx['talkgroup_name']}")
return "\n".join(lines) if lines else "(no details)"
_SCHEMA = '{"action": "link" | "new" | "orphan", "incident_id": "<id_string or null>", "reasoning": "<one sentence>"}'
_RULES = """
Rules:
- "link" only with clear positive evidence: same units, same geocoded location, or semantically identical scene on the same talkgroup within the last few minutes.
- A call on a DIFFERENT talkgroup than an incident requires unit overlap or geocoded location match — topic similarity alone is not enough.
- "new" only if the call has a clear incident_type AND describes a distinct, identifiable scene.
- "orphan" when in doubt — conservative is always correct.
- Do NOT link just because both calls involve police or both mention a road.
"""
def _build_decide_prompt(ctx: dict) -> str:
now = ctx["now"]
recent = ctx["recent"]
inc_block = (
"\n".join(_inc_summary(inc, now) for inc in recent[:20])
if recent else "(none)"
)
return (
"You are an incident correlator for a public safety radio monitoring system.\n\n"
"A new radio call has arrived. Decide whether it belongs to an existing active incident, "
"represents a new incident, or should be orphaned (not enough information).\n\n"
f"NEW CALL:\n{_call_block(ctx)}\n\n"
f"ACTIVE INCIDENTS ({len(recent)} recent):\n{inc_block}\n"
f"{_RULES}\n"
f"Respond with JSON only (no markdown):\n{_SCHEMA}"
)
def _build_tiebreak_prompt(rules_decision: dict, llm_decision: dict, ctx: dict) -> str:
now = ctx["now"]
recent = ctx["recent"]
inc_block = (
"\n".join(_inc_summary(inc, now) for inc in recent[:20])
if recent else "(none)"
)
def _fmt(d: dict, name: str) -> str:
action = d.get("action", "?")
inc = d.get("matched_incident")
inc_id = inc["incident_id"] if inc else (d.get("incident_id") or "null")
reason = d.get("reasoning") or (d.get("corr_debug") or {}).get("corr_fit_signal") or ""
return f" {name}: action={action}, incident_id={inc_id}, reasoning={reason!r}"
return (
"You are a senior incident correlator for a public safety radio monitoring system.\n\n"
"Two correlation engines disagree. You must make the final decision.\n\n"
f"NEW CALL:\n{_call_block(ctx)}\n\n"
f"ACTIVE INCIDENTS ({len(recent)} recent):\n{inc_block}\n\n"
"DISAGREEMENT:\n"
f"{_fmt(rules_decision, 'Rules engine')}\n"
f"{_fmt(llm_decision, 'LLM correlator')}\n"
f"{_RULES}\n"
f"Respond with JSON only (no markdown):\n{_SCHEMA}"
)
# ─────────────────────────────────────────────────────────────────────────────
# Gemini API call (sync, runs in thread pool)
# ─────────────────────────────────────────────────────────────────────────────
def _sync_gemini(model_name: str, prompt: str) -> dict:
import google.generativeai as genai # lazy import — only when needed
genai.configure(api_key=settings.gemini_api_key)
model = genai.GenerativeModel(
model_name,
generation_config={"response_mime_type": "application/json"},
)
response = model.generate_content(prompt)
return json.loads(response.text)
# ─────────────────────────────────────────────────────────────────────────────
# Decision parsing
# ─────────────────────────────────────────────────────────────────────────────
def _parse_response(raw: dict, ctx: dict) -> dict:
"""
Convert raw Gemini JSON output to a decision dict compatible with _run_decision().
Resolves incident_id → full incident doc from ctx["all_active"].
Handles type inference for "new" actions the same way as the rules engine.
"""
from app.internal.incident_correlator import _infer_type_from_tags # same-package import
action = raw.get("action", "orphan")
reasoning = raw.get("reasoning", "")
if action not in ("link", "new", "orphan"):
action = "orphan"
matched_incident: Optional[dict] = None
if action == "link":
inc_id = raw.get("incident_id")
if inc_id:
matched_incident = next(
(i for i in ctx["all_active"] if i.get("incident_id") == inc_id),
None,
)
if not matched_incident:
logger.warning(
f"LLM correlator: incident_id={inc_id!r} not in active incidents — orphaning"
)
action = "orphan"
incident_type: Optional[str] = None
if action in ("link", "new"):
incident_type = ctx["incident_type"]
if not incident_type:
incident_type = _infer_type_from_tags(ctx["tags"])
if action == "new" and not incident_type:
# Can't create an incident without a type — demote to orphan
action = "orphan"
matched_incident = None
return {
"action": action,
"matched_incident": matched_incident,
"incident_type": incident_type,
"corr_debug": {"corr_llm_reasoning": reasoning},
"reasoning": reasoning,
}
# ─────────────────────────────────────────────────────────────────────────────
# Public API
# ─────────────────────────────────────────────────────────────────────────────
async def decide(call_id: str, ctx: dict) -> Optional[dict]:
"""
Run the cheap LLM correlator (corr_cheap_model) on the call.
Returns a decision dict or None if:
- GEMINI_API_KEY is not configured
- the call is thin (content-free — no value from LLM)
- there are no recent active incidents to reason about
- Gemini fails
Callers should treat None as "fall back to rules decision".
"""
if not settings.gemini_api_key:
return None
if ctx["is_thin_call"]:
return None # thin calls have no transcript/units/coords to reason about
if not ctx["recent"]:
return None # no incidents to correlate against — rules handles new-only
try:
prompt = _build_decide_prompt(ctx)
raw = await asyncio.to_thread(_sync_gemini, settings.corr_cheap_model, prompt)
decision = _parse_response(raw, ctx)
_id = (decision["matched_incident"] or {}).get("incident_id", "null")
logger.info(
f"LLM correlator ({settings.corr_cheap_model}): call {call_id}"
f"action={decision['action']} incident={_id} "
f"reasoning={decision['reasoning']!r}"
)
return decision
except Exception as e:
logger.warning(f"LLM correlator failed for call {call_id}: {e}")
return None
async def tiebreak(rules_decision: dict, llm_decision: dict, ctx: dict) -> dict:
"""
Run the smart tiebreaker (corr_smart_model) when rules and LLM disagree.
Falls back to rules_decision on any error.
"""
call_id = ctx["call_id"]
try:
prompt = _build_tiebreak_prompt(rules_decision, llm_decision, ctx)
raw = await asyncio.to_thread(_sync_gemini, settings.corr_smart_model, prompt)
decision = _parse_response(raw, ctx)
_id = (decision["matched_incident"] or {}).get("incident_id", "null")
logger.info(
f"LLM tiebreak ({settings.corr_smart_model}): call {call_id}"
f"action={decision['action']} incident={_id} "
f"reasoning={decision['reasoning']!r}"
)
return decision
except Exception as e:
logger.warning(f"LLM tiebreak failed for call {call_id}: {e} — using rules decision")
return rules_decision
def decisions_agree(rules: dict, llm: dict) -> bool:
"""True if both decisions agree on action and (when action=="link") on the target incident."""
if rules["action"] != llm["action"]:
return False
if rules["action"] == "link":
r_id = (rules.get("matched_incident") or {}).get("incident_id")
l_id = (llm.get("matched_incident") or {}).get("incident_id")
return r_id == l_id
return True
+66 -3
View File
@@ -83,6 +83,65 @@ def _public_url_to_gcs_uri(url: str) -> Optional[str]:
return None
async def _correlate_with_consensus(
call_id: str,
node_id: str,
system_id: Optional[str],
talkgroup_id: Optional[int],
talkgroup_name: Optional[str],
tags: list[str],
incident_type: Optional[str],
location: Optional[str],
location_coords: Optional[dict],
units: Optional[list] = None,
vehicles: Optional[list] = None,
cleared_units: Optional[list] = None,
reassignment: bool = False,
) -> Optional[str]:
"""
Consensus correlator: runs the rules engine and the cheap LLM in sequence.
If they agree the rules decision is committed directly.
If they disagree a smarter tiebreaker LLM makes the final call.
Falls back to rules-only when GEMINI_API_KEY is absent, the call is
content-free (thin), or any LLM call fails.
"""
from app.internal import incident_correlator, llm_correlator
preview = await incident_correlator.preview_correlation(
call_id=call_id, node_id=node_id, system_id=system_id,
talkgroup_id=talkgroup_id, talkgroup_name=talkgroup_name,
tags=tags, incident_type=incident_type, location=location,
location_coords=location_coords, units=units, vehicles=vehicles,
cleared_units=cleared_units, reassignment=reassignment,
)
ctx = preview["ctx"]
rules_decision = preview["decision"]
llm_decision = await llm_correlator.decide(call_id, ctx)
if llm_decision is None:
# LLM unavailable, skipped (thin call), or errored — rules wins.
rules_decision["corr_debug"]["corr_consensus"] = "rules_only"
return await incident_correlator.apply_correlation(preview)
if llm_correlator.decisions_agree(rules_decision, llm_decision):
rules_decision["corr_debug"]["corr_consensus"] = "agreed"
rules_decision["corr_debug"]["corr_llm_reasoning"] = llm_decision.get("reasoning", "")
return await incident_correlator.apply_correlation(preview)
# Disagree — escalate to the smarter tiebreaker.
logger.info(
f"Consensus disagreement for call {call_id}: "
f"rules={rules_decision['action']} vs llm={llm_decision['action']} — tiebreak"
)
final = await llm_correlator.tiebreak(rules_decision, llm_decision, ctx)
final["corr_debug"]["corr_consensus"] = "tiebreak"
final["corr_debug"]["corr_rules_action"] = rules_decision["action"]
final["corr_debug"]["corr_llm_action"] = llm_decision["action"]
return await incident_correlator.apply_correlation({"decision": final, "ctx": ctx})
async def _run_extraction_pipeline(
call_id: str,
node_id: str,
@@ -114,7 +173,7 @@ async def _run_extraction_pipeline(
# overlap so the new scene doesn't chain into the unit's previous incident.
is_reassignment = bool(scene.get("reassignment"))
corr_units = [] if is_reassignment else scene.get("units")
incident_id = await incident_correlator.correlate_call(
incident_id = await _correlate_with_consensus(
call_id=call_id,
node_id=node_id,
system_id=system_id,
@@ -217,7 +276,7 @@ async def _run_intelligence_pipeline(
all_tags.extend(scene["tags"])
is_reassignment = bool(scene.get("reassignment"))
corr_units = [] if is_reassignment else scene.get("units")
incident_id = await incident_correlator.correlate_call(
incident_id = await _correlate_with_consensus(
call_id=call_id,
node_id=node_id,
system_id=system_id,
@@ -241,8 +300,12 @@ async def _run_intelligence_pipeline(
# Correlator also runs for calls with no scenes (unclassified) to attempt
# talkgroup-based linking even when no transcript could be produced.
# Skip when extraction flagged the call — garbage or too-short transcripts
# carry no signal and would only attach spuriously via the thin path.
if not scenes:
incident_id = await incident_correlator.correlate_call(
_call_doc = await fstore.doc_get("calls", call_id)
if not (_call_doc or {}).get("skip_reason"):
incident_id = await _correlate_with_consensus(
call_id=call_id,
node_id=node_id,
system_id=system_id,