"""
LLM-powered intelligence extraction from call transcripts.

Sends the transcript to OpenAI GPT-4o mini with a tight JSON schema prompt.
Returns structured data: incident type, tags, location, vehicles, units, severity.
A text-embedding-3-small vector of the transcript is stored alongside that data.
Falls back gracefully if the model is unavailable or returns malformed output.
"""
import asyncio
import json
from typing import Optional

from app.internal.logger import logger
from app.internal import firestore as fstore

# Severity values allowed by the prompt schema; anything else is stored as "unknown".
_VALID_SEVERITIES = frozenset({"minor", "moderate", "major", "unknown"})

# Incident types that carry no information and are reported to callers as None.
_NON_SPECIFIC_TYPES = frozenset({"unknown", "other"})

# Literal braces in the schema are doubled because the template goes through str.format().
_PROMPT_TEMPLATE = """You are analyzing a P25 public safety radio recording.
The audio was transcribed by Whisper through a digital radio vocoder, which introduces errors.
Each numbered transmission is a separate PTT press from a different radio.

Extract structured information and respond ONLY with a single valid JSON object — no markdown, no explanation.

Schema:
{{
  "incident_type": one of "fire" | "ems" | "police" | "accident" | "other" | "unknown",
  "tags": [list of specific descriptive tags, max 6, e.g. "two-car mva", "property-damage-only", "working fire", "shots-fired"],
  "location": "most specific location string found, or empty string",
  "vehicles": [vehicle descriptions mentioned, e.g. "Hyundai Tucson", "black sedan"],
  "units": [unit IDs or officer numbers mentioned, e.g. "Unit 511", "Car 4"],
  "severity": one of "minor" | "moderate" | "major" | "unknown",
  "transcript_corrected": "corrected full transcript string, or null if no corrections needed"
}}

Rules:
- location: prefer intersections > addresses > mile markers > route+town > route alone > town alone. Empty string if none.
- tags: be specific and lowercase, hyphenated. Do not repeat incident_type as a tag.
- units: only identifiers explicitly mentioned, not inferred.
- Do not invent details not present in the transcript.
- transcript_corrected: fix only clear STT errors caused by vocoder distortion (e.g. "Several" → "10-4", misheard street names, garbled unit IDs). Use the back-and-forth context between transmissions to resolve ambiguities.
  Keep all radio language as-is — do NOT decode codes into plain English. Return null if the transcript looks accurate.

System: {system_id}
Talkgroup: {talkgroup_name}

{transcript_block}"""


def _clean_str(value: object) -> Optional[str]:
    """Return *value* stripped if it is a non-blank string, otherwise None."""
    if isinstance(value, str):
        stripped = value.strip()
        if stripped:
            return stripped
    return None


def _as_str_list(value: object) -> list[str]:
    """Coerce a model-supplied field into a list of non-blank strings.

    The model is asked for JSON arrays but may return a bare string or
    something else entirely; a bare string becomes a one-element list and
    any other non-list value becomes an empty list.
    """
    if isinstance(value, str):
        value = [value]
    if not isinstance(value, list):
        return []
    return [item.strip() for item in value if isinstance(item, str) and item.strip()]


def _talkgroup_label(talkgroup_name: Optional[str], talkgroup_id: Optional[int]) -> str:
    """Build the talkgroup string shown to the model, never rendering "None"."""
    name = talkgroup_name or "unknown"
    if talkgroup_id:
        return f"{name} (TGID {talkgroup_id})"
    return name


async def extract_tags(
    call_id: str,
    transcript: str,
    talkgroup_name: Optional[str] = None,
    talkgroup_id: Optional[int] = None,
    system_id: Optional[str] = None,
    segments: Optional[list[dict]] = None,
) -> tuple[list[str], Optional[str], Optional[str]]:
    """
    Extract incident tags, type, location, and corrected transcript via GPT-4o mini.

    The blocking OpenAI calls run in worker threads via asyncio.to_thread.
    Fields returned by the model are validated before use, so a malformed
    response degrades to empty/"unknown" values instead of raising.

    Returns: (tags, primary_type, location). primary_type is None when the
    model reports "unknown" or "other"; location is None when none was found.

    Side-effect: writes tags and severity, plus location, vehicles, units,
    embedding and transcript_corrected when present, to calls/{call_id} via
    fstore.doc_set. A failed write is logged and does not raise.
    """
    result = await asyncio.to_thread(
        _sync_extract, transcript, talkgroup_name, talkgroup_id, system_id, segments
    )

    tags = _as_str_list(result.get("tags"))
    vehicles = _as_str_list(result.get("vehicles"))
    units = _as_str_list(result.get("units"))
    location = _clean_str(result.get("location"))
    transcript_corrected = _clean_str(result.get("transcript_corrected"))

    # Normalise case so "Unknown"/"Other" are filtered like their lowercase forms.
    incident_type = _clean_str(result.get("incident_type"))
    if incident_type is not None:
        incident_type = incident_type.lower()
        if incident_type in _NON_SPECIFIC_TYPES:
            incident_type = None

    severity = (_clean_str(result.get("severity")) or "unknown").lower()
    if severity not in _VALID_SEVERITIES:
        severity = "unknown"

    # Store embedding alongside structured data
    embedding = await asyncio.to_thread(_sync_embed, _embed_text(transcript, incident_type))

    updates: dict = {
        "tags": tags,
        "severity": severity,
    }
    if location:
        updates["location"] = location
    if vehicles:
        updates["vehicles"] = vehicles
    if units:
        updates["units"] = units
    if embedding:
        updates["embedding"] = embedding
    if transcript_corrected:
        updates["transcript_corrected"] = transcript_corrected

    # Persistence is best-effort: the extracted values are still returned on failure.
    try:
        await fstore.doc_set("calls", call_id, updates)
    except Exception as e:
        logger.warning(f"Could not save intelligence for call {call_id}: {e}")

    logger.info(
        f"Intelligence: call {call_id} → type={incident_type}, "
        f"tags={tags}, location={location!r}, severity={severity}, "
        f"corrected={transcript_corrected is not None}"
    )
    return tags, incident_type, location


def _build_transcript_block(transcript: str, segments: Optional[list[dict]]) -> str:
    """Format transcript as numbered transmissions if segments are available.

    With two or more segments each one is rendered as "N. [<start>s] <text>";
    otherwise the plain transcript is used. A segment missing "start" or
    "text" is rendered with a placeholder rather than raising KeyError.
    """
    if segments and len(segments) > 1:
        lines = []
        for number, segment in enumerate(segments, start=1):
            start = segment.get("start", "?")
            text = segment.get("text", "")
            lines.append(f"{number}. [{start}s] {text}")
        return f"Transmissions ({len(segments)}):\n" + "\n".join(lines)
    return f"Transcript:\n{transcript}"


def _sync_extract(
    transcript: str,
    talkgroup_name: Optional[str],
    talkgroup_id: Optional[int],
    system_id: Optional[str],
    segments: Optional[list[dict]],
) -> dict:
    """Call GPT-4o mini and parse the JSON response.

    Blocking; intended to be run in a worker thread. Always returns a dict:
    the parsed JSON object on success, or {} when the API key is missing,
    the request fails, or the response is not a JSON object.
    """
    # Imported lazily, as in the rest of this module's OpenAI helpers.
    from app.config import settings
    from openai import OpenAI

    if not settings.openai_api_key:
        logger.warning("OPENAI_API_KEY not set — intelligence extraction disabled.")
        return {}

    try:
        # Prompt construction sits inside the try so that malformed segments
        # degrade to {} instead of escaping into the caller.
        prompt = _PROMPT_TEMPLATE.format(
            transcript_block=_build_transcript_block(transcript, segments),
            talkgroup_name=_talkgroup_label(talkgroup_name, talkgroup_id),
            system_id=system_id or "unknown",
        )
        client = OpenAI(api_key=settings.openai_api_key)
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
        )
        content = response.choices[0].message.content
        if not content:
            logger.warning("GPT-4o mini returned an empty response.")
            return {}
        parsed = json.loads(content)
    except json.JSONDecodeError as e:
        logger.warning(f"GPT-4o mini returned non-JSON: {e}")
        return {}
    except Exception as e:
        logger.warning(f"GPT-4o mini extraction failed: {e}")
        return {}

    # Valid JSON that is not an object (list, string, number) is unusable here.
    if not isinstance(parsed, dict):
        logger.warning(f"GPT-4o mini returned non-object JSON: {type(parsed).__name__}")
        return {}
    return parsed


def _sync_embed(text: str) -> Optional[list[float]]:
    """Generate a text-embedding-3-small vector for semantic similarity.

    Blocking; intended to be run in a worker thread. Returns None when the
    API key is missing, the text is blank, or the request fails.
    """
    from app.config import settings
    from openai import OpenAI

    if not settings.openai_api_key:
        return None
    # Skip the API round-trip for blank input; there is nothing to embed.
    if not text or not text.strip():
        return None
    try:
        client = OpenAI(api_key=settings.openai_api_key)
        result = client.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return result.data[0].embedding
    except Exception as e:
        logger.warning(f"Embedding generation failed: {e}")
        return None


def _embed_text(transcript: str, incident_type: Optional[str]) -> str:
    """Build the text string to embed — transcript + type context."""
    prefix = f"[{incident_type}] " if incident_type else ""
    return f"{prefix}{transcript}"