""" GPT-4o-mini intelligence extraction from call transcripts. Sends the transcript to GPT-4o-mini with a structured prompt that detects whether the recording contains one or multiple distinct scenes (back-to-back dispatch conversations on a busy channel). Returns a list of scene dicts — one per detected incident. Most calls produce a single scene. Falls back gracefully if the API is unavailable or returns malformed output. """ import asyncio import json import re from typing import Optional from app.internal.logger import logger from app.internal import firestore as fstore _PROMPT_TEMPLATE = """You are analyzing a P25 public safety radio recording. The audio was transcribed by Whisper through a digital radio vocoder, which introduces errors. Each numbered transmission is a separate PTT press from a different radio. SCENE DETECTION: A busy dispatch channel sometimes captures back-to-back conversations about multiple concurrent incidents in a single recording. Detect whether this recording contains ONE scene (all transmissions relate to a single event) or MULTIPLE scenes (clearly distinct dispatch conversations with different units being assigned, different locations, different event types). Assign short status transmissions (10-4, en route, acknowledgements) with no clear scene context to the most recent scene before them in the list. Always respond with the scenes array, even for a single scene. Response format — a JSON object with a "scenes" array. Each scene: segment_indices: list of 0-based indices into the numbered transmissions (or null if no segments) incident_type: one of "fire" | "ems" | "police" | "accident" | "other" | "unknown" tags: list of specific descriptive tags, max 6, e.g. "two-car mva", "working fire", "shots-fired" location: most specific location string found, or empty string vehicles: list of vehicle descriptions mentioned units: list of unit IDs or officer numbers explicitly mentioned severity: one of "minor" | "moderate" | "major" | "unknown" resolved: true if this scene explicitly signals incident closure, false otherwise reassignment: true if dispatch is actively pulling a unit away from their current assignment to respond to a new, different call — e.g. "Baker, can you clear and respond to...", "Adam, break from that and go to...". False if the unit is simply reporting in, updating status, or continuing their current assignment. transcript_corrected: corrected text for this scene's transmissions only, or null Rules: - location: prefer intersections > addresses > mile markers > route+town > route alone > town alone. Empty string if none. - tags: describe WHAT happened, not WHERE. Specific, lowercase, hyphenated. Do not use location names, road names, talkgroup names, or place names as tags (wrong: "lower-macy's", "canvas-route-6", "route-202"; right: "suspect-search", "shoplifting", "vehicle-pursuit"). Do not repeat incident_type as a tag. - units: only identifiers explicitly mentioned, not inferred. - Do not invent details not present in the transcript. - incident_type: let the talkgroup channel be your primary signal. Use "fire" ONLY if the talkgroup is clearly a fire/rescue channel OR the transcript explicitly describes active fire, smoke, flames, or structure fire activation. Police or EMS referencing a fire scene → use "police" or "ems". When uncertain, prefer "other" over "fire". - ten_codes: interpret radio codes using the department reference provided below. Do not guess codes not listed. - resolved: true only when the scene explicitly signals "Code 4", "all clear", "10-42", "in custody", "patient transported", "fire out", "GOA", "negative contact", "scene clear". - reassignment: only true when a unit is explicitly being pulled to a completely new call or location. A unit going en route to their first dispatch is NOT a reassignment. Routine status updates, acknowledgements, and scene updates are NOT reassignments. - transcript_corrected: fix only clear STT/vocoder errors (e.g. "Several" → "10-4", misheard street names, garbled unit IDs). Keep all radio language as-is — do NOT decode codes into plain English. Return null if accurate. System: {system_id} Talkgroup: {talkgroup_name} {ten_codes_block}{vocabulary_block}{transcript_block}""" # Nominatim viewbox half-width in degrees (~11 km at mid-latitudes) _GEO_DELTA = 0.1 # node_id → state abbreviation/name from one-time reverse geocode _node_state_cache: dict[str, str] = {} # Strip P25 service suffixes to extract the municipality name from a talkgroup _TG_SUFFIX_RE = re.compile( r"\s*\b(police\s*dep(t|artment)?|pd|fire\s*(dep(t|artment)|district)?|" r"ems|rescue|dispatch|fd|tac(tical)?|ops|operations?|command|" r"(fire\s*)?ground|mutual\s*aid|channel|ch\b|car[-\s]to[-\s]car|" r"division|unit)\b.*", re.IGNORECASE, ) def _build_ten_codes_block(ten_codes: dict[str, str]) -> str: if not ten_codes: return "" lines = "\n".join(f" {code}: {meaning}" for code, meaning in sorted(ten_codes.items())) return f"Department ten-codes:\n{lines}\n\n" async def extract_scenes( call_id: str, transcript: str, talkgroup_name: Optional[str] = None, talkgroup_id: Optional[int] = None, system_id: Optional[str] = None, segments: Optional[list[dict]] = None, node_id: Optional[str] = None, preserve_transcript_correction: bool = False, ) -> list[dict]: """ Split the transcript into one or more scenes and extract structured intelligence for each. Most calls return a single scene; a busy dispatch channel capturing back-to-back conversations returns multiple. Each scene dict contains: tags, incident_type, location, location_coords, resolved, severity, vehicles, units, transcript_corrected, segment_indices, embedding Side-effect: updates calls/{call_id} in Firestore with merged tags, location (primary scene), units/vehicles, severity, embedding, and optionally transcript_corrected. """ vocabulary: list[str] = [] ten_codes: dict[str, str] = {} if system_id: from app.internal.vocabulary_learner import get_vocabulary vocab_data = await get_vocabulary(system_id) vocabulary = vocab_data.get("vocabulary") or [] system_doc = await fstore.doc_get("systems", system_id) if system_doc: ten_codes = system_doc.get("ten_codes") or {} raw_scenes: list[dict] = await asyncio.to_thread( _sync_extract, transcript, talkgroup_name, talkgroup_id, system_id, segments, vocabulary, ten_codes, ) if not raw_scenes: return [] # Resolve node position once for geocoding all scenes node_lat: Optional[float] = None node_lon: Optional[float] = None if node_id: node_doc = await fstore.doc_get("nodes", node_id) if node_doc: node_lat = node_doc.get("lat") node_lon = node_doc.get("lon") processed: list[dict] = [] for scene in raw_scenes: tags: list[str] = scene.get("tags") or [] incident_type: Optional[str] = scene.get("incident_type") or None location: Optional[str] = scene.get("location") or None vehicles: list[str] = scene.get("vehicles") or [] units: list[str] = scene.get("units") or [] severity: str = scene.get("severity") or "unknown" resolved: bool = bool(scene.get("resolved", False)) reassignment: bool = bool(scene.get("reassignment", False)) transcript_corrected: Optional[str]= scene.get("transcript_corrected") or None segment_indices: Optional[list] = scene.get("segment_indices") if incident_type in ("unknown", "other", ""): incident_type = None # Geocode this scene's location location_coords: Optional[dict] = None if location and node_lat is not None and node_lon is not None: state = await _get_node_state(node_id, node_lat, node_lon) muni = _municipality_from_tg(talkgroup_name) hint_parts = [p for p in [muni, state] if p] query = f"{location}, {', '.join(hint_parts)}" if hint_parts else location location_coords = await _geocode_location(query, node_lat, node_lon) # Embed this scene's content scene_text = _build_scene_embed_text( transcript, segments, segment_indices, incident_type, transcript_corrected ) embedding = await asyncio.to_thread(_sync_embed, scene_text) processed.append({ "tags": tags, "incident_type": incident_type, "location": location, "location_coords": location_coords, "vehicles": vehicles, "units": units, "severity": severity, "resolved": resolved, "reassignment": reassignment, "transcript_corrected": transcript_corrected, "segment_indices": segment_indices, "embedding": embedding, }) # Merge across scenes for the call-level Firestore document. # Primary scene (first) owns location, severity, transcript_corrected. # Tags/units/vehicles are union-merged from all scenes. primary = processed[0] all_tags = list(dict.fromkeys(t for s in processed for t in s["tags"])) all_units = list(dict.fromkeys(u for s in processed for u in s["units"])) all_vehicles = list(dict.fromkeys(v for s in processed for v in s["vehicles"])) updates: dict = {"tags": all_tags, "severity": primary["severity"]} if primary["location"]: updates["location"] = primary["location"] if primary["location_coords"]: updates["location_coords"] = primary["location_coords"] if all_units: updates["units"] = all_units if all_vehicles: updates["vehicles"] = all_vehicles if primary["embedding"]: updates["embedding"] = primary["embedding"] if primary["transcript_corrected"] and not preserve_transcript_correction: updates["transcript_corrected"] = primary["transcript_corrected"] try: await fstore.doc_set("calls", call_id, updates) except Exception as e: logger.warning(f"Could not save intelligence for call {call_id}: {e}") scene_summary = ( f"{len(processed)} scene(s): " + ", ".join( f"[{s['incident_type'] or 'unclassified'} tags={s['tags'][:2]}]" for s in processed ) ) logger.info(f"Intelligence: call {call_id} → {scene_summary}") return processed async def _geocode_location( location_str: str, node_lat: float, node_lon: float ) -> Optional[dict]: """ Geocode a location string using Nominatim, biased toward the node's area. Returns {"lat": float, "lng": float} or None if geocoding fails. """ import httpx viewbox = ( f"{node_lon - _GEO_DELTA},{node_lat - _GEO_DELTA}," f"{node_lon + _GEO_DELTA},{node_lat + _GEO_DELTA}" ) params = { "q": location_str, "format": "json", "limit": 1, "viewbox": viewbox, "bounded": 1, } headers = {"User-Agent": "DRB-Dispatch/1.0 (public-safety radio monitor)"} try: async with httpx.AsyncClient(timeout=5.0) as client: r = await client.get( "https://nominatim.openstreetmap.org/search", params=params, headers=headers, ) r.raise_for_status() results = r.json() if results: coords = {"lat": float(results[0]["lat"]), "lng": float(results[0]["lon"])} logger.info(f"Geocoded '{location_str}' → {coords}") return coords except Exception as e: logger.warning(f"Geocoding failed for '{location_str}': {e}") return None async def _get_node_state(node_id: str, lat: float, lon: float) -> Optional[str]: """ Reverse geocode the node's position once to extract its state. Result is cached for the process lifetime — nodes don't move. """ if node_id in _node_state_cache: return _node_state_cache[node_id] import httpx headers = {"User-Agent": "DRB-Dispatch/1.0 (public-safety radio monitor)"} try: async with httpx.AsyncClient(timeout=5.0) as client: r = await client.get( "https://nominatim.openstreetmap.org/reverse", params={"lat": lat, "lon": lon, "format": "json", "zoom": 5}, headers=headers, ) r.raise_for_status() data = r.json() state = data.get("address", {}).get("state", "") if state: _node_state_cache[node_id] = state logger.info(f"Node {node_id} reverse-geocoded to state: {state!r}") return state except Exception as e: logger.warning(f"Node state reverse geocode failed: {e}") return None def _municipality_from_tg(tg_name: Optional[str]) -> Optional[str]: """ Extract the municipality name from a talkgroup name. e.g. "Ossining PD" → "Ossining", "Westchester County Fire" → "Westchester County" Returns None for tactical/operational channels with no useful location info. """ if not tg_name: return None cleaned = _TG_SUFFIX_RE.sub("", tg_name).strip() if not cleaned or cleaned.isdigit() or (len(cleaned) <= 3 and cleaned.isupper()): return None return cleaned def _build_transcript_block(transcript: str, segments: Optional[list[dict]]) -> str: """Format transcript as numbered transmissions if segments are available.""" if segments and len(segments) > 1: lines = [f"{i+1}. [{s['start']}s] {s['text']}" for i, s in enumerate(segments)] return f"Transmissions ({len(segments)}):\n" + "\n".join(lines) return f"Transcript:\n{transcript}" def _build_scene_embed_text( transcript: str, segments: Optional[list[dict]], segment_indices: Optional[list[int]], incident_type: Optional[str], transcript_corrected: Optional[str], ) -> str: """Build the text string to embed for a specific scene.""" prefix = f"[{incident_type}] " if incident_type else "" if transcript_corrected: return f"{prefix}{transcript_corrected}" if segments and segment_indices: texts = [segments[i]["text"] for i in segment_indices if i < len(segments)] return f"{prefix}{' '.join(texts)}" return f"{prefix}{transcript}" def _sync_extract( transcript: str, talkgroup_name: Optional[str], talkgroup_id: Optional[int], system_id: Optional[str], segments: Optional[list[dict]], vocabulary: Optional[list[str]] = None, ten_codes: Optional[dict[str, str]] = None, ) -> list[dict]: """Call GPT-4o-mini and return a list of scene dicts.""" from app.config import settings from openai import OpenAI if not settings.openai_api_key: logger.warning("OPENAI_API_KEY not set — intelligence extraction disabled.") return [] from app.internal.vocabulary_learner import build_gpt_vocab_block tg = f"{talkgroup_name} (TGID {talkgroup_id})" if talkgroup_id else (talkgroup_name or "unknown") prompt = _PROMPT_TEMPLATE.format( transcript_block=_build_transcript_block(transcript, segments), talkgroup_name=tg, system_id=system_id or "unknown", ten_codes_block=_build_ten_codes_block(ten_codes or {}), vocabulary_block=build_gpt_vocab_block(vocabulary or []), ) try: client = OpenAI(api_key=settings.openai_api_key) response = client.chat.completions.create( model="gpt-4o-mini", messages=[{"role": "user", "content": prompt}], response_format={"type": "json_object"}, ) raw = json.loads(response.choices[0].message.content) # New format: {"scenes": [...]} if "scenes" in raw and isinstance(raw["scenes"], list): return raw["scenes"] # Fallback: GPT returned the old flat single-scene format logger.warning("GPT returned flat format instead of scenes array — wrapping") return [raw] except json.JSONDecodeError as e: logger.warning(f"GPT-4o-mini returned non-JSON: {e}") return [] except Exception as e: logger.warning(f"GPT-4o-mini extraction failed: {e}") return [] def _sync_embed(text: str) -> Optional[list[float]]: """Generate a text-embedding-3-small vector for semantic similarity.""" from app.config import settings from openai import OpenAI if not settings.openai_api_key: return None try: client = OpenAI(api_key=settings.openai_api_key) result = client.embeddings.create(model="text-embedding-3-small", input=text) return result.data[0].embedding except Exception as e: logger.warning(f"Embedding generation failed: {e}") return None