Files
server-26/drb-c2-core/app/internal/intelligence.py
T
Logan 3d51db80d0 Improve extraction accuracy with speaker role inference
Add a SPEAKER ROLES section to the GPT-4o-mini prompt teaching it to
distinguish dispatch voice (names a unit then gives assignment + address)
from unit voice (opens with own callsign + brief status). Applied to
location attribution (dispatch-provided address beats unit position report)
and unit extraction (dispatched units vs. acknowledging units). No extra
API calls — purely prompt-level reasoning on the existing transcript.
2026-06-01 01:17:49 -04:00

535 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
GPT-4o-mini intelligence extraction from call transcripts.
Sends the transcript to GPT-4o-mini with a structured prompt that detects
whether the recording contains one or multiple distinct scenes (back-to-back
dispatch conversations on a busy channel). Returns a list of scene dicts —
one per detected incident. Most calls produce a single scene.
Falls back gracefully if the API is unavailable or returns malformed output.
"""
import asyncio
import json
import math
import re
from typing import Optional
from app.internal.logger import logger
from app.internal import firestore as fstore
_PROMPT_TEMPLATE = """You are analyzing a P25 public safety radio recording. The audio was transcribed by Whisper through a digital radio vocoder, which introduces errors. Each numbered transmission is a separate PTT press from a different radio.
SCENE DETECTION:
A busy dispatch channel sometimes captures back-to-back conversations about multiple concurrent incidents in a single recording. Detect whether this recording contains ONE scene (all transmissions relate to a single event) or MULTIPLE scenes (clearly distinct dispatch conversations with different units being assigned, different locations, different event types). Assign short status transmissions (10-4, en route, acknowledgements) with no clear scene context to the most recent scene before them in the list.
Always respond with the scenes array, even for a single scene.
SPEAKER ROLES:
P25 radio follows a predictable call-and-response pattern. Use it to correctly attribute entities — you do not have explicit speaker labels, but you can infer roles from conversational structure:
- Dispatch voice: opens by naming a unit then giving an assignment ("Unit 7, respond to 123 Main..."), provides incident addresses, says "be advised" / "stand by", reads back unit status. Dispatch speaks TO units.
- Unit voice: opens with the unit's own callsign or a brief status ("Unit 7 en route", "Baker-1 on scene", "Unit 7, 10-97"), acknowledges with "copy" / "10-4", requests info about their assignment. Units speak TO dispatch.
Apply speaker inference to extraction:
- A callsign at the start of a dispatch assignment ("Unit 7, go to...") — that unit is being dispatched. Include it in units.
- A callsign that opens a short acknowledgment ("Unit 7 en route", "Baker-1 copies") — that is the speaker's own ID. Include it in units.
- A location stated in a dispatch assignment is the incident address. Use it as location.
- A location stated by a unit ("I'm at Route 202 and Main") is their current position — use it as location only when no dispatch-provided address is present in the scene.
Response format — a JSON object with a "scenes" array. Each scene:
segment_indices: list of 0-based indices into the numbered transmissions (or null if no segments)
incident_type: one of "fire" | "ems" | "police" | "accident" | "other" | "unknown"
tags: list of specific descriptive tags, max 6, e.g. "two-car mva", "working fire", "shots-fired"
location: most specific location string found, or empty string
vehicles: list of vehicle descriptions mentioned
units: list of unit IDs or officer numbers explicitly mentioned
cleared_units: list of unit IDs that explicitly signal back-in-service or available in this recording
severity: one of "minor" | "moderate" | "major" | "unknown"
resolved: true if this scene explicitly signals incident closure, false otherwise
reassignment: true if a unit is breaking from their current scene to respond to a completely different call — whether dispatch-initiated ("Baker, can you clear and respond to...", "Adam, break from that and go to...") OR unit-initiated ("Show me headed to the vehicle complaint", "Can you show me to that call", a unit going 10-8 and self-requesting a new assignment). False if the unit is reporting in on their current scene, giving a status update, or requesting information about their existing call.
transcript_corrected: corrected text for this scene's transmissions only, or null
Rules:
- location: prefer intersections > addresses > mile markers > route+town > route alone > town alone. Dispatch-provided addresses take priority over unit-reported positions. Empty string if none.
- tags: describe WHAT happened, not WHERE. Specific, lowercase, hyphenated. Do not use location names, road names, talkgroup names, or place names as tags (wrong: "lower-macy's", "canvas-route-6", "route-202"; right: "suspect-search", "shoplifting", "vehicle-pursuit"). Do not repeat incident_type as a tag.
- units: ONLY identifiers that appear verbatim in the transcript. Use speaker role inference to distinguish units being dispatched from units acknowledging — both should be included. Never infer or guess unit IDs not present in the text.
- Do not invent details not present in the transcript.
- incident_type: let the talkgroup channel be your primary signal. Use "fire" ONLY if the talkgroup is clearly a fire/rescue channel OR the transcript explicitly describes active fire, smoke, flames, or structure fire activation. Police or EMS referencing a fire scene → use "police" or "ems". When uncertain, prefer "other" over "fire".
- ten_codes: interpret radio codes using the department reference provided below. Do not guess codes not listed.
- resolved: true only when the scene explicitly signals "Code 4", "all clear", "10-42", "in custody", "patient transported", "fire out", "GOA", "negative contact", "scene clear".
- cleared_units: only include units that explicitly stated their own back-in-service status in this recording (e.g. "Unit 7, 10-8", "Baker-1 available", "E-14 back in service", or the department ten-code for available/back-in-service listed above). Silence or absence of a unit is NOT clearance. A scene-wide Code 4 belongs in resolved=true, not here — cleared_units is for individual unit availability signals only.
- reassignment: only true when a unit is explicitly being pulled to a completely new call or location. A unit going en route to their first dispatch is NOT a reassignment. Routine status updates, acknowledgements, and scene updates are NOT reassignments.
- transcript_corrected: fix only clear STT/vocoder errors (e.g. "Several""10-4", misheard street names, garbled unit IDs). Keep all radio language as-is — do NOT decode codes into plain English. Return null if accurate.
System: {system_id}
Talkgroup: {talkgroup_name}
{ten_codes_block}{vocabulary_block}{transcript_block}"""
# Geographic bias radius for geocoding — half-width in degrees (~55 km)
_GEO_DELTA = 0.5
# Cache node state (e.g. "New York") and county (e.g. "Westchester County") per node
_node_state_cache: dict[str, str] = {}
_node_county_cache: dict[str, str] = {}
# Police/law-enforcement phonetic alphabet words (APCO + NATO).
# A run of 5+ of these in a transcript is a strong Whisper hallucination signal.
_PHONETIC_ALPHA_WORDS = frozenset({
# APCO (law enforcement)
"adam", "baker", "charles", "david", "edward", "frank", "george", "henry",
"ida", "john", "king", "lincoln", "mary", "nora", "ocean", "paul", "queen",
"robert", "sam", "tom", "union", "victor", "william", "x-ray", "young", "zebra",
# NATO
"alpha", "bravo", "charlie", "delta", "echo", "foxtrot", "golf", "hotel",
"india", "juliet", "kilo", "lima", "mike", "november", "oscar", "papa",
"quebec", "romeo", "sierra", "tango", "uniform", "whiskey", "yankee", "zulu",
})
# Strip P25 service suffixes to extract the municipality name from a talkgroup
_TG_SUFFIX_RE = re.compile(
r"\s*\b(police\s*dep(t|artment)?|pd|fire\s*(dep(t|artment)|district)?|"
r"ems|rescue|dispatch|fd|tac(tical)?|ops|operations?|command|"
r"(fire\s*)?ground|mutual\s*aid|channel|ch\b|car[-\s]to[-\s]car|"
r"division|unit)\b.*",
re.IGNORECASE,
)
def _is_garbage_transcript(transcript: str) -> bool:
"""
Detect Whisper hallucinations that should be discarded before GPT processing.
Two signals:
1. Phonetic-alphabet run ≥ 5 consecutive words: Whisper hallucinated a
training-data sequence (common on silent or noise-only audio).
2. High comma density (> 15% of tokens) in long transcripts: list-dump
hallucinations contain far more commas than real radio speech.
"""
words = re.findall(r"[\w\-]+", transcript.lower())
if not words:
return False
# Threshold of 12: well above any legitimate plate/name spellout (~68 words)
# but catches the full-alphabet hallucination (26 words in sequence).
run = 0
for w in words:
if w in _PHONETIC_ALPHA_WORDS:
run += 1
if run >= 12:
return True
else:
run = 0
if len(words) > 30 and transcript.count(",") / len(words) > 0.15:
return True
return False
def _build_ten_codes_block(ten_codes: dict[str, str]) -> str:
if not ten_codes:
return ""
lines = "\n".join(f" {code}: {meaning}" for code, meaning in sorted(ten_codes.items()))
return f"Department ten-codes:\n{lines}\n\n"
async def extract_scenes(
call_id: str,
transcript: str,
talkgroup_name: Optional[str] = None,
talkgroup_id: Optional[int] = None,
system_id: Optional[str] = None,
segments: Optional[list[dict]] = None,
node_id: Optional[str] = None,
preserve_transcript_correction: bool = False,
) -> list[dict]:
"""
Split the transcript into one or more scenes and extract structured
intelligence for each. Most calls return a single scene; a busy dispatch
channel capturing back-to-back conversations returns multiple.
Each scene dict contains:
tags, incident_type, location, location_coords, resolved,
severity, vehicles, units, transcript_corrected,
segment_indices, embedding
Side-effect: updates calls/{call_id} in Firestore with merged tags,
location (primary scene), units/vehicles, severity, embedding, and
optionally transcript_corrected.
"""
vocabulary: list[str] = []
ten_codes: dict[str, str] = {}
if system_id:
# Single cached read — vocabulary and ten_codes live on the same document.
system_doc = await fstore.doc_get_cached("systems", system_id)
if system_doc:
vocabulary = system_doc.get("vocabulary") or []
ten_codes = system_doc.get("ten_codes") or {}
if _is_garbage_transcript(transcript):
logger.warning(
f"Intelligence: call {call_id} — garbage transcript detected "
f"(Whisper hallucination), skipping extraction"
)
try:
await fstore.doc_set("calls", call_id, {"skip_reason": "garbage_transcript"})
except Exception:
pass
return []
# Transcripts with ≤5 words carry no extractable intelligence — GPT hallucinates
# units and tags from thin context (e.g. "Main Lot", "10-4", "David").
if len(transcript.split()) <= 5:
logger.info(
f"Intelligence: call {call_id} — transcript too short for extraction "
f"({len(transcript.split())} words), skipping"
)
try:
await fstore.doc_set("calls", call_id, {"skip_reason": "transcript_too_short"})
except Exception:
pass
return []
raw_scenes: list[dict] = await asyncio.to_thread(
_sync_extract,
transcript, talkgroup_name, talkgroup_id, system_id, segments, vocabulary, ten_codes,
)
if not raw_scenes:
return []
# Resolve node position once for geocoding all scenes
node_lat: Optional[float] = None
node_lon: Optional[float] = None
if node_id:
node_doc = await fstore.doc_get_cached("nodes", node_id)
if node_doc:
node_lat = node_doc.get("lat")
node_lon = node_doc.get("lon")
processed: list[dict] = []
for scene in raw_scenes:
tags: list[str] = scene.get("tags") or []
incident_type: Optional[str] = scene.get("incident_type") or None
location: Optional[str] = scene.get("location") or None
vehicles: list[str] = scene.get("vehicles") or []
units: list[str] = scene.get("units") or []
cleared_units: list[str] = scene.get("cleared_units") or []
severity: str = scene.get("severity") or "unknown"
resolved: bool = bool(scene.get("resolved", False))
reassignment: bool = bool(scene.get("reassignment", False))
transcript_corrected: Optional[str]= scene.get("transcript_corrected") or None
segment_indices: Optional[list] = scene.get("segment_indices")
if incident_type in ("unknown", "other", ""):
incident_type = None
# Geocode this scene's location.
# Build the most specific query possible: location + municipality + state.
# e.g. "High Street" → "High Street, Yorktown, New York"
# This prevents generic street names from resolving to wrong-country results.
location_coords: Optional[dict] = None
if location and node_lat is not None and node_lon is not None:
muni = _municipality_from_tg(talkgroup_name)
state = await _get_node_state(node_id or "", node_lat, node_lon) if node_id else ""
county = _node_county_cache.get(node_id or "") if node_id else ""
parts = [location]
if muni:
parts.append(muni)
if county:
parts.append(county)
if state:
parts.append(state)
query = ", ".join(parts)
location_coords = await _geocode_location(query, node_lat, node_lon)
# Embed this scene's content
scene_text = _build_scene_embed_text(
transcript, segments, segment_indices, incident_type, transcript_corrected
)
embedding = await asyncio.to_thread(_sync_embed, scene_text)
processed.append({
"tags": tags,
"incident_type": incident_type,
"location": location,
"location_coords": location_coords,
"vehicles": vehicles,
"units": units,
"cleared_units": cleared_units,
"severity": severity,
"resolved": resolved,
"reassignment": reassignment,
"transcript_corrected": transcript_corrected,
"segment_indices": segment_indices,
"embedding": embedding,
})
# Merge across scenes for the call-level Firestore document.
# Primary scene (first) owns location, severity, transcript_corrected.
# Tags/units/vehicles are union-merged from all scenes.
primary = processed[0]
all_tags = list(dict.fromkeys(t for s in processed for t in s["tags"]))
all_units = list(dict.fromkeys(u for s in processed for u in s["units"]))
all_vehicles = list(dict.fromkeys(v for s in processed for v in s["vehicles"]))
all_cleared = list(dict.fromkeys(u for s in processed for u in s["cleared_units"]))
updates: dict = {"tags": all_tags, "severity": primary["severity"]}
if primary["location"]:
updates["location"] = primary["location"]
if primary["location_coords"]:
updates["location_coords"] = primary["location_coords"]
if all_units:
updates["units"] = all_units
if all_cleared:
updates["cleared_units"] = all_cleared
if all_vehicles:
updates["vehicles"] = all_vehicles
if primary["embedding"]:
updates["embedding"] = primary["embedding"]
if primary["transcript_corrected"] and not preserve_transcript_correction:
updates["transcript_corrected"] = primary["transcript_corrected"]
try:
await fstore.doc_set("calls", call_id, updates)
except Exception as e:
logger.warning(f"Could not save intelligence for call {call_id}: {e}")
scene_summary = (
f"{len(processed)} scene(s): "
+ ", ".join(
f"[{s['incident_type'] or 'unclassified'} tags={s['tags'][:2]}]"
for s in processed
)
)
logger.info(f"Intelligence: call {call_id}{scene_summary}")
return processed
def _geo_dist_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Haversine distance in km between two lat/lon points."""
R = 6371.0
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat / 2) ** 2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) ** 2
return R * 2 * math.asin(math.sqrt(a))
async def _get_node_state(node_id: str, lat: float, lon: float) -> str:
"""
Return the US state name (e.g. "New York") for a node's position.
Also populates _node_county_cache as a side-effect (same API call).
Uses Google Maps Reverse Geocoding; cached for the process lifetime since nodes don't move.
"""
if node_id in _node_state_cache:
return _node_state_cache[node_id]
import httpx
from app.config import settings
if not settings.google_maps_api_key:
return ""
state = ""
county = ""
try:
async with httpx.AsyncClient(timeout=5.0) as client:
r = await client.get(
"https://maps.googleapis.com/maps/api/geocode/json",
params={
"latlng": f"{lat},{lon}",
"result_type": "administrative_area_level_1|administrative_area_level_2",
"key": settings.google_maps_api_key,
},
)
r.raise_for_status()
data = r.json()
if data.get("status") == "OK" and data.get("results"):
for result in data["results"]:
for comp in result.get("address_components", []):
types = comp.get("types", [])
if "administrative_area_level_1" in types and not state:
state = comp.get("long_name", "")
if "administrative_area_level_2" in types and not county:
county = comp.get("long_name", "")
except Exception as e:
logger.warning(f"Node state lookup failed for {node_id}: {e}")
if state:
_node_state_cache[node_id] = state
if county:
_node_county_cache[node_id] = county
if state or county:
logger.info(f"Node {node_id} geo resolved: county={county!r} state={state!r}")
return state
async def _geocode_location(
location_str: str, node_lat: float, node_lon: float
) -> Optional[dict]:
"""
Geocode using Google Maps Geocoding API, biased toward the node's area.
Returns {"lat": float, "lng": float} or None if geocoding fails or the
result is farther than geocode_max_km from the node (wrong-jurisdiction guard).
"""
import httpx
from app.config import settings
if not settings.google_maps_api_key:
logger.warning("GOOGLE_MAPS_API_KEY not set — geocoding disabled")
return None
bounds = (
f"{node_lat - _GEO_DELTA},{node_lon - _GEO_DELTA}"
f"|{node_lat + _GEO_DELTA},{node_lon + _GEO_DELTA}"
)
params = {
"address": location_str,
"bounds": bounds,
"region": "us",
"key": settings.google_maps_api_key,
}
try:
async with httpx.AsyncClient(timeout=5.0) as client:
r = await client.get(
"https://maps.googleapis.com/maps/api/geocode/json",
params=params,
)
r.raise_for_status()
data = r.json()
if data.get("status") != "OK" or not data.get("results"):
return None
result = data["results"][0]
location_type = result.get("geometry", {}).get("location_type", "")
# Only accept address-level precision. GEOMETRIC_CENTER (city/neighborhood
# centroid) and APPROXIMATE (region boundary) produce coordinates that look
# valid but are too vague for 0.5km proximity matching — they often resolve
# to the same point as the node's position and create false proximity matches.
if location_type not in ("ROOFTOP", "RANGE_INTERPOLATED"):
logger.info(
f"Geocoding rejected '{location_str}' — imprecise result "
f"(location_type={location_type!r}), returning None"
)
return None
loc = result["geometry"]["location"]
lat, lng = float(loc["lat"]), float(loc["lng"])
dist_km = _geo_dist_km(node_lat, node_lon, lat, lng)
if dist_km > settings.geocode_max_km:
logger.warning(
f"Geocoding rejected '{location_str}' → ({lat:.4f}, {lng:.4f}) "
f"{dist_km:.1f}km from node exceeds geocode_max_km={settings.geocode_max_km}"
)
return None
coords = {"lat": lat, "lng": lng}
logger.info(f"Geocoded '{location_str}'{coords} ({dist_km:.1f}km from node) [{location_type}]")
return coords
except Exception as e:
logger.warning(f"Geocoding failed for '{location_str}': {e}")
return None
def _municipality_from_tg(tg_name: Optional[str]) -> Optional[str]:
"""
Extract the municipality name from a talkgroup name.
e.g. "Ossining PD""Ossining", "Westchester County Fire""Westchester County"
Returns None for tactical/operational channels with no useful location info.
"""
if not tg_name:
return None
cleaned = _TG_SUFFIX_RE.sub("", tg_name).strip()
if not cleaned or cleaned.isdigit() or (len(cleaned) <= 3 and cleaned.isupper()):
return None
return cleaned
def _build_transcript_block(transcript: str, segments: Optional[list[dict]]) -> str:
"""Format transcript as numbered transmissions if segments are available."""
if segments and len(segments) > 1:
lines = [f"{i+1}. [{s['start']}s] {s['text']}" for i, s in enumerate(segments)]
return f"Transmissions ({len(segments)}):\n" + "\n".join(lines)
return f"Transcript:\n{transcript}"
def _build_scene_embed_text(
transcript: str,
segments: Optional[list[dict]],
segment_indices: Optional[list[int]],
incident_type: Optional[str],
transcript_corrected: Optional[str],
) -> str:
"""Build the text string to embed for a specific scene."""
prefix = f"[{incident_type}] " if incident_type else ""
if transcript_corrected:
return f"{prefix}{transcript_corrected}"
if segments and segment_indices:
texts = [segments[i]["text"] for i in segment_indices if i < len(segments)]
return f"{prefix}{' '.join(texts)}"
return f"{prefix}{transcript}"
def _sync_extract(
transcript: str,
talkgroup_name: Optional[str],
talkgroup_id: Optional[int],
system_id: Optional[str],
segments: Optional[list[dict]],
vocabulary: Optional[list[str]] = None,
ten_codes: Optional[dict[str, str]] = None,
) -> list[dict]:
"""Call GPT-4o-mini and return a list of scene dicts."""
from app.config import settings
from openai import OpenAI
if not settings.openai_api_key:
logger.warning("OPENAI_API_KEY not set — intelligence extraction disabled.")
return []
from app.internal.vocabulary_learner import build_gpt_vocab_block
tg = f"{talkgroup_name} (TGID {talkgroup_id})" if talkgroup_id else (talkgroup_name or "unknown")
prompt = _PROMPT_TEMPLATE.format(
transcript_block=_build_transcript_block(transcript, segments),
talkgroup_name=tg,
system_id=system_id or "unknown",
ten_codes_block=_build_ten_codes_block(ten_codes or {}),
vocabulary_block=build_gpt_vocab_block(vocabulary or []),
)
try:
client = OpenAI(api_key=settings.openai_api_key)
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
response_format={"type": "json_object"},
)
raw = json.loads(response.choices[0].message.content)
# New format: {"scenes": [...]}
if "scenes" in raw and isinstance(raw["scenes"], list):
return raw["scenes"]
# Fallback: GPT returned the old flat single-scene format
logger.warning("GPT returned flat format instead of scenes array — wrapping")
return [raw]
except json.JSONDecodeError as e:
logger.warning(f"GPT-4o-mini returned non-JSON: {e}")
return []
except Exception as e:
logger.warning(f"GPT-4o-mini extraction failed: {e}")
return []
def _sync_embed(text: str) -> Optional[list[float]]:
"""Generate a text-embedding-3-small vector for semantic similarity."""
from app.config import settings
from openai import OpenAI
if not settings.openai_api_key:
return None
try:
client = OpenAI(api_key=settings.openai_api_key)
result = client.embeddings.create(model="text-embedding-3-small", input=text)
return result.data[0].embedding
except Exception as e:
logger.warning(f"Embedding generation failed: {e}")
return None