172 lines
6.7 KiB
Python
172 lines
6.7 KiB
Python
"""
|
|
Gemini-powered intelligence extraction from call transcripts.
|
|
|
|
Sends the transcript to Gemini Flash with a tight JSON schema prompt.
|
|
Returns structured data: incident type, tags, location, vehicles, units, severity.
|
|
|
|
Falls back gracefully if Gemini is unavailable or returns malformed output.
|
|
"""
|
|
import asyncio
|
|
import json
|
|
from typing import Optional
|
|
from app.internal.logger import logger
|
|
from app.internal import firestore as fstore
|
|
|
|
_PROMPT_TEMPLATE = """You are analyzing a P25 public safety radio recording. The audio was transcribed by Whisper through a digital radio vocoder, which introduces errors. Each numbered transmission is a separate PTT press from a different radio. Extract structured information and respond ONLY with a single valid JSON object — no markdown, no explanation.
|
|
|
|
Schema:
|
|
{{
|
|
"incident_type": one of "fire" | "ems" | "police" | "accident" | "other" | "unknown",
|
|
"tags": [list of specific descriptive tags, max 6, e.g. "two-car mva", "property-damage-only", "working fire", "shots-fired"],
|
|
"location": "most specific location string found, or empty string",
|
|
"vehicles": [vehicle descriptions mentioned, e.g. "Hyundai Tucson", "black sedan"],
|
|
"units": [unit IDs or officer numbers mentioned, e.g. "Unit 511", "Car 4"],
|
|
"severity": one of "minor" | "moderate" | "major" | "unknown",
|
|
"transcript_corrected": "corrected full transcript string, or null if no corrections needed"
|
|
}}
|
|
|
|
Rules:
|
|
- location: prefer intersections > addresses > mile markers > route+town > route alone > town alone. Empty string if none.
|
|
- tags: be specific and lowercase, hyphenated. Do not repeat incident_type as a tag.
|
|
- units: only identifiers explicitly mentioned, not inferred.
|
|
- Do not invent details not present in the transcript.
|
|
- transcript_corrected: fix only clear STT errors caused by vocoder distortion (e.g. "Several" → "10-4", misheard street names, garbled unit IDs). Use the back-and-forth context between transmissions to resolve ambiguities. Keep all radio language as-is — do NOT decode codes into plain English. Return null if the transcript looks accurate.
|
|
|
|
System: {system_id}
|
|
Talkgroup: {talkgroup_name}
|
|
{transcript_block}"""
|
|
|
|
|
|
async def extract_tags(
    call_id: str,
    transcript: str,
    talkgroup_name: Optional[str] = None,
    talkgroup_id: Optional[int] = None,
    system_id: Optional[str] = None,
    segments: Optional[list[dict]] = None,
) -> tuple[list[str], Optional[str], Optional[str]]:
    """
    Extract incident tags, type, location, and corrected transcript via Gemini.

    The blocking Gemini and embedding calls run in worker threads via
    asyncio.to_thread. Whatever Gemini returns is normalised before use, so a
    malformed or partially wrong-typed response degrades to empty values
    instead of raising or being written to Firestore as-is.

    Returns:
        (tags, primary_type, location). primary_type is None when Gemini
        reports "unknown" or "other" or gives no usable type; location is
        None when no location was found.

    Side-effect: writes to calls/{call_id} in Firestore with tags and
    severity, plus location, vehicles, units, embedding and
    transcript_corrected when present. A failed write is logged and does not
    raise.
    """
    result = await asyncio.to_thread(
        _sync_extract, transcript, talkgroup_name, talkgroup_id, system_id, segments
    )
    if not isinstance(result, dict):
        # Valid JSON is not necessarily an object; anything else has no fields
        # to read, so treat it like an empty result.
        logger.warning(f"Ignoring non-object intelligence result for call {call_id}")
        result = {}

    tags = _clean_str_list(result.get("tags"))
    vehicles = _clean_str_list(result.get("vehicles"))
    units = _clean_str_list(result.get("units"))
    location = _clean_str(result.get("location"))
    transcript_corrected = _clean_str(result.get("transcript_corrected"))
    severity = (_clean_str(result.get("severity")) or "unknown").lower()

    incident_type = _clean_str(result.get("incident_type"))
    if incident_type is not None:
        incident_type = incident_type.lower()
    # "unknown" / "other" carry no information for callers; report them as None.
    if incident_type in ("unknown", "other"):
        incident_type = None

    # Store embedding alongside structured data
    embedding = await asyncio.to_thread(_sync_embed, _embed_text(transcript, incident_type))

    # tags and severity are always written; the rest only when non-empty.
    updates: dict = {
        "tags": tags,
        "severity": severity,
    }
    if location:
        updates["location"] = location
    if vehicles:
        updates["vehicles"] = vehicles
    if units:
        updates["units"] = units
    if embedding:
        updates["embedding"] = embedding
    if transcript_corrected:
        updates["transcript_corrected"] = transcript_corrected

    try:
        await fstore.doc_set("calls", call_id, updates)
    except Exception as e:
        # Best effort: the extracted values are still returned to the caller.
        logger.warning(f"Could not save intelligence for call {call_id}: {e}")

    logger.info(
        f"Intelligence: call {call_id} → type={incident_type}, "
        f"tags={tags}, location={location!r}, severity={severity}, "
        f"corrected={transcript_corrected is not None}"
    )
    return tags, incident_type, location


def _clean_str(value: object) -> Optional[str]:
    """Return *value* stripped if it is a non-blank string, else None."""
    if not isinstance(value, str):
        return None
    return value.strip() or None


def _clean_str_list(value: object) -> list[str]:
    """Return the non-blank entries of a list as stripped strings.

    Anything that is not a list yields []. Numeric entries (e.g. a unit given
    as 511) are kept as their string form; other non-string entries are
    dropped.
    """
    if not isinstance(value, list):
        return []
    cleaned: list[str] = []
    for item in value:
        if isinstance(item, bool) or not isinstance(item, (str, int, float)):
            continue
        text = str(item).strip()
        if text:
            cleaned.append(text)
    return cleaned
|
|
|
|
|
|
def _build_transcript_block(transcript: str, segments: Optional[list[dict]]) -> str:
|
|
"""Format transcript as numbered transmissions if segments are available."""
|
|
if segments and len(segments) > 1:
|
|
lines = [f"{i+1}. [{s['start']}s] {s['text']}" for i, s in enumerate(segments)]
|
|
return f"Transmissions ({len(segments)}):\n" + "\n".join(lines)
|
|
return f"Transcript:\n{transcript}"
|
|
|
|
|
|
def _sync_extract(
    transcript: str,
    talkgroup_name: Optional[str],
    talkgroup_id: Optional[int],
    system_id: Optional[str],
    segments: Optional[list[dict]],
) -> dict:
    """Call Gemini Flash and parse the JSON response.

    Blocking; intended to be run in a worker thread.

    Returns the parsed JSON object, or {} when no API key is configured, the
    request fails, the response is not valid JSON, or the JSON is not an
    object.
    """
    from app.config import settings

    if not settings.gemini_api_key:
        logger.warning("GEMINI_API_KEY not set — intelligence extraction disabled.")
        return {}

    # Imported only once a key is configured, so a deployment without Gemini
    # does not need the SDK installed to reach the early return above.
    import google.generativeai as genai

    genai.configure(api_key=settings.gemini_api_key)
    model = genai.GenerativeModel(
        "gemini-2.5-flash-lite",
        generation_config={"response_mime_type": "application/json"},
    )

    # Fall back to "unknown" before adding the TGID so a missing name never
    # shows up in the prompt as the literal text "None".
    tg_name = talkgroup_name or "unknown"
    tg = f"{tg_name} (TGID {talkgroup_id})" if talkgroup_id else tg_name
    prompt = _PROMPT_TEMPLATE.format(
        transcript_block=_build_transcript_block(transcript, segments),
        talkgroup_name=tg,
        system_id=system_id or "unknown",
    )

    try:
        response = model.generate_content(prompt)
        parsed = json.loads(response.text)
    except json.JSONDecodeError as e:
        logger.warning(f"Gemini returned non-JSON: {e}")
        return {}
    except Exception as e:
        logger.warning(f"Gemini extraction failed: {e}")
        return {}

    # json.loads accepts any JSON value; callers rely on a dict.
    if not isinstance(parsed, dict):
        logger.warning(
            f"Gemini returned JSON of type {type(parsed).__name__}, expected an object"
        )
        return {}
    return parsed
|
|
|
|
|
|
def _sync_embed(text: str) -> Optional[list[float]]:
|
|
"""Generate a text-embedding-3-small vector for semantic similarity."""
|
|
from app.config import settings
|
|
from openai import OpenAI
|
|
|
|
if not settings.openai_api_key:
|
|
return None
|
|
|
|
try:
|
|
client = OpenAI(api_key=settings.openai_api_key)
|
|
result = client.embeddings.create(
|
|
model="text-embedding-3-small",
|
|
input=text,
|
|
)
|
|
return result.data[0].embedding
|
|
except Exception as e:
|
|
logger.warning(f"Embedding generation failed: {e}")
|
|
return None
|
|
|
|
|
|
def _embed_text(transcript: str, incident_type: Optional[str]) -> str:
|
|
"""Build the text string to embed — transcript + type context."""
|
|
prefix = f"[{incident_type}] " if incident_type else ""
|
|
return f"{prefix}{transcript}"
|