import hmac
from typing import Optional

from fastapi import APIRouter, BackgroundTasks, UploadFile, File, Form, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from app.internal.storage import upload_audio
from app.internal import firestore as fstore
from app.internal.logger import logger
from app.config import settings

router = APIRouter(tags=["upload"])
_bearer = HTTPBearer(auto_error=False)


def _api_keys_match(expected: object, presented: str) -> bool:
    """
    Return True when the stored API key equals the presented bearer token.

    The comparison is done with ``hmac.compare_digest`` so that the time taken
    does not reveal how many leading characters matched. Both sides are encoded
    to bytes first because ``compare_digest`` only accepts ``str`` arguments
    when they are ASCII-only.

    A missing, non-string or empty stored key never matches.
    """
    if not isinstance(expected, str) or not expected:
        return False
    return hmac.compare_digest(expected.encode("utf-8"), presented.encode("utf-8"))


@router.post("/upload")
async def upload_call_audio(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    call_id: str = Form(...),
    node_id: str = Form(...),
    talkgroup_id: Optional[int] = Form(None),
    talkgroup_name: Optional[str] = Form(None),
    system_id: Optional[str] = Form(None),
    credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer),
):
    """
    Receive an audio recording from an edge node.

    Upload to GCS, update the call document in Firestore with the audio URL,
    then kick off the intelligence pipeline as a background task.

    Returns:
        ``{"url": audio_url}`` where ``audio_url`` is whatever ``upload_audio``
        returned.

    Raises:
        HTTPException: 401 when the bearer token is missing or does not match
            the key stored under ``node_keys/<node_id>``; 400 when the upload
            is empty; 413 when it exceeds ``settings.upload_max_bytes``.
    """
    # Verify the per-node API key
    if not credentials:
        raise HTTPException(401, "Missing authorization")

    key_doc = await fstore.doc_get("node_keys", node_id)
    if not key_doc:
        logger.warning(f"Upload 401: no key_doc in Firestore for node_id={node_id!r}")
        raise HTTPException(401, "Invalid node API key")

    if not _api_keys_match(key_doc.get("api_key"), credentials.credentials):
        # Deliberately log nothing derived from the presented token: even a
        # prefix of a secret does not belong in application logs.
        logger.warning(f"Upload 401: key mismatch for node_id={node_id!r}")
        raise HTTPException(401, "Invalid node API key")

    # Read at most one byte past the limit: enough to detect an oversized
    # upload without pulling an arbitrarily large body into memory.
    data = await file.read(settings.upload_max_bytes + 1)
    if not data:
        raise HTTPException(400, "Empty file.")
    if len(data) > settings.upload_max_bytes:
        raise HTTPException(
            413,
            f"File too large (max {settings.upload_max_bytes // (1024*1024)} MB).",
        )

    audio_url = await upload_audio(data, file.filename or "", call_id=call_id)

    if audio_url:
        # Best effort: a failed metadata write must not fail the upload itself.
        try:
            await fstore.doc_set("calls", call_id, {"audio_url": audio_url})
        except Exception as e:
            logger.warning(f"Could not update call {call_id} with audio_url: {e}")

    # Convert public GCS URL to gs:// URI for Speech-to-Text.
    # The pipeline is scheduled even when no URI could be derived; it skips
    # transcription in that case but still runs alert dispatch.
    gcs_uri = _public_url_to_gcs_uri(audio_url)

    background_tasks.add_task(
        _run_intelligence_pipeline,
        call_id=call_id,
        node_id=node_id,
        system_id=system_id,
        talkgroup_id=talkgroup_id,
        talkgroup_name=talkgroup_name,
        gcs_uri=gcs_uri,
    )

    return {"url": audio_url}


def _public_url_to_gcs_uri(url: Optional[str]) -> Optional[str]:
    """
    Convert a public GCS URL (possibly signed) like
    https://storage.googleapis.com/bucket/calls/file.mp3?Expires=...
    to a gs:// URI usable by Speech-to-Text.

    Returns None if the URL is empty/None or doesn't look like a GCS URL.
    """
    prefix = "https://storage.googleapis.com/"
    if url and url.startswith(prefix):
        path = url[len(prefix):].split("?")[0]  # strip signed-URL query params
        return "gs://" + path
    return None


async def _correlate_with_consensus(
    call_id: str,
    node_id: str,
    system_id: Optional[str],
    talkgroup_id: Optional[int],
    talkgroup_name: Optional[str],
    tags: list[str],
    incident_type: Optional[str],
    location: Optional[str],
    location_coords: Optional[dict],
    units: Optional[list] = None,
    vehicles: Optional[list] = None,
    cleared_units: Optional[list] = None,
    reassignment: bool = False,
) -> Optional[str]:
    """
    Consensus correlator: runs the rules engine and the cheap LLM in sequence.

    If they agree the rules decision is committed directly. If they disagree
    a smarter tiebreaker LLM makes the final call.

    Falls back to rules-only when ``llm_correlator.decide`` returns None
    (per the original notes: GEMINI_API_KEY absent, the call is content-free
    (thin), or the LLM call failed).

    Returns:
        Whatever ``incident_correlator.apply_correlation`` returns; callers in
        this module treat it as an incident ID or None.
    """
    from app.internal import incident_correlator, llm_correlator

    preview = await incident_correlator.preview_correlation(
        call_id=call_id,
        node_id=node_id,
        system_id=system_id,
        talkgroup_id=talkgroup_id,
        talkgroup_name=talkgroup_name,
        tags=tags,
        incident_type=incident_type,
        location=location,
        location_coords=location_coords,
        units=units,
        vehicles=vehicles,
        cleared_units=cleared_units,
        reassignment=reassignment,
    )
    ctx = preview["ctx"]
    rules_decision = preview["decision"]

    llm_decision = await llm_correlator.decide(call_id, ctx)

    if llm_decision is None:
        # LLM unavailable, skipped (thin call), or errored — rules wins.
        rules_decision["corr_debug"]["corr_consensus"] = "rules_only"
        return await incident_correlator.apply_correlation(preview)

    if llm_correlator.decisions_agree(rules_decision, llm_decision):
        rules_decision["corr_debug"]["corr_consensus"] = "agreed"
        rules_decision["corr_debug"]["corr_llm_reasoning"] = llm_decision.get("reasoning", "")
        return await incident_correlator.apply_correlation(preview)

    # Disagree — escalate to the smarter tiebreaker.
    logger.info(
        f"Consensus disagreement for call {call_id}: "
        f"rules={rules_decision['action']} vs llm={llm_decision['action']} — tiebreak"
    )
    final = await llm_correlator.tiebreak(rules_decision, llm_decision, ctx)
    # Record both original votes next to the tiebreak outcome for debugging.
    final["corr_debug"]["corr_consensus"] = "tiebreak"
    final["corr_debug"]["corr_rules_action"] = rules_decision["action"]
    final["corr_debug"]["corr_llm_action"] = llm_decision["action"]
    return await incident_correlator.apply_correlation({"decision": final, "ctx": ctx})


async def _correlate_scenes(
    scenes: list,
    *,
    call_id: str,
    node_id: str,
    system_id: Optional[str],
    talkgroup_id: Optional[int],
    talkgroup_name: Optional[str],
) -> tuple[list[str], list[str]]:
    """
    Correlate each extracted scene to an incident independently.

    Shared by both pipeline entry points so the per-scene handling cannot
    drift between them.

    Returns:
        ``(incident_ids, all_tags)`` — the distinct incident IDs in first-seen
        order, and the concatenated (not de-duplicated) tags of every scene.
    """
    from app.internal import incident_correlator

    incident_ids: list[str] = []
    all_tags: list[str] = []

    for scene in scenes:
        all_tags.extend(scene["tags"])

        # When dispatch is pulling a unit to a NEW call (reassignment), suppress unit
        # overlap so the new scene doesn't chain into the unit's previous incident.
        is_reassignment = bool(scene.get("reassignment"))
        corr_units = [] if is_reassignment else scene.get("units")

        incident_id = await _correlate_with_consensus(
            call_id=call_id,
            node_id=node_id,
            system_id=system_id,
            talkgroup_id=talkgroup_id,
            talkgroup_name=talkgroup_name,
            tags=scene["tags"],
            incident_type=scene["incident_type"],
            location=scene["location"],
            location_coords=scene["location_coords"],
            units=corr_units,
            vehicles=scene.get("vehicles"),
            cleared_units=scene.get("cleared_units"),
            reassignment=is_reassignment,
        )

        if incident_id and incident_id not in incident_ids:
            incident_ids.append(incident_id)

        if scene["resolved"] and incident_id:
            await fstore.doc_set("incidents", incident_id, {"status": "resolved"})
            await incident_correlator.maybe_resolve_parent(incident_id)
            logger.info(f"Auto-resolved incident {incident_id} (LLM closure detection)")

    return incident_ids, all_tags


async def _run_extraction_pipeline(
    call_id: str,
    node_id: str,
    system_id: Optional[str],
    talkgroup_id: Optional[int],
    talkgroup_name: Optional[str],
    transcript: str,
    segments: Optional[list] = None,
    preserve_transcript_correction: bool = False,
) -> None:
    """Run steps 2-4 of the intelligence pipeline using an existing transcript."""
    from app.internal import intelligence, alerter

    # Step 2: Scene detection + intelligence extraction.
    # Returns one scene per distinct incident detected in the recording.
    scenes = await intelligence.extract_scenes(
        call_id,
        transcript,
        talkgroup_name,
        talkgroup_id=talkgroup_id,
        system_id=system_id,
        segments=segments,
        node_id=node_id,
        preserve_transcript_correction=preserve_transcript_correction,
    )

    # Step 3: Correlate each scene to an incident independently.
    incident_ids, all_tags = await _correlate_scenes(
        scenes,
        call_id=call_id,
        node_id=node_id,
        system_id=system_id,
        talkgroup_id=talkgroup_id,
        talkgroup_name=talkgroup_name,
    )

    if incident_ids:
        await fstore.doc_set("calls", call_id, {"incident_ids": incident_ids})

    # Step 4: Alert dispatch — run once with merged tags from all scenes.
    await alerter.check_and_dispatch(
        call_id=call_id,
        node_id=node_id,
        talkgroup_id=talkgroup_id,
        talkgroup_name=talkgroup_name,
        tags=list(dict.fromkeys(all_tags)),  # de-duplicate, keep first-seen order
        transcript=transcript,
    )


async def _run_intelligence_pipeline(
    call_id: str,
    node_id: str,
    system_id: Optional[str],
    talkgroup_id: Optional[int],
    talkgroup_name: Optional[str],
    gcs_uri: Optional[str],
) -> None:
    """
    Post-upload intelligence pipeline (runs as a background task):
      1. Transcribe audio via Google STT
      2. Detect scenes + extract intelligence (one result per incident in recording)
      3. Correlate each scene with existing incidents (or create new ones)
      4. Check alert rules and dispatch notifications

    Steps 1-3 are gated by feature flags; step 4 always runs.
    """
    from app.internal import transcription, intelligence, alerter
    from app.internal.feature_flags import get_flags

    flags = await get_flags()

    # Resolve per-system overrides: system flag=False beats global flag=True,
    # but global flag=False beats everything (master switch).
    system_ai_flags: dict = {}
    if system_id:
        sys_doc = await fstore.doc_get_cached("systems", system_id)
        system_ai_flags = (sys_doc or {}).get("ai_flags") or {}

    def _flag(name: str) -> bool:
        if not flags[name]:  # global master off
            return False
        return bool(system_ai_flags.get(name, True))  # system override, default inherit

    # Evaluate once so steps 2 and 3 cannot disagree about whether correlation
    # is enabled for this system.
    correlation_on = _flag("correlation_enabled")

    transcript: Optional[str] = None
    segments: list[dict] = []

    # Step 1: Transcription
    if gcs_uri:
        if _flag("stt_enabled"):
            transcript, segments = await transcription.transcribe_call(
                call_id, gcs_uri, talkgroup_name, system_id=system_id
            )
        else:
            scope = "globally" if not flags["stt_enabled"] else f"system {system_id}"
            logger.info(f"STT disabled ({scope}) — skipping transcription for call {call_id}")

    # Step 2: Scene detection + intelligence extraction
    scenes: list[dict] = []
    if correlation_on:
        if transcript:
            scenes = await intelligence.extract_scenes(
                call_id,
                transcript,
                talkgroup_name,
                talkgroup_id=talkgroup_id,
                system_id=system_id,
                segments=segments,
                node_id=node_id,
            )
    else:
        scope = "globally" if not flags["correlation_enabled"] else f"system {system_id}"
        logger.info(f"Correlation disabled ({scope}) — skipping scene extraction and correlation for call {call_id}")

    # Step 3: Correlate each scene independently.
    # A single recording can produce multiple incidents on a busy channel.
    incident_ids: list[str] = []
    all_tags: list[str] = []

    # Gate on the effective (global AND per-system) flag. Checking only the
    # global flag here would let the no-scenes path below correlate calls for
    # a system that has explicitly switched correlation off.
    if correlation_on:
        incident_ids, all_tags = await _correlate_scenes(
            scenes,
            call_id=call_id,
            node_id=node_id,
            system_id=system_id,
            talkgroup_id=talkgroup_id,
            talkgroup_name=talkgroup_name,
        )

        # Correlator also runs for calls with no scenes (unclassified) to attempt
        # talkgroup-based linking even when no transcript could be produced.
        # Skip when extraction flagged the call — garbage or too-short transcripts
        # carry no signal and would only attach spuriously via the thin path.
        if not scenes:
            _call_doc = await fstore.doc_get("calls", call_id)
            if not (_call_doc or {}).get("skip_reason"):
                incident_id = await _correlate_with_consensus(
                    call_id=call_id,
                    node_id=node_id,
                    system_id=system_id,
                    talkgroup_id=talkgroup_id,
                    talkgroup_name=talkgroup_name,
                    tags=[],
                    incident_type=None,
                    location=None,
                    location_coords=None,
                )
                if incident_id:
                    incident_ids.append(incident_id)

        if incident_ids:
            await fstore.doc_set("calls", call_id, {"incident_ids": incident_ids})

    # Step 4: Alert dispatch (always runs — talkgroup ID rules don't need a transcript)
    await alerter.check_and_dispatch(
        call_id=call_id,
        node_id=node_id,
        talkgroup_id=talkgroup_id,
        talkgroup_name=talkgroup_name,
        tags=list(dict.fromkeys(all_tags)),  # de-duplicate, keep first-seen order
        transcript=transcript,
    )