Files
server-26/drb-c2-core/app/internal/intelligence.py
T
2026-04-12 23:33:44 -04:00

141 lines
4.8 KiB
Python

"""
Gemini-powered intelligence extraction from call transcripts.
Sends the transcript to Gemini Flash with a tight JSON schema prompt.
Returns structured data: incident type, tags, location, vehicles, units, severity.
Falls back gracefully if Gemini is unavailable or returns malformed output.
"""
import asyncio
import json
from typing import Optional
from app.internal.logger import logger
from app.internal import firestore as fstore
# Prompt sent verbatim (with {transcript} substituted) to Gemini Flash.
# NOTE: the doubled braces {{ }} are literal braces in the rendered prompt —
# str.format() only substitutes the single-braced {transcript} placeholder.
# The schema here must stay in sync with the keys read by extract_tags()
# (incident_type, tags, location, vehicles, units, severity).
_PROMPT_TEMPLATE = """You are analyzing a P25 public safety radio transcript. Extract structured information and respond ONLY with a single valid JSON object — no markdown, no explanation.
Schema:
{{
"incident_type": one of "fire" | "ems" | "police" | "accident" | "other" | "unknown",
"tags": [list of specific descriptive tags, max 6, e.g. "two-car mva", "property-damage-only", "working fire", "shots-fired"],
"location": "most specific location string found, or empty string",
"vehicles": [vehicle descriptions mentioned, e.g. "Hyundai Tucson", "black sedan"],
"units": [unit IDs or officer numbers mentioned, e.g. "Unit 511", "Car 4"],
"severity": one of "minor" | "moderate" | "major" | "unknown"
}}
Rules:
- location: prefer intersections > addresses > mile markers > route+town > route alone > town alone. Empty string if none.
- tags: be specific and lowercase, hyphenated. Do not repeat incident_type as a tag.
- units: only identifiers explicitly mentioned, not inferred.
- Do not invent details not present in the transcript.
Transcript:
{transcript}"""
async def extract_tags(
    call_id: str,
    transcript: str,
) -> tuple[list[str], Optional[str], Optional[str]]:
    """
    Run Gemini extraction over a transcript and persist the results.

    Returns:
        (tags, primary_type, location) — primary_type is None when Gemini
        reports "unknown"/"other" or nothing at all.
    Side-effect: writes tags, severity, and any non-empty optional fields
    (location, vehicles, units, embedding) to calls/{call_id} in Firestore.
    """
    parsed = await asyncio.to_thread(_sync_extract, transcript)

    tags: list[str] = parsed.get("tags") or []
    incident_type: Optional[str] = parsed.get("incident_type") or None
    location: Optional[str] = parsed.get("location") or None
    vehicles: list[str] = parsed.get("vehicles") or []
    units: list[str] = parsed.get("units") or []
    severity: str = parsed.get("severity") or "unknown"

    # Collapse non-informative types to None so callers get a clean signal.
    if incident_type in {"unknown", "other", ""}:
        incident_type = None

    # Embedding text includes the (normalized) incident type for context.
    embedding = await asyncio.to_thread(_sync_embed, _embed_text(transcript, incident_type))

    # tags and severity are always written; the rest only when non-empty.
    updates: dict = {"tags": tags, "severity": severity}
    for field, value in (
        ("location", location),
        ("vehicles", vehicles),
        ("units", units),
        ("embedding", embedding),
    ):
        if value:
            updates[field] = value

    try:
        await fstore.doc_set("calls", call_id, updates)
    except Exception as e:
        # Best-effort persistence: extraction results are still returned.
        logger.warning(f"Could not save intelligence for call {call_id}: {e}")

    logger.info(
        f"Intelligence: call {call_id} → type={incident_type}, "
        f"tags={tags}, location={location!r}, severity={severity}"
    )
    return tags, incident_type, location
def _sync_extract(transcript: str) -> dict:
    """Call Gemini Flash and parse the JSON response.

    Returns a dict matching the prompt schema, or {} on any failure
    (missing API key, request error, non-JSON output, or JSON whose
    top level is not an object) so callers can use .get() safely.
    """
    from app.config import settings
    import google.generativeai as genai

    if not settings.gemini_api_key:
        logger.warning("GEMINI_API_KEY not set — intelligence extraction disabled.")
        return {}
    genai.configure(api_key=settings.gemini_api_key)
    model = genai.GenerativeModel(
        "gemini-1.5-flash",
        # Ask the API to constrain output to JSON; the prompt enforces schema.
        generation_config={"response_mime_type": "application/json"},
    )
    try:
        response = model.generate_content(_PROMPT_TEMPLATE.format(transcript=transcript))
        parsed = json.loads(response.text)
    except json.JSONDecodeError as e:
        logger.warning(f"Gemini returned non-JSON: {e}")
        return {}
    except Exception as e:
        logger.warning(f"Gemini extraction failed: {e}")
        return {}
    # BUG FIX: json.loads can yield a list/str/number; callers immediately
    # call .get() on the result, which would raise AttributeError. Guard so
    # malformed-but-valid JSON degrades to the empty-dict fallback.
    if not isinstance(parsed, dict):
        logger.warning(f"Gemini returned non-object JSON: {type(parsed).__name__}")
        return {}
    return parsed
def _sync_embed(text: str) -> Optional[list[float]]:
    """Generate a text-embedding-004 vector for semantic similarity.

    Returns None when no API key is configured or the embedding call
    fails for any reason (best-effort: embeddings are optional).
    """
    from app.config import settings
    import google.generativeai as genai

    if not settings.gemini_api_key:
        return None
    genai.configure(api_key=settings.gemini_api_key)
    try:
        response = genai.embed_content(
            model="models/text-embedding-004",
            content=text,
            task_type="SEMANTIC_SIMILARITY",
        )
        # Kept inside the try so a missing "embedding" key also degrades to None.
        return response["embedding"]
    except Exception as e:
        logger.warning(f"Embedding generation failed: {e}")
        return None
def _embed_text(transcript: str, incident_type: Optional[str]) -> str:
"""Build the text string to embed — transcript + type context."""
prefix = f"[{incident_type}] " if incident_type else ""
return f"{prefix}{transcript}"