Logan 8b660d8e10 feat: incident correlation overhaul, signal-based auto-resolve, token fixes
Correlator
- Raise fast-path idle gate 30 → 90 min (tg_fast_path_idle_minutes)
- Fix disambiguate always-commits bug: run _call_fits_incident on winner
  before committing; fall through to new-incident creation if it fails
- Add unit-continuity path (path 1.5): matches all_active by shared unit
  IDs with a reassignment guard, bridges calls past the idle gate
- Add tag-based incident_type inference (_TAG_TYPE_HINTS) as GPT fallback,
  rescuing tagged calls that would have been dropped (616 observed orphans)
- Add master/child incident model: _create_master_incident, _demote_to_child,
  _add_child_to_master; new incidents stamped incident_type="master"
- Add cross-system parent detection (_find_cross_system_parent): two-signal
  scoring (road overlap=0.4, embedding≥0.78=0.3, proximity=0.3, threshold=0.5)
  wired into create-if-new path; creates master shell on first cross-system match
- Add maybe_resolve_parent: auto-resolves master when all children close;
  called from upload pipeline (LLM closure) and summarizer stale sweep
- Add signal-based auto-resolve via units_active/units_cleared tracking:
  GPT now extracts cleared_units per scene; _update_incident moves units
  between active/cleared lists and resolves the incident when active empties;
  stored on call doc for re-correlation sweep reuse
- Add _create_incident initialization of units_active/units_cleared fields

Re-correlation sweep
- Add corr_sweep_count + MAX_SWEEP_ATTEMPTS=3: orphans get 3 attempts
  then are tombstoned as corr_path="unlinked", ending the re-sweep loop
  (previously hammering each orphan 29-31 times per shift)
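The attempt cap can be sketched as a small gate; field names match the changelog (`corr_sweep_count`, `corr_path`), but the surrounding sweep loop is assumed.

```python
# Hedged sketch of the sweep-attempt cap. The orphan dict stands in
# for the real call document; the sweep loop around it is assumed.

MAX_SWEEP_ATTEMPTS = 3


def should_resweep(orphan: dict) -> bool:
    """Grant the orphan another correlation attempt, or tombstone it
    once the cap is reached so future sweeps skip it entirely."""
    count = orphan.get("corr_sweep_count", 0)
    if count >= MAX_SWEEP_ATTEMPTS:
        # Tombstone: corr_path="unlinked" excludes it from future sweeps.
        orphan["corr_path"] = "unlinked"
        return False
    orphan["corr_sweep_count"] = count + 1
    return True
```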

Intelligence extraction
- Add cleared_units to GPT prompt schema and rules
- Extract and propagate cleared_units per scene; merge across scenes;
  store on call doc for re-correlation sweep
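The cross-scene merge can work as an order-preserving union, mirroring the `dict.fromkeys` idiom the extraction module already uses for tags, units, and vehicles:

```python
# Order-preserving union of per-scene cleared_units, matching the
# dict.fromkeys dedup idiom used for tags/units in the module below.

def merge_cleared_units(scenes: list[dict]) -> list[str]:
    return list(dict.fromkeys(
        u for s in scenes for u in s.get("cleared_units", [])
    ))
```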

Token management
- Fix token release bug: remove release_token call on discord_connected=False
  in MQTT checkin (transient Discord drops were orphaning bots mid-shift)
- Add PUT /tokens/{id}/prefer/{system_id} endpoint: lock a bot token to a
  system; pass _none as system_id to clear; stored bidirectionally on both
  token and system documents
- discord_join handler resolves preferred_token_id from system doc and passes
  system_name in MQTT payload
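The bidirectional storage behind the prefer endpoint can be sketched like this; the dict-of-dicts store is a stand-in for the real token and system Firestore documents, and the field names are assumptions.

```python
# Hedged sketch of PUT /tokens/{id}/prefer/{system_id} storage logic.
# tokens/systems are plain dicts standing in for Firestore documents;
# preferred_system_id / preferred_token_id are assumed field names.

def set_preferred_token(tokens: dict, systems: dict,
                        token_id: str, system_id: str) -> None:
    """Lock a bot token to a system; pass "_none" as system_id to clear.
    The link is stored on both the token and the system document."""
    if system_id == "_none":
        old = tokens.setdefault(token_id, {}).pop("preferred_system_id", None)
        if old and old in systems:
            systems[old].pop("preferred_token_id", None)
        return
    tokens.setdefault(token_id, {})["preferred_system_id"] = system_id
    systems.setdefault(system_id, {})["preferred_token_id"] = token_id
```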
2026-05-10 19:49:05 -04:00


"""
GPT-4o-mini intelligence extraction from call transcripts.
Sends the transcript to GPT-4o-mini with a structured prompt that detects
whether the recording contains one or multiple distinct scenes (back-to-back
dispatch conversations on a busy channel). Returns a list of scene dicts —
one per detected incident. Most calls produce a single scene.
Falls back gracefully if the API is unavailable or returns malformed output.
"""
import asyncio
import json
import re
from typing import Optional
from app.internal.logger import logger
from app.internal import firestore as fstore
_PROMPT_TEMPLATE = """You are analyzing a P25 public safety radio recording. The audio was transcribed by Whisper through a digital radio vocoder, which introduces errors. Each numbered transmission is a separate PTT press from a different radio.
SCENE DETECTION:
A busy dispatch channel sometimes captures back-to-back conversations about multiple concurrent incidents in a single recording. Detect whether this recording contains ONE scene (all transmissions relate to a single event) or MULTIPLE scenes (clearly distinct dispatch conversations with different units being assigned, different locations, different event types). Assign short status transmissions (10-4, en route, acknowledgements) with no clear scene context to the most recent scene before them in the list.
Always respond with the scenes array, even for a single scene.
Response format — a JSON object with a "scenes" array. Each scene:
segment_indices: list of 0-based indices into the numbered transmissions (or null if no segments)
incident_type: one of "fire" | "ems" | "police" | "accident" | "other" | "unknown"
tags: list of specific descriptive tags, max 6, e.g. "two-car mva", "working fire", "shots-fired"
location: most specific location string found, or empty string
vehicles: list of vehicle descriptions mentioned
units: list of unit IDs or officer numbers explicitly mentioned
cleared_units: list of unit IDs that explicitly signal back-in-service or available in this recording
severity: one of "minor" | "moderate" | "major" | "unknown"
resolved: true if this scene explicitly signals incident closure, false otherwise
reassignment: true if dispatch is actively pulling a unit away from their current assignment to respond to a new, different call — e.g. "Baker, can you clear and respond to...", "Adam, break from that and go to...". False if the unit is simply reporting in, updating status, or continuing their current assignment.
transcript_corrected: corrected text for this scene's transmissions only, or null
Rules:
- location: prefer intersections > addresses > mile markers > route+town > route alone > town alone. Empty string if none.
- tags: describe WHAT happened, not WHERE. Specific, lowercase, hyphenated. Do not use location names, road names, talkgroup names, or place names as tags (wrong: "lower-macy's", "canvas-route-6", "route-202"; right: "suspect-search", "shoplifting", "vehicle-pursuit"). Do not repeat incident_type as a tag.
- units: only identifiers explicitly mentioned, not inferred.
- Do not invent details not present in the transcript.
- incident_type: let the talkgroup channel be your primary signal. Use "fire" ONLY if the talkgroup is clearly a fire/rescue channel OR the transcript explicitly describes active fire, smoke, flames, or structure fire activation. Police or EMS referencing a fire scene → use "police" or "ems". When uncertain, prefer "other" over "fire".
- ten_codes: interpret radio codes using the department reference provided below. Do not guess codes not listed.
- resolved: true only when the scene explicitly signals "Code 4", "all clear", "10-42", "in custody", "patient transported", "fire out", "GOA", "negative contact", "scene clear".
- cleared_units: only include units that explicitly stated their own back-in-service status in this recording (e.g. "Unit 7, 10-8", "Baker-1 available", "E-14 back in service", or the department ten-code for available/back-in-service listed above). Silence or absence of a unit is NOT clearance. A scene-wide Code 4 belongs in resolved=true, not here — cleared_units is for individual unit availability signals only.
- reassignment: only true when a unit is explicitly being pulled to a completely new call or location. A unit going en route to their first dispatch is NOT a reassignment. Routine status updates, acknowledgements, and scene updates are NOT reassignments.
- transcript_corrected: fix only clear STT/vocoder errors (e.g. "Several" → "10-4", misheard street names, garbled unit IDs). Keep all radio language as-is — do NOT decode codes into plain English. Return null if accurate.
System: {system_id}
Talkgroup: {talkgroup_name}
{ten_codes_block}{vocabulary_block}{transcript_block}"""
# Nominatim viewbox half-width in degrees (~11 km at mid-latitudes)
_GEO_DELTA = 0.1
# node_id → state abbreviation/name from one-time reverse geocode
_node_state_cache: dict[str, str] = {}
# Strip P25 service suffixes to extract the municipality name from a talkgroup
_TG_SUFFIX_RE = re.compile(
    r"\s*\b(police\s*dep(t|artment)?|pd|fire\s*(dep(t|artment)|district)?|"
    r"ems|rescue|dispatch|fd|tac(tical)?|ops|operations?|command|"
    r"(fire\s*)?ground|mutual\s*aid|channel|ch\b|car[-\s]to[-\s]car|"
    r"division|unit)\b.*",
    re.IGNORECASE,
)
def _build_ten_codes_block(ten_codes: dict[str, str]) -> str:
    if not ten_codes:
        return ""
    lines = "\n".join(f" {code}: {meaning}" for code, meaning in sorted(ten_codes.items()))
    return f"Department ten-codes:\n{lines}\n\n"
async def extract_scenes(
    call_id: str,
    transcript: str,
    talkgroup_name: Optional[str] = None,
    talkgroup_id: Optional[int] = None,
    system_id: Optional[str] = None,
    segments: Optional[list[dict]] = None,
    node_id: Optional[str] = None,
    preserve_transcript_correction: bool = False,
) -> list[dict]:
    """
    Split the transcript into one or more scenes and extract structured
    intelligence for each. Most calls return a single scene; a busy dispatch
    channel capturing back-to-back conversations returns multiple.

    Each scene dict contains:
        tags, incident_type, location, location_coords, resolved,
        severity, vehicles, units, transcript_corrected,
        segment_indices, embedding

    Side effect: updates calls/{call_id} in Firestore with merged tags,
    location (primary scene), units/vehicles, severity, embedding, and
    optionally transcript_corrected.
    """
    vocabulary: list[str] = []
    ten_codes: dict[str, str] = {}
    if system_id:
        # Single cached read — vocabulary and ten_codes live on the same document.
        system_doc = await fstore.doc_get_cached("systems", system_id)
        if system_doc:
            vocabulary = system_doc.get("vocabulary") or []
            ten_codes = system_doc.get("ten_codes") or {}

    raw_scenes: list[dict] = await asyncio.to_thread(
        _sync_extract,
        transcript, talkgroup_name, talkgroup_id, system_id, segments, vocabulary, ten_codes,
    )
    if not raw_scenes:
        return []

    # Resolve node position once for geocoding all scenes
    node_lat: Optional[float] = None
    node_lon: Optional[float] = None
    if node_id:
        node_doc = await fstore.doc_get_cached("nodes", node_id)
        if node_doc:
            node_lat = node_doc.get("lat")
            node_lon = node_doc.get("lon")

    processed: list[dict] = []
    for scene in raw_scenes:
        tags: list[str] = scene.get("tags") or []
        incident_type: Optional[str] = scene.get("incident_type") or None
        location: Optional[str] = scene.get("location") or None
        vehicles: list[str] = scene.get("vehicles") or []
        units: list[str] = scene.get("units") or []
        cleared_units: list[str] = scene.get("cleared_units") or []
        severity: str = scene.get("severity") or "unknown"
        resolved: bool = bool(scene.get("resolved", False))
        reassignment: bool = bool(scene.get("reassignment", False))
        transcript_corrected: Optional[str] = scene.get("transcript_corrected") or None
        segment_indices: Optional[list] = scene.get("segment_indices")
        if incident_type in ("unknown", "other", ""):
            incident_type = None

        # Geocode this scene's location
        location_coords: Optional[dict] = None
        if location and node_lat is not None and node_lon is not None:
            state = await _get_node_state(node_id, node_lat, node_lon)
            muni = _municipality_from_tg(talkgroup_name)
            hint_parts = [p for p in [muni, state] if p]
            query = f"{location}, {', '.join(hint_parts)}" if hint_parts else location
            location_coords = await _geocode_location(query, node_lat, node_lon)

        # Embed this scene's content
        scene_text = _build_scene_embed_text(
            transcript, segments, segment_indices, incident_type, transcript_corrected
        )
        embedding = await asyncio.to_thread(_sync_embed, scene_text)

        processed.append({
            "tags": tags,
            "incident_type": incident_type,
            "location": location,
            "location_coords": location_coords,
            "vehicles": vehicles,
            "units": units,
            "cleared_units": cleared_units,
            "severity": severity,
            "resolved": resolved,
            "reassignment": reassignment,
            "transcript_corrected": transcript_corrected,
            "segment_indices": segment_indices,
            "embedding": embedding,
        })

    # Merge across scenes for the call-level Firestore document.
    # Primary scene (first) owns location, severity, transcript_corrected.
    # Tags/units/vehicles are union-merged from all scenes.
    primary = processed[0]
    all_tags = list(dict.fromkeys(t for s in processed for t in s["tags"]))
    all_units = list(dict.fromkeys(u for s in processed for u in s["units"]))
    all_vehicles = list(dict.fromkeys(v for s in processed for v in s["vehicles"]))
    all_cleared = list(dict.fromkeys(u for s in processed for u in s["cleared_units"]))

    updates: dict = {"tags": all_tags, "severity": primary["severity"]}
    if primary["location"]:
        updates["location"] = primary["location"]
    if primary["location_coords"]:
        updates["location_coords"] = primary["location_coords"]
    if all_units:
        updates["units"] = all_units
    if all_cleared:
        updates["cleared_units"] = all_cleared
    if all_vehicles:
        updates["vehicles"] = all_vehicles
    if primary["embedding"]:
        updates["embedding"] = primary["embedding"]
    if primary["transcript_corrected"] and not preserve_transcript_correction:
        updates["transcript_corrected"] = primary["transcript_corrected"]

    try:
        await fstore.doc_set("calls", call_id, updates)
    except Exception as e:
        logger.warning(f"Could not save intelligence for call {call_id}: {e}")

    scene_summary = (
        f"{len(processed)} scene(s): "
        + ", ".join(
            f"[{s['incident_type'] or 'unclassified'} tags={s['tags'][:2]}]"
            for s in processed
        )
    )
    logger.info(f"Intelligence: call {call_id}: {scene_summary}")
    return processed
async def _geocode_location(
    location_str: str, node_lat: float, node_lon: float
) -> Optional[dict]:
    """
    Geocode a location string using Nominatim, biased toward the node's area.
    Returns {"lat": float, "lng": float} or None if geocoding fails.
    """
    import httpx
    viewbox = (
        f"{node_lon - _GEO_DELTA},{node_lat - _GEO_DELTA},"
        f"{node_lon + _GEO_DELTA},{node_lat + _GEO_DELTA}"
    )
    params = {
        "q": location_str,
        "format": "json",
        "limit": 1,
        "viewbox": viewbox,
        "bounded": 1,
    }
    headers = {"User-Agent": "DRB-Dispatch/1.0 (public-safety radio monitor)"}
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            r = await client.get(
                "https://nominatim.openstreetmap.org/search",
                params=params,
                headers=headers,
            )
            r.raise_for_status()
            results = r.json()
            if results:
                coords = {"lat": float(results[0]["lat"]), "lng": float(results[0]["lon"])}
                logger.info(f"Geocoded '{location_str}' → {coords}")
                return coords
    except Exception as e:
        logger.warning(f"Geocoding failed for '{location_str}': {e}")
    return None
async def _get_node_state(node_id: str, lat: float, lon: float) -> Optional[str]:
    """
    Reverse geocode the node's position once to extract its state.
    Result is cached for the process lifetime — nodes don't move.
    """
    if node_id in _node_state_cache:
        return _node_state_cache[node_id]
    import httpx
    headers = {"User-Agent": "DRB-Dispatch/1.0 (public-safety radio monitor)"}
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            r = await client.get(
                "https://nominatim.openstreetmap.org/reverse",
                params={"lat": lat, "lon": lon, "format": "json", "zoom": 5},
                headers=headers,
            )
            r.raise_for_status()
            data = r.json()
            state = data.get("address", {}).get("state", "")
            if state:
                _node_state_cache[node_id] = state
                logger.info(f"Node {node_id} reverse-geocoded to state: {state!r}")
            return state
    except Exception as e:
        logger.warning(f"Node state reverse geocode failed: {e}")
    return None
def _municipality_from_tg(tg_name: Optional[str]) -> Optional[str]:
    """
    Extract the municipality name from a talkgroup name.
    e.g. "Ossining PD" → "Ossining", "Westchester County Fire" → "Westchester County"
    Returns None for tactical/operational channels with no useful location info.
    """
    if not tg_name:
        return None
    cleaned = _TG_SUFFIX_RE.sub("", tg_name).strip()
    if not cleaned or cleaned.isdigit() or (len(cleaned) <= 3 and cleaned.isupper()):
        return None
    return cleaned
def _build_transcript_block(transcript: str, segments: Optional[list[dict]]) -> str:
    """Format transcript as numbered transmissions if segments are available."""
    if segments and len(segments) > 1:
        lines = [f"{i+1}. [{s['start']}s] {s['text']}" for i, s in enumerate(segments)]
        return f"Transmissions ({len(segments)}):\n" + "\n".join(lines)
    return f"Transcript:\n{transcript}"
def _build_scene_embed_text(
    transcript: str,
    segments: Optional[list[dict]],
    segment_indices: Optional[list[int]],
    incident_type: Optional[str],
    transcript_corrected: Optional[str],
) -> str:
    """Build the text string to embed for a specific scene."""
    prefix = f"[{incident_type}] " if incident_type else ""
    if transcript_corrected:
        return f"{prefix}{transcript_corrected}"
    if segments and segment_indices:
        texts = [segments[i]["text"] for i in segment_indices if i < len(segments)]
        return f"{prefix}{' '.join(texts)}"
    return f"{prefix}{transcript}"
def _sync_extract(
    transcript: str,
    talkgroup_name: Optional[str],
    talkgroup_id: Optional[int],
    system_id: Optional[str],
    segments: Optional[list[dict]],
    vocabulary: Optional[list[str]] = None,
    ten_codes: Optional[dict[str, str]] = None,
) -> list[dict]:
    """Call GPT-4o-mini and return a list of scene dicts."""
    from app.config import settings
    from openai import OpenAI
    if not settings.openai_api_key:
        logger.warning("OPENAI_API_KEY not set — intelligence extraction disabled.")
        return []
    from app.internal.vocabulary_learner import build_gpt_vocab_block

    tg = f"{talkgroup_name} (TGID {talkgroup_id})" if talkgroup_id else (talkgroup_name or "unknown")
    prompt = _PROMPT_TEMPLATE.format(
        transcript_block=_build_transcript_block(transcript, segments),
        talkgroup_name=tg,
        system_id=system_id or "unknown",
        ten_codes_block=_build_ten_codes_block(ten_codes or {}),
        vocabulary_block=build_gpt_vocab_block(vocabulary or []),
    )
    try:
        client = OpenAI(api_key=settings.openai_api_key)
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
        )
        raw = json.loads(response.choices[0].message.content)
        # New format: {"scenes": [...]}
        if "scenes" in raw and isinstance(raw["scenes"], list):
            return raw["scenes"]
        # Fallback: GPT returned the old flat single-scene format
        logger.warning("GPT returned flat format instead of scenes array — wrapping")
        return [raw]
    except json.JSONDecodeError as e:
        logger.warning(f"GPT-4o-mini returned non-JSON: {e}")
        return []
    except Exception as e:
        logger.warning(f"GPT-4o-mini extraction failed: {e}")
        return []
def _sync_embed(text: str) -> Optional[list[float]]:
    """Generate a text-embedding-3-small vector for semantic similarity."""
    from app.config import settings
    from openai import OpenAI
    if not settings.openai_api_key:
        return None
    try:
        client = OpenAI(api_key=settings.openai_api_key)
        result = client.embeddings.create(model="text-embedding-3-small", input=text)
        return result.data[0].embedding
    except Exception as e:
        logger.warning(f"Embedding generation failed: {e}")
        return None