cbcc85f7b1
Refactor incident_correlator.py to a decision/commit split (preview_correlation / apply_correlation) so the rules engine and LLM can both produce decisions before anything is written to Firestore. Add llm_correlator.py: cheap Gemini Flash first-pass + Gemini Pro tiebreaker. Wire _correlate_with_consensus in upload.py — rules-only fallback when key is absent or call is thin; agreed/tiebreak consensus written to corr_debug.
334 lines
13 KiB
Python
334 lines
13 KiB
Python
import hmac
from typing import Optional
from urllib.parse import unquote, urlsplit

from fastapi import APIRouter, BackgroundTasks, UploadFile, File, Form, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from app.internal.storage import upload_audio
from app.internal import firestore as fstore
from app.internal.logger import logger
|
|
|
|
# Router exposing the edge-node upload endpoint; tagged "upload" in the API docs.
router = APIRouter(tags=["upload"])

# Bearer-token extractor for per-node API keys. auto_error=False means a missing
# token arrives as None instead of being rejected by the dependency, so
# upload_call_audio can answer with its own 401 "Missing authorization".
_bearer = HTTPBearer(auto_error=False)
|
|
|
|
|
|
def _api_key_matches(presented: Optional[str], stored: object) -> bool:
    """
    Compare a presented bearer token with the stored node key in constant time.

    A plain ``!=`` can stop at the first differing character, which leaks through
    response timing how much of a guessed key is correct; ``hmac.compare_digest``
    does not. Both sides are encoded to UTF-8 bytes because ``compare_digest``
    raises TypeError for non-ASCII ``str`` arguments.

    Returns False when either side is missing/empty or the stored value is not a string.
    """
    if not isinstance(presented, str) or not presented:
        return False
    if not isinstance(stored, str) or not stored:
        return False
    return hmac.compare_digest(presented.encode("utf-8"), stored.encode("utf-8"))


@router.post("/upload")
async def upload_call_audio(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    call_id: str = Form(...),
    node_id: str = Form(...),
    talkgroup_id: Optional[int] = Form(None),
    talkgroup_name: Optional[str] = Form(None),
    system_id: Optional[str] = Form(None),
    credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer),
):
    """
    Receive an audio recording from an edge node.

    Upload to GCS, update the call document in Firestore with the audio URL,
    then kick off the intelligence pipeline as a background task.

    Raises:
        HTTPException(401): no bearer token, no ``node_keys`` document for
            ``node_id``, or the token does not match the stored ``api_key``.
        HTTPException(400): the uploaded file is empty.

    Returns:
        ``{"url": audio_url}`` where ``audio_url`` is whatever ``upload_audio``
        returned. If it is falsy, the call document is not updated and the
        pipeline is still scheduled, with ``gcs_uri=None``.
    """
    # Verify the per-node API key before touching the request body.
    if not credentials:
        raise HTTPException(401, "Missing authorization")

    key_doc = await fstore.doc_get("node_keys", node_id)
    if not key_doc:
        logger.warning(f"Upload 401: no key_doc in Firestore for node_id={node_id!r}")
        raise HTTPException(401, "Invalid node API key")

    # Constant-time comparison: the token comes from an untrusted client.
    if not _api_key_matches(credentials.credentials, key_doc.get("api_key")):
        logger.warning(
            f"Upload 401: key mismatch for node_id={node_id!r} "
            f"(received prefix: {credentials.credentials[:8]}...)"
        )
        raise HTTPException(401, "Invalid node API key")

    data = await file.read()
    if not data:
        raise HTTPException(400, "Empty file.")

    filename = file.filename
    audio_url = await upload_audio(data, filename)

    if audio_url:
        try:
            await fstore.doc_set("calls", call_id, {"audio_url": audio_url})
        except Exception as e:
            # Best-effort: the audio is already stored; don't fail the upload.
            logger.warning(f"Could not update call {call_id} with audio_url: {e}")

    # Convert public GCS URL to gs:// URI for Speech-to-Text
    gcs_uri = _public_url_to_gcs_uri(audio_url)

    background_tasks.add_task(
        _run_intelligence_pipeline,
        call_id=call_id,
        node_id=node_id,
        system_id=system_id,
        talkgroup_id=talkgroup_id,
        talkgroup_name=talkgroup_name,
        gcs_uri=gcs_uri,
    )

    return {"url": audio_url}
|
|
|
|
|
|
def _public_url_to_gcs_uri(url: str) -> Optional[str]:
|
|
"""
|
|
Convert a public GCS URL (possibly signed) like
|
|
https://storage.googleapis.com/bucket/calls/file.mp3?Expires=...
|
|
to a gs:// URI usable by Speech-to-Text.
|
|
Returns None if the URL doesn't look like a GCS URL.
|
|
"""
|
|
prefix = "https://storage.googleapis.com/"
|
|
if url and url.startswith(prefix):
|
|
path = url[len(prefix):].split("?")[0] # strip signed-URL query params
|
|
return "gs://" + path
|
|
return None
|
|
|
|
|
|
async def _correlate_with_consensus(
    call_id: str,
    node_id: str,
    system_id: Optional[str],
    talkgroup_id: Optional[int],
    talkgroup_name: Optional[str],
    tags: list[str],
    incident_type: Optional[str],
    location: Optional[str],
    location_coords: Optional[dict],
    units: Optional[list] = None,
    vehicles: Optional[list] = None,
    cleared_units: Optional[list] = None,
    reassignment: bool = False,
) -> Optional[str]:
    """
    Consensus correlator: runs the rules engine and the cheap LLM in sequence.

    If they agree the rules decision is committed directly.
    If they disagree a smarter tiebreaker LLM makes the final call.

    Falls back to rules-only when GEMINI_API_KEY is absent, the call is
    content-free (thin), or any LLM call fails. That covers ``decide``
    returning None or raising, and ``tiebreak`` raising or returning nothing.

    The outcome is recorded in the committed decision's ``corr_debug`` dict
    under ``corr_consensus`` ("rules_only" / "agreed" / "tiebreak").

    Returns:
        Whatever ``incident_correlator.apply_correlation`` returns for the
        committed decision. The callers treat it as an incident id or None.
    """
    from app.internal import incident_correlator, llm_correlator

    preview = await incident_correlator.preview_correlation(
        call_id=call_id, node_id=node_id, system_id=system_id,
        talkgroup_id=talkgroup_id, talkgroup_name=talkgroup_name,
        tags=tags, incident_type=incident_type, location=location,
        location_coords=location_coords, units=units, vehicles=vehicles,
        cleared_units=cleared_units, reassignment=reassignment,
    )
    ctx = preview["ctx"]
    rules_decision = preview["decision"]
    rules_debug = rules_decision.setdefault("corr_debug", {})

    # The LLM is advisory: any failure here must not block committing the rules
    # decision, or the caller's remaining scenes and alert dispatch never run.
    try:
        llm_decision = await llm_correlator.decide(call_id, ctx)
    except Exception:
        logger.exception(f"LLM correlation failed for call {call_id}; using rules decision")
        llm_decision = None

    if llm_decision is None:
        # LLM unavailable, skipped (thin call), or errored — rules wins.
        rules_debug["corr_consensus"] = "rules_only"
        return await incident_correlator.apply_correlation(preview)

    if llm_correlator.decisions_agree(rules_decision, llm_decision):
        rules_debug["corr_consensus"] = "agreed"
        rules_debug["corr_llm_reasoning"] = llm_decision.get("reasoning", "")
        return await incident_correlator.apply_correlation(preview)

    # Disagree — escalate to the smarter tiebreaker.
    rules_action = rules_decision.get("action")
    llm_action = llm_decision.get("action")
    logger.info(
        f"Consensus disagreement for call {call_id}: "
        f"rules={rules_action} vs llm={llm_action} — tiebreak"
    )
    try:
        final = await llm_correlator.tiebreak(rules_decision, llm_decision, ctx)
    except Exception:
        logger.exception(f"LLM tiebreak failed for call {call_id}; using rules decision")
        final = None

    if not final:
        # Tiebreaker unavailable — fall back to the rules decision, but keep the
        # disagreement visible in the debug trail.
        rules_debug["corr_consensus"] = "rules_only"
        rules_debug["corr_tiebreak_failed"] = True
        rules_debug["corr_llm_action"] = llm_action
        return await incident_correlator.apply_correlation(preview)

    final_debug = final.setdefault("corr_debug", {})
    final_debug["corr_consensus"] = "tiebreak"
    final_debug["corr_rules_action"] = rules_action
    final_debug["corr_llm_action"] = llm_action
    # Keep every other field of the preview; only the decision is replaced.
    return await incident_correlator.apply_correlation({**preview, "decision": final})
|
|
|
|
|
|
async def _run_extraction_pipeline(
    call_id: str,
    node_id: str,
    system_id: Optional[str],
    talkgroup_id: Optional[int],
    talkgroup_name: Optional[str],
    transcript: str,
    segments: Optional[list] = None,
    preserve_transcript_correction: bool = False,
) -> None:
    """
    Run steps 2-4 of the intelligence pipeline against an already-available transcript.

    The steps are as follows:
    - Extract scenes from ``transcript``.
    - Correlate each scene to an incident with ``_correlate_with_consensus``.
    - Mark the incident resolved when a scene reports closure.
    - Store the distinct linked incident ids on the call document.
    - Run alert dispatch once, with the de-duplicated union of every scene's tags.

    This path does not consult feature flags. It also does not attempt a
    fallback correlation when no scenes are found.
    """
    from app.internal import intelligence, incident_correlator, alerter

    # Step 2: one scene per distinct incident detected in the recording.
    extracted = await intelligence.extract_scenes(
        call_id,
        transcript,
        talkgroup_name,
        talkgroup_id=talkgroup_id,
        system_id=system_id,
        segments=segments,
        node_id=node_id,
        preserve_transcript_correction=preserve_transcript_correction,
    )

    # Step 3: correlate scenes one at a time, in extraction order.
    linked_ids: list[str] = []
    tag_pool: list[str] = []
    for sc in extracted:
        tag_pool += sc["tags"]
        # A reassignment pulls a unit onto a NEW call: hide its units from the
        # correlator so this scene doesn't chain into the unit's previous incident.
        reassigned = bool(sc.get("reassignment"))
        linked = await _correlate_with_consensus(
            call_id=call_id,
            node_id=node_id,
            system_id=system_id,
            talkgroup_id=talkgroup_id,
            talkgroup_name=talkgroup_name,
            tags=sc["tags"],
            incident_type=sc["incident_type"],
            location=sc["location"],
            location_coords=sc["location_coords"],
            units=[] if reassigned else sc.get("units"),
            vehicles=sc.get("vehicles"),
            cleared_units=sc.get("cleared_units"),
            reassignment=reassigned,
        )
        if linked and linked not in linked_ids:
            linked_ids.append(linked)

        if not sc["resolved"] or not linked:
            continue
        # Extraction flagged this scene as closed out.
        await fstore.doc_set("incidents", linked, {"status": "resolved"})
        await incident_correlator.maybe_resolve_parent(linked)
        logger.info(f"Auto-resolved incident {linked} (LLM closure detection)")

    if linked_ids:
        await fstore.doc_set("calls", call_id, {"incident_ids": linked_ids})

    # Step 4: a single alert pass over the order-preserving union of all scene tags.
    await alerter.check_and_dispatch(
        call_id=call_id,
        node_id=node_id,
        talkgroup_id=talkgroup_id,
        talkgroup_name=talkgroup_name,
        tags=list(dict.fromkeys(tag_pool)),
        transcript=transcript,
    )
|
|
|
|
|
|
async def _run_intelligence_pipeline(
    call_id: str,
    node_id: str,
    system_id: Optional[str],
    talkgroup_id: Optional[int],
    talkgroup_name: Optional[str],
    gcs_uri: Optional[str],
) -> None:
    """
    Post-upload intelligence pipeline (runs as a background task):

    1. Transcribe audio via Google STT
    2. Detect scenes + extract intelligence (one result per incident in recording)
    3. Correlate each scene with existing incidents (or create new ones)
    4. Check alert rules and dispatch notifications

    Steps 1-3 are gated by the ``stt_enabled`` / ``correlation_enabled`` flags.
    A global flag set to False disables the step everywhere. Otherwise the
    system's ``ai_flags`` entry decides, and the step is on when the system has
    no entry. When correlation is off for this call, no correlation runs at
    all, including the no-scene fallback. Step 4 always runs.
    """
    from app.internal import transcription, intelligence, incident_correlator, alerter
    from app.internal.feature_flags import get_flags

    flags = await get_flags()

    # Resolve per-system overrides: system flag=False beats global flag=True,
    # but global flag=False beats everything (master switch).
    system_ai_flags: dict = {}
    if system_id:
        sys_doc = await fstore.doc_get_cached("systems", system_id)
        system_ai_flags = (sys_doc or {}).get("ai_flags") or {}

    def _flag(name: str) -> bool:
        if not flags[name]:  # global master off
            return False
        return system_ai_flags.get(name, True)  # system override, default inherit

    transcript: Optional[str] = None
    segments: list[dict] = []

    # Step 1: Transcription
    if gcs_uri:
        if _flag("stt_enabled"):
            transcript, segments = await transcription.transcribe_call(
                call_id, gcs_uri, talkgroup_name, system_id=system_id
            )
        else:
            scope = "globally" if not flags["stt_enabled"] else f"system {system_id}"
            logger.info(f"STT disabled ({scope}) — skipping transcription for call {call_id}")

    # Evaluated once so scene extraction AND correlation (including the no-scene
    # fallback below) honour the same global + per-system answer. Previously
    # step 3 checked only the global flag, so a system that disabled correlation
    # still had incidents linked/created through the fallback path.
    correlation_on = _flag("correlation_enabled")

    # Step 2: Scene detection + intelligence extraction
    scenes: list[dict] = []
    if correlation_on:
        if transcript:
            scenes = await intelligence.extract_scenes(
                call_id, transcript, talkgroup_name,
                talkgroup_id=talkgroup_id, system_id=system_id, segments=segments,
                node_id=node_id,
            )
    else:
        scope = "globally" if not flags["correlation_enabled"] else f"system {system_id}"
        logger.info(f"Correlation disabled ({scope}) — skipping scene extraction and correlation for call {call_id}")

    # Step 3: Correlate each scene independently.
    # A single recording can produce multiple incidents on a busy channel.
    incident_ids: list[str] = []
    all_tags: list[str] = []
    if correlation_on:
        for scene in scenes:
            all_tags.extend(scene["tags"])
            # When dispatch pulls a unit to a NEW call (reassignment), suppress
            # unit overlap so the scene doesn't chain into the unit's previous incident.
            is_reassignment = bool(scene.get("reassignment"))
            corr_units = [] if is_reassignment else scene.get("units")
            incident_id = await _correlate_with_consensus(
                call_id=call_id,
                node_id=node_id,
                system_id=system_id,
                talkgroup_id=talkgroup_id,
                talkgroup_name=talkgroup_name,
                tags=scene["tags"],
                incident_type=scene["incident_type"],
                location=scene["location"],
                location_coords=scene["location_coords"],
                units=corr_units,
                vehicles=scene.get("vehicles"),
                cleared_units=scene.get("cleared_units"),
                reassignment=is_reassignment,
            )
            if incident_id and incident_id not in incident_ids:
                incident_ids.append(incident_id)
            if scene["resolved"] and incident_id:
                await fstore.doc_set("incidents", incident_id, {"status": "resolved"})
                await incident_correlator.maybe_resolve_parent(incident_id)
                logger.info(f"Auto-resolved incident {incident_id} (LLM closure detection)")

        # Correlator also runs for calls with no scenes (unclassified) to attempt
        # talkgroup-based linking even when no transcript could be produced.
        # Skip when extraction flagged the call — garbage or too-short transcripts
        # carry no signal and would only attach spuriously via the thin path.
        if not scenes:
            _call_doc = await fstore.doc_get("calls", call_id)
            if not (_call_doc or {}).get("skip_reason"):
                incident_id = await _correlate_with_consensus(
                    call_id=call_id,
                    node_id=node_id,
                    system_id=system_id,
                    talkgroup_id=talkgroup_id,
                    talkgroup_name=talkgroup_name,
                    tags=[],
                    incident_type=None,
                    location=None,
                    location_coords=None,
                )
                if incident_id:
                    incident_ids.append(incident_id)

    if incident_ids:
        await fstore.doc_set("calls", call_id, {"incident_ids": incident_ids})

    # Step 4: Alert dispatch (always runs — talkgroup ID rules don't need a transcript)
    await alerter.check_and_dispatch(
        call_id=call_id,
        node_id=node_id,
        talkgroup_id=talkgroup_id,
        talkgroup_name=talkgroup_name,
        tags=list(dict.fromkeys(all_tags)),
        transcript=transcript,
    )
|