"""Audio upload endpoint and the post-upload intelligence pipeline."""

import hmac
from typing import Optional

from fastapi import (
    APIRouter,
    BackgroundTasks,
    UploadFile,
    File,
    Form,
    HTTPException,
    Security,
)
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from app.internal.storage import upload_audio
from app.internal import firestore as fstore
from app.internal.logger import logger

router = APIRouter(tags=["upload"])
_bearer = HTTPBearer(auto_error=False)


def _api_key_matches(expected: object, presented: str) -> bool:
    """Return True when *presented* equals the stored key *expected*.

    The comparison runs in constant time so that response timing does not
    reveal how many leading characters of a guessed key were correct.
    A missing or non-string stored key never matches.
    """
    if not isinstance(expected, str):
        return False
    # Compare as bytes: compare_digest() rejects non-ASCII ``str`` arguments.
    return hmac.compare_digest(
        expected.encode("utf-8"), presented.encode("utf-8")
    )


@router.post("/upload")
async def upload_call_audio(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    call_id: str = Form(...),
    node_id: str = Form(...),
    talkgroup_id: Optional[int] = Form(None),
    talkgroup_name: Optional[str] = Form(None),
    system_id: Optional[str] = Form(None),
    credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer),
):
    """
    Receive an audio recording from an edge node.

    Upload to GCS, update the call document in Firestore with the audio URL,
    then kick off the intelligence pipeline as a background task.

    Returns:
        ``{"url": audio_url}`` where ``audio_url`` is whatever
        ``upload_audio`` returned.

    Raises:
        HTTPException: 401 when the bearer token is missing, when no
            ``node_keys`` document exists for ``node_id``, or when the token
            does not match the stored ``api_key``; 400 when the uploaded
            file is empty.
    """
    # Verify the per-node API key
    if not credentials:
        raise HTTPException(401, "Missing authorization")

    key_doc = await fstore.doc_get("node_keys", node_id)
    if not key_doc:
        logger.warning(f"Upload 401: no key_doc in Firestore for node_id={node_id!r}")
        raise HTTPException(401, "Invalid node API key")

    if not _api_key_matches(key_doc.get("api_key"), credentials.credentials):
        # NOTE(review): this logs the first 8 characters of the presented
        # credential; confirm that is acceptable for the log retention policy.
        logger.warning(
            f"Upload 401: key mismatch for node_id={node_id!r} "
            f"(received prefix: {credentials.credentials[:8]}...)"
        )
        raise HTTPException(401, "Invalid node API key")

    data = await file.read()
    if not data:
        raise HTTPException(400, "Empty file.")

    filename = file.filename
    audio_url = await upload_audio(data, filename)

    if audio_url:
        # Best effort: a failed Firestore write is logged but must not fail
        # the upload or prevent the pipeline from being scheduled.
        try:
            await fstore.doc_set("calls", call_id, {"audio_url": audio_url})
        except Exception as e:
            logger.warning(f"Could not update call {call_id} with audio_url: {e}")

    # Convert public GCS URL to gs:// URI for Speech-to-Text
    gcs_uri = _public_url_to_gcs_uri(audio_url)

    background_tasks.add_task(
        _run_intelligence_pipeline,
        call_id=call_id,
        node_id=node_id,
        system_id=system_id,
        talkgroup_id=talkgroup_id,
        talkgroup_name=talkgroup_name,
        gcs_uri=gcs_uri,
    )

    return {"url": audio_url}


def _public_url_to_gcs_uri(url: Optional[str]) -> Optional[str]:
    """
    Convert a public GCS URL (possibly signed) like
    https://storage.googleapis.com/bucket/calls/file.mp3?Expires=...
    to a gs:// URI usable by Speech-to-Text.

    Returns None if the URL is empty/None or doesn't look like a GCS URL.
    """
    prefix = "https://storage.googleapis.com/"
    if url and url.startswith(prefix):
        path = url[len(prefix):].split("?")[0]  # strip signed-URL query params
        return "gs://" + path
    return None


async def _correlate_scenes(
    call_id: str,
    node_id: str,
    system_id: Optional[str],
    talkgroup_id: Optional[int],
    talkgroup_name: Optional[str],
    scenes: list,
) -> tuple[list[str], list[str]]:
    """Correlate each scene to an incident independently.

    Each scene is read by the keys ``tags``, ``incident_type``, ``location``,
    ``location_coords`` and ``resolved``. When a scene is flagged resolved
    and was linked to an incident, that incident's status is set to
    ``"resolved"`` in Firestore.

    Returns:
        ``(incident_ids, all_tags)``: the distinct incident IDs in
        first-seen order, and the tags of every scene concatenated
        (duplicates are kept).
    """
    # Imported inside the function, as the pipeline entry points do;
    # presumably to avoid an import cycle -- TODO confirm.
    from app.internal import incident_correlator

    incident_ids: list[str] = []
    all_tags: list[str] = []

    for scene in scenes:
        all_tags.extend(scene["tags"])
        incident_id = await incident_correlator.correlate_call(
            call_id=call_id,
            node_id=node_id,
            system_id=system_id,
            talkgroup_id=talkgroup_id,
            talkgroup_name=talkgroup_name,
            tags=scene["tags"],
            incident_type=scene["incident_type"],
            location=scene["location"],
            location_coords=scene["location_coords"],
        )
        if incident_id and incident_id not in incident_ids:
            incident_ids.append(incident_id)
        if scene["resolved"] and incident_id:
            await fstore.doc_set("incidents", incident_id, {"status": "resolved"})
            logger.info(f"Auto-resolved incident {incident_id} (LLM closure detection)")

    return incident_ids, all_tags


async def _record_incidents_and_alert(
    call_id: str,
    node_id: str,
    talkgroup_id: Optional[int],
    talkgroup_name: Optional[str],
    incident_ids: list,
    tags: list,
    transcript: Optional[str],
) -> None:
    """Store the linked incident IDs on the call, then run alert dispatch.

    The ``incident_ids`` field is only written when the list is non-empty.
    Alert dispatch always runs, once, with *tags* de-duplicated in
    first-seen order.
    """
    from app.internal import alerter

    if incident_ids:
        await fstore.doc_set("calls", call_id, {"incident_ids": incident_ids})

    await alerter.check_and_dispatch(
        call_id=call_id,
        node_id=node_id,
        talkgroup_id=talkgroup_id,
        talkgroup_name=talkgroup_name,
        tags=list(dict.fromkeys(tags)),
        transcript=transcript,
    )


async def _run_extraction_pipeline(
    call_id: str,
    node_id: str,
    system_id: Optional[str],
    talkgroup_id: Optional[int],
    talkgroup_name: Optional[str],
    transcript: str,
    segments: Optional[list] = None,
    preserve_transcript_correction: bool = False,
) -> None:
    """Run steps 2-4 of the intelligence pipeline using an existing transcript."""
    from app.internal import intelligence

    # Step 2: Scene detection + intelligence extraction.
    # Returns one scene per distinct incident detected in the recording.
    scenes = await intelligence.extract_scenes(
        call_id,
        transcript,
        talkgroup_name,
        talkgroup_id=talkgroup_id,
        system_id=system_id,
        segments=segments,
        node_id=node_id,
        preserve_transcript_correction=preserve_transcript_correction,
    )

    # Step 3: Correlate each scene to an incident independently.
    incident_ids, all_tags = await _correlate_scenes(
        call_id, node_id, system_id, talkgroup_id, talkgroup_name, scenes
    )

    # Step 4: Alert dispatch — run once with merged tags from all scenes.
    await _record_incidents_and_alert(
        call_id,
        node_id,
        talkgroup_id,
        talkgroup_name,
        incident_ids,
        all_tags,
        transcript,
    )


async def _run_intelligence_pipeline(
    call_id: str,
    node_id: str,
    system_id: Optional[str],
    talkgroup_id: Optional[int],
    talkgroup_name: Optional[str],
    gcs_uri: Optional[str],
) -> None:
    """
    Post-upload intelligence pipeline (runs as a background task):
      1. Transcribe audio via Google STT
      2. Detect scenes + extract intelligence (one result per incident in recording)
      3. Correlate each scene with existing incidents (or create new ones)
      4. Check alert rules and dispatch notifications

    Steps 1 and 2 are skipped when there is no ``gcs_uri`` / no transcript;
    steps 3 and 4 still run in that case.
    """
    from app.internal import transcription, intelligence, incident_correlator

    transcript: Optional[str] = None
    segments: list[dict] = []

    # Step 1: Transcription
    if gcs_uri:
        transcript, segments = await transcription.transcribe_call(
            call_id, gcs_uri, talkgroup_name, system_id=system_id
        )

    # Step 2: Scene detection + intelligence extraction
    scenes: list[dict] = []
    if transcript:
        scenes = await intelligence.extract_scenes(
            call_id,
            transcript,
            talkgroup_name,
            talkgroup_id=talkgroup_id,
            system_id=system_id,
            segments=segments,
            node_id=node_id,
        )

    # Step 3: Correlate each scene independently.
    # A single recording can produce multiple incidents on a busy channel.
    incident_ids, all_tags = await _correlate_scenes(
        call_id, node_id, system_id, talkgroup_id, talkgroup_name, scenes
    )

    # Correlator also runs for calls with no scenes (unclassified) to attempt
    # talkgroup-based linking even when no transcript could be produced.
    if not scenes:
        incident_id = await incident_correlator.correlate_call(
            call_id=call_id,
            node_id=node_id,
            system_id=system_id,
            talkgroup_id=talkgroup_id,
            talkgroup_name=talkgroup_name,
            tags=[],
            incident_type=None,
            location=None,
            location_coords=None,
        )
        if incident_id:
            incident_ids.append(incident_id)

    # Step 4: Alert dispatch (always runs — talkgroup ID rules don't need a transcript)
    await _record_incidents_and_alert(
        call_id,
        node_id,
        talkgroup_id,
        talkgroup_name,
        incident_ids,
        all_tags,
        transcript,
    )