Files
server-26/drb-c2-core/app/internal/intelligence.py
T
2026-04-21 01:51:23 -04:00

288 lines
12 KiB
Python

"""
GPT-4o mini intelligence extraction from call transcripts.
Sends the transcript to GPT-4o mini with a tight JSON schema prompt.
Returns structured data: incident type, tags, location, vehicles, units, severity,
a "resolved" (incident closed) flag, and an optional corrected transcript.
Falls back gracefully if the API is unavailable or returns malformed output.
"""
import asyncio
import json
import re
from typing import Optional
from app.internal.logger import logger
from app.internal import firestore as fstore
# Prompt sent to the chat model as a single user message by _sync_extract.
# It is rendered with str.format, so the three placeholders {system_id},
# {talkgroup_name} and {transcript_block} are substituted and the doubled
# braces around the schema come out as literal "{" / "}".
# The text below is part of the program's runtime behaviour: any edit changes
# what the model is asked to do.
_PROMPT_TEMPLATE = """You are analyzing a P25 public safety radio recording. The audio was transcribed by Whisper through a digital radio vocoder, which introduces errors. Each numbered transmission is a separate PTT press from a different radio. Extract structured information and respond ONLY with a single valid JSON object — no markdown, no explanation.
Schema:
{{
"incident_type": one of "fire" | "ems" | "police" | "accident" | "other" | "unknown",
"tags": [list of specific descriptive tags, max 6, e.g. "two-car mva", "property-damage-only", "working fire", "shots-fired"],
"location": "most specific location string found, or empty string",
"vehicles": [vehicle descriptions mentioned, e.g. "Hyundai Tucson", "black sedan"],
"units": [unit IDs or officer numbers mentioned, e.g. "Unit 511", "Car 4"],
"severity": one of "minor" | "moderate" | "major" | "unknown",
"resolved": true if this call explicitly signals the incident is over ("Code 4", "in custody", "all clear", "fire out", "patient transported", "GOA", "scene clear", "10-42", "negative contact", "clear the scene"), false otherwise,
"transcript_corrected": "corrected full transcript string, or null if no corrections needed"
}}
Rules:
- location: prefer intersections > addresses > mile markers > route+town > route alone > town alone. Empty string if none.
- tags: be specific and lowercase, hyphenated. Do not repeat incident_type as a tag.
- units: only identifiers explicitly mentioned, not inferred.
- Do not invent details not present in the transcript.
- transcript_corrected: fix only clear STT errors caused by vocoder distortion (e.g. "Several" → "10-4", misheard street names, garbled unit IDs). Use the back-and-forth context between transmissions to resolve ambiguities. Keep all radio language as-is — do NOT decode codes into plain English. Return null if the transcript looks accurate.
System: {system_id}
Talkgroup: {talkgroup_name}
{transcript_block}"""
# Nominatim viewbox half-width in degrees (~11 km at mid-latitudes).
# _geocode_location subtracts/adds this to the node's longitude and latitude
# to build the bounding box it sends with each search.
_GEO_DELTA = 0.1
# node_id → state abbreviation/name from one-time reverse geocode.
# Populated by _get_node_state after a successful lookup that yields a state.
_node_state_cache: dict[str, str] = {}
# Strip P25 service suffixes to extract the municipality name from a talkgroup.
# The match starts at the first service keyword (plus any whitespace before it)
# and, because of the trailing ".*", runs to the end of the string — so the
# keyword and everything after it are removed by .sub("", ...).
# "police" matches on its own as well as with a "dep"/"dept"/"department"
# suffix, so "Ossining Police" reduces to "Ossining" the same way
# "Ossining PD" and "Westchester County Fire" do.
_TG_SUFFIX_RE = re.compile(
    r"\s*\b(police(\s*dep(t|artment)?)?|pd|fire\s*(dep(t|artment)|district)?|"
    r"ems|rescue|dispatch|fd|tac(tical)?|ops|operations?|command|"
    r"(fire\s*)?ground|mutual\s*aid|channel|ch\b|car[-\s]to[-\s]car|"
    r"division|unit)\b.*",
    re.IGNORECASE,
)
async def extract_tags(
    call_id: str,
    transcript: str,
    talkgroup_name: Optional[str] = None,
    talkgroup_id: Optional[int] = None,
    system_id: Optional[str] = None,
    segments: Optional[list[dict]] = None,
    node_id: Optional[str] = None,
    preserve_transcript_correction: bool = False,
) -> tuple[list[str], Optional[str], Optional[str], Optional[dict], bool]:
    """
    Extract incident tags, type, location, corrected transcript, and closure signal via GPT-4o mini.

    The model call and the embedding call are blocking, so both run in a worker
    thread via ``asyncio.to_thread``. When a location string was extracted and
    ``node_id`` is given, the node document is read from Firestore and the
    location is geocoded via Nominatim, biased toward the node's position and
    hinted with the municipality (from the talkgroup name) and the node's state.

    Args:
        call_id: Document id under ``calls`` that receives the extracted fields.
        transcript: Full transcript text; also used as the embedding input.
        talkgroup_name: Talkgroup label, used in the prompt and as a geocoding hint.
        talkgroup_id: Numeric talkgroup id, used in the prompt only.
        system_id: Radio system identifier, used in the prompt only.
        segments: Optional per-transmission segments forwarded to the prompt builder.
        node_id: Receiving node; without it no geocoding is attempted.
        preserve_transcript_correction: When True, ``transcript_corrected`` is
            not written to the call document.

    Returns:
        (tags, primary_type, location_str, location_coords, resolved)
        where primary_type is None for "unknown"/"other"/missing,
        location_coords is {"lat": float, "lng": float} or None,
        and resolved is True when the transcript signals incident closure.

    Side-effect: updates calls/{call_id} in Firestore with tags and severity,
    plus location, location_coords, vehicles, units, embedding and
    transcript_corrected when present. The incident type and the resolved flag
    are returned to the caller but not written here. A failed write is logged
    and does not raise.
    """
    result = await asyncio.to_thread(
        _sync_extract, transcript, talkgroup_name, talkgroup_id, system_id, segments
    )
    # The model's JSON is untrusted: normalise every field to the type the rest
    # of the pipeline (and the return annotation) expects.
    if not isinstance(result, dict):
        result = {}
    tags: list[str] = _coerce_str_list(result.get("tags"))
    incident_type: Optional[str] = _coerce_str(result.get("incident_type"))
    location: Optional[str] = _coerce_str(result.get("location"))
    vehicles: list[str] = _coerce_str_list(result.get("vehicles"))
    units: list[str] = _coerce_str_list(result.get("units"))
    severity: str = _coerce_str(result.get("severity")) or "unknown"
    # bool("false") is True, so string flags must be parsed rather than cast.
    resolved: bool = _coerce_bool(result.get("resolved", False))
    transcript_corrected: Optional[str] = _coerce_str(result.get("transcript_corrected"))

    # "unknown" and "other" carry no information for callers; report them as None.
    if incident_type in ("unknown", "other"):
        incident_type = None

    # Geocode the location string if we have one and a node to bias toward
    location_coords: Optional[dict] = None
    if location and node_id:
        node_doc = await fstore.doc_get("nodes", node_id)
        if node_doc:
            node_lat = node_doc.get("lat")
            node_lon = node_doc.get("lon")
            if node_lat is not None and node_lon is not None:
                state = await _get_node_state(node_id, node_lat, node_lon)
                muni = _municipality_from_tg(talkgroup_name)
                # Append whichever hints are available to disambiguate the query.
                hint_parts = [p for p in [muni, state] if p]
                query = f"{location}, {', '.join(hint_parts)}" if hint_parts else location
                location_coords = await _geocode_location(query, node_lat, node_lon)

    # Store embedding alongside structured data
    embedding = await asyncio.to_thread(_sync_embed, _embed_text(transcript, incident_type))

    # tags and severity are always written; the rest only when non-empty.
    updates: dict = {"tags": tags, "severity": severity}
    if location:
        updates["location"] = location
    if location_coords:
        updates["location_coords"] = location_coords
    if vehicles:
        updates["vehicles"] = vehicles
    if units:
        updates["units"] = units
    if embedding:
        updates["embedding"] = embedding
    if transcript_corrected and not preserve_transcript_correction:
        updates["transcript_corrected"] = transcript_corrected

    try:
        await fstore.doc_set("calls", call_id, updates)
    except Exception as e:
        # Best effort: the extracted values are still returned to the caller.
        logger.warning(f"Could not save intelligence for call {call_id}: {e}")

    logger.info(
        f"Intelligence: call {call_id} → type={incident_type}, "
        f"tags={tags}, location={location!r}, coords={location_coords}, severity={severity}, "
        f"corrected={transcript_corrected is not None}"
    )
    return tags, incident_type, location, location_coords, resolved


def _coerce_str(value: object) -> Optional[str]:
    """Return ``value`` stripped when it is a non-blank string, otherwise None."""
    if not isinstance(value, str):
        return None
    return value.strip() or None


def _coerce_str_list(value: object) -> list[str]:
    """Return the non-blank entries of ``value`` as strings; a lone string becomes one item."""
    if isinstance(value, str):
        value = [value]
    if not isinstance(value, list):
        return []
    cleaned: list[str] = []
    for item in value:
        # Accept plain numbers (e.g. a bare unit number) but not bools/containers.
        if isinstance(item, bool) or not isinstance(item, (str, int, float)):
            continue
        text = str(item).strip()
        if text:
            cleaned.append(text)
    return cleaned


def _coerce_bool(value: object) -> bool:
    """Interpret a JSON-ish flag; only "true"/"yes"/"1" strings count as True."""
    if isinstance(value, str):
        return value.strip().lower() in ("true", "yes", "1")
    return bool(value)
async def _geocode_location(
    location_str: str, node_lat: float, node_lon: float
) -> Optional[dict]:
    """
    Geocode a location string using Nominatim, biased toward the node's area.

    The search is sent with a viewbox of ±_GEO_DELTA degrees around the node
    and ``bounded=1``, and only the first result is requested.

    Args:
        location_str: Free-text query (location plus any municipality/state hints).
        node_lat: Latitude of the receiving node, centre of the viewbox.
        node_lon: Longitude of the receiving node, centre of the viewbox.

    Returns:
        {"lat": float, "lng": float} for the first result, or None when there
        are no results or the request/parsing fails (failures are logged, not raised).
    """
    import httpx

    # Nominatim expects the viewbox as lon,lat,lon,lat.
    viewbox = (
        f"{node_lon - _GEO_DELTA},{node_lat - _GEO_DELTA},"
        f"{node_lon + _GEO_DELTA},{node_lat + _GEO_DELTA}"
    )
    params = {
        "q": location_str,
        "format": "json",
        "limit": 1,
        "viewbox": viewbox,
        "bounded": 1,
    }
    headers = {"User-Agent": "DRB-Dispatch/1.0 (public-safety radio monitor)"}
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            r = await client.get(
                "https://nominatim.openstreetmap.org/search",
                params=params,
                headers=headers,
            )
            r.raise_for_status()
            results = r.json()
        if results:
            # Nominatim returns coordinates as strings under "lat"/"lon".
            coords = {"lat": float(results[0]["lat"]), "lng": float(results[0]["lon"])}
            logger.info(f"Geocoded '{location_str}' → {coords}")
            return coords
    except Exception as e:
        # Geocoding is optional enrichment; never let it break extraction.
        logger.warning(f"Geocoding failed for '{location_str}': {e}")
    return None
async def _get_node_state(node_id: str, lat: float, lon: float) -> Optional[str]:
    """
    Look up the state a node sits in by reverse geocoding its coordinates.

    A state found for ``node_id`` is remembered in ``_node_state_cache`` and
    served from there on later calls. Lookups that fail or yield no state are
    not remembered, so they are attempted again next time.

    Returns the state string, or None when it could not be determined
    (request errors are logged, not raised).
    """
    try:
        return _node_state_cache[node_id]
    except KeyError:
        pass

    import httpx

    request_headers = {"User-Agent": "DRB-Dispatch/1.0 (public-safety radio monitor)"}
    query = {"lat": lat, "lon": lon, "format": "json", "zoom": 5}
    try:
        async with httpx.AsyncClient(timeout=5.0) as http:
            response = await http.get(
                "https://nominatim.openstreetmap.org/reverse",
                params=query,
                headers=request_headers,
            )
            response.raise_for_status()
            payload = response.json()
        state_name = payload.get("address", {}).get("state", "")
        if state_name:
            _node_state_cache[node_id] = state_name
            logger.info(f"Node {node_id} reverse-geocoded to state: {state_name!r}")
            return state_name
    except Exception as exc:
        logger.warning(f"Node state reverse geocode failed: {exc}")
    return None
def _municipality_from_tg(tg_name: Optional[str]) -> Optional[str]:
    """
    Derive a municipality name from a talkgroup label.

    Everything from the first service keyword onward is removed using
    ``_TG_SUFFIX_RE``, e.g. "Ossining PD" → "Ossining" and
    "Westchester County Fire" → "Westchester County".

    Returns None when the label is missing, or when what remains is empty,
    all digits, or an upper-case abbreviation of at most three characters
    (e.g. "WC", "TAC").
    """
    if not tg_name:
        return None

    candidate = _TG_SUFFIX_RE.sub("", tg_name).strip()
    if not candidate:
        return None
    if candidate.isdigit():
        return None

    # Short all-caps leftovers are abbreviations, not place names.
    is_short_abbreviation = len(candidate) <= 3 and candidate.isupper()
    return None if is_short_abbreviation else candidate
def _build_transcript_block(transcript: str, segments: Optional[list[dict]]) -> str:
    """
    Render the transcript section of the prompt.

    With two or more segments, each one becomes a numbered line carrying its
    ``start`` value and ``text`` under a "Transmissions (N):" header. With
    zero or one segment (or none given), the plain transcript is used instead.
    """
    if not segments or len(segments) <= 1:
        return f"Transcript:\n{transcript}"

    numbered = []
    for number, seg in enumerate(segments, start=1):
        numbered.append(f"{number}. [{seg['start']}s] {seg['text']}")
    header = f"Transmissions ({len(segments)}):\n"
    return header + "\n".join(numbered)
def _sync_extract(
    transcript: str,
    talkgroup_name: Optional[str],
    talkgroup_id: Optional[int],
    system_id: Optional[str],
    segments: Optional[list[dict]],
) -> dict:
    """
    Call GPT-4o mini and parse the JSON response.

    Blocking; intended to be run in a worker thread.

    Args:
        transcript: Full transcript text.
        talkgroup_name: Talkgroup label for the prompt; "unknown" when missing.
        talkgroup_id: Talkgroup id; appended to the label as "(TGID n)" when truthy.
        system_id: Radio system identifier for the prompt; "unknown" when missing.
        segments: Optional per-transmission segments for the transcript block.

    Returns:
        The parsed JSON object, or an empty dict when no API key is configured,
        the request fails, or the reply is not a JSON object. Failures are
        logged as warnings and never raised from the API call itself.
    """
    from app.config import settings
    from openai import OpenAI

    if not settings.openai_api_key:
        logger.warning("OPENAI_API_KEY not set — intelligence extraction disabled.")
        return {}

    # Fall back to "unknown" before formatting so a missing name never shows
    # up in the prompt as the literal text "None (TGID n)".
    tg_label = talkgroup_name or "unknown"
    tg = f"{tg_label} (TGID {talkgroup_id})" if talkgroup_id else tg_label
    prompt = _PROMPT_TEMPLATE.format(
        transcript_block=_build_transcript_block(transcript, segments),
        talkgroup_name=tg,
        system_id=system_id or "unknown",
    )
    try:
        client = OpenAI(api_key=settings.openai_api_key)
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
        )
        # content can be None (e.g. an empty reply); treat that as non-JSON.
        parsed = json.loads(response.choices[0].message.content or "")
    except json.JSONDecodeError as e:
        logger.warning(f"GPT-4o mini returned non-JSON: {e}")
        return {}
    except Exception as e:
        logger.warning(f"GPT-4o mini extraction failed: {e}")
        return {}

    # Callers use .get() on the result, so anything but an object is unusable.
    if not isinstance(parsed, dict):
        logger.warning(
            f"GPT-4o mini returned JSON that is not an object: {type(parsed).__name__}"
        )
        return {}
    return parsed
def _sync_embed(text: str) -> Optional[list[float]]:
    """
    Request a text-embedding-3-small vector for ``text``.

    Blocking. Returns the embedding of the first result, or None when no API
    key is configured or the request fails (the failure is logged as a warning).
    """
    from app.config import settings
    from openai import OpenAI

    if not settings.openai_api_key:
        return None

    try:
        openai_client = OpenAI(api_key=settings.openai_api_key)
        response = openai_client.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        first_item = response.data[0]
        return first_item.embedding
    except Exception as exc:
        logger.warning(f"Embedding generation failed: {exc}")
    return None
def _embed_text(transcript: str, incident_type: Optional[str]) -> str:
    """Return the embedding input: the transcript, prefixed with "[type] " when a type is known."""
    if not incident_type:
        return f"{transcript}"
    return f"[{incident_type}] {transcript}"