from datetime import datetime, timezone, timedelta
from typing import Optional

from fastapi import APIRouter, BackgroundTasks, HTTPException, Query, Depends
from pydantic import BaseModel

from app.internal import firestore as fstore
from app.internal.auth import require_admin_token


# Request body for PATCH /calls/{call_id}/transcript.
class TranscriptUpdate(BaseModel):
    transcript: str


router = APIRouter(prefix="/calls", tags=["calls"])


async def _get_call_or_404(call_id: str):
    """Fetch a call document from the "calls" collection or raise HTTP 404."""
    call = await fstore.doc_get("calls", call_id)
    if not call:
        raise HTTPException(404, f"Call '{call_id}' not found.")
    return call


def _parse_started_at(raw) -> Optional[datetime]:
    """Coerce a stored ``started_at`` value to a timezone-aware datetime.

    ``raw`` may be a ``datetime`` or any value whose ``str()`` is an ISO-8601
    timestamp (a trailing ``Z`` is accepted). Values without a UTC offset are
    treated as UTC so they can be compared against aware datetimes.

    Returns ``None`` when the value is missing/falsy or cannot be parsed.
    """
    if not raw:
        return None
    if isinstance(raw, datetime):
        parsed = raw
    else:
        try:
            # "Z" is rewritten because datetime.fromisoformat only accepts it
            # natively on newer Python versions.
            parsed = datetime.fromisoformat(str(raw).replace("Z", "+00:00"))
        except (TypeError, ValueError):
            return None
    # A naive value compared against an aware cutoff raises TypeError, so
    # normalise here for both the datetime and the string code paths.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


@router.get("")
async def list_calls(
    node_id: Optional[str] = Query(None),
    status: Optional[str] = Query(None),
    system_id: Optional[str] = Query(None),
):
    """List calls, optionally filtered by node_id, status and/or system_id."""
    candidates = {
        "node_id": node_id,
        "status": status,
        "system_id": system_id,
    }
    # Only truthy values become filters; omitted or empty parameters are ignored.
    filters = {key: value for key, value in candidates.items() if value}
    return await fstore.collection_list("calls", **filters)


@router.get("/{call_id}")
async def get_call(call_id: str):
    """Return a single call by ID. Responds with 404 if it does not exist."""
    return await _get_call_or_404(call_id)


@router.post("/{call_id}/reprocess")
async def reprocess_call(call_id: str, background_tasks: BackgroundTasks):
    """Re-run the full intelligence pipeline (transcription → extraction → correlation) for a call."""
    call = await _get_call_or_404(call_id)

    # Imported at call time rather than module level — presumably to avoid a
    # circular import with app.routers.upload; confirm before hoisting.
    from app.routers.upload import _run_intelligence_pipeline, _public_url_to_gcs_uri

    audio_url = call.get("audio_url")
    gcs_uri = _public_url_to_gcs_uri(audio_url) if audio_url else None

    # The pipeline is scheduled as a background task; the response is returned
    # without waiting for it.
    background_tasks.add_task(
        _run_intelligence_pipeline,
        call_id=call_id,
        node_id=call.get("node_id"),
        system_id=call.get("system_id"),
        talkgroup_id=call.get("talkgroup_id"),
        talkgroup_name=call.get("talkgroup_name"),
        gcs_uri=gcs_uri,
    )
    return {"ok": True, "call_id": call_id}


@router.post("/close-stale")
async def close_stale_calls(
    older_than_minutes: int = Query(30, ge=1, le=1440, description="Close active calls started more than this many minutes ago."),
    dry_run: bool = Query(False, description="If true, return what would be closed without writing."),
    _: dict = Depends(require_admin_token),
):
    """
    Find and close calls stuck in 'active' status — e.g. because a node
    rebooted before sending an end-call event.

    Returns the list of affected call IDs.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=older_than_minutes)
    active_calls = await fstore.collection_list("calls", status="active")

    stale = []
    for call in active_calls:
        started = _parse_started_at(call.get("started_at"))
        # Calls with a missing or unparseable start time are skipped, not closed.
        if started is not None and started < cutoff:
            stale.append(call)

    call_ids = [call["call_id"] for call in stale]

    if not dry_run:
        # One shared timestamp so every call closed by this request matches.
        now_iso = datetime.now(timezone.utc).isoformat()
        for stale_id in call_ids:
            await fstore.doc_set("calls", stale_id, {
                "status": "ended",
                "ended_at": now_iso,
            })

    return {
        "dry_run": dry_run,
        "older_than_minutes": older_than_minutes,
        "count": len(call_ids),
        "call_ids": call_ids,
    }


@router.patch("/{call_id}/transcript")
async def patch_transcript(
    call_id: str,
    body: TranscriptUpdate,
    background_tasks: BackgroundTasks,
    _: dict = Depends(require_admin_token),
):
    """Save a corrected transcript for a call and re-run intelligence extraction."""
    # `call` is the snapshot read before any write below; later reads of it
    # (incident links, previous transcript text) see the pre-update values.
    call = await _get_call_or_404(call_id)

    # Save user correction as transcript_corrected; leave original transcript intact.
    # Clear stale intelligence fields so re-extraction runs fresh.
    # NOTE(review): relies on fstore.doc_set merging into the existing document
    # rather than replacing it — confirm against app.internal.firestore.
    await fstore.doc_set("calls", call_id, {
        "transcript_corrected": body.transcript,
        "tags": [],
        "severity": "unknown",
        "location": None,
        "units": [],
        "vehicles": [],
        "embedding": None,
    })

    # Unlink from ALL current incidents so re-correlation starts clean.
    # Handles both old single incident_id and new incident_ids list.
    old_ids: list[str] = call.get("incident_ids") or (
        [call["incident_id"]] if call.get("incident_id") else []
    )
    for old_incident_id in old_ids:
        old_incident = await fstore.doc_get("incidents", old_incident_id)
        if not old_incident:
            continue
        remaining = [c for c in (old_incident.get("call_ids") or []) if c != call_id]
        incident_update = {"call_ids": remaining, "summary_stale": True}
        if not remaining:
            # No calls left on the incident: mark it resolved.
            incident_update["status"] = "resolved"
        await fstore.doc_set("incidents", old_incident_id, incident_update)
    await fstore.doc_set("calls", call_id, {"incident_ids": [], "incident_id": None})

    # Learn from the correction: diff original → corrected and add new tokens to vocabulary.
    # The baseline is the previous correction if one exists, else the original transcript.
    system_id = call.get("system_id")
    original_text = call.get("transcript_corrected") or call.get("transcript") or ""
    if system_id and original_text:
        from app.internal.vocabulary_learner import learn_from_correction
        await learn_from_correction(system_id, original_text, body.transcript)

    # Imported at call time rather than module level — presumably to avoid a
    # circular import with app.routers.upload; confirm before hoisting.
    from app.routers.upload import _run_extraction_pipeline

    background_tasks.add_task(
        _run_extraction_pipeline,
        call_id=call_id,
        node_id=call.get("node_id"),
        system_id=system_id,
        talkgroup_id=call.get("talkgroup_id"),
        talkgroup_name=call.get("talkgroup_name"),
        transcript=body.transcript,
        segments=call.get("segments"),
        preserve_transcript_correction=True,
    )
    return {"ok": True, "call_id": call_id}