173 lines
6.1 KiB
Python
173 lines
6.1 KiB
Python
from datetime import datetime, timezone, timedelta
|
|
from fastapi import APIRouter, BackgroundTasks, HTTPException, Query, Depends
|
|
from pydantic import BaseModel
|
|
from typing import Optional
|
|
from app.internal import firestore as fstore
|
|
from app.internal.auth import require_admin_token
|
|
|
|
|
|
class TranscriptUpdate(BaseModel):
    """Request body accepted by the transcript-correction endpoint."""

    # The corrected transcript text supplied by the caller.
    transcript: str
|
|
|
|
# Every route declared on this router is served under the /calls prefix.
router = APIRouter(
    prefix="/calls",
    tags=["calls"],
)
|
|
|
|
|
|
@router.get("")
async def list_calls(
    node_id: Optional[str] = Query(None),
    status: Optional[str] = Query(None),
    system_id: Optional[str] = Query(None),
):
    """List calls, optionally narrowed by node, status and/or system.

    Only query parameters that carry a truthy value are forwarded as keyword
    filters to ``fstore.collection_list``; omitted or empty parameters are
    ignored, so a request without parameters applies no filter at all.
    """
    candidates = (
        ("node_id", node_id),
        ("status", status),
        ("system_id", system_id),
    )
    # Drop unset/empty parameters so they are not sent as filters.
    active_filters = {field: value for field, value in candidates if value}
    return await fstore.collection_list("calls", **active_filters)
|
|
|
|
|
|
@router.get("/{call_id}")
async def get_call(call_id: str):
    """Return the stored document for a single call.

    Raises:
        HTTPException: 404 when the lookup for ``call_id`` yields nothing
            (or any falsy value).
    """
    record = await fstore.doc_get("calls", call_id)
    if record:
        return record
    raise HTTPException(404, f"Call '{call_id}' not found.")
|
|
|
|
|
|
@router.post("/{call_id}/reprocess")
async def reprocess_call(call_id: str, background_tasks: BackgroundTasks):
    """Re-run the full intelligence pipeline (transcription → extraction → correlation) for a call.

    The pipeline is only *scheduled* here as a background task; the response
    is returned without waiting for it to run.

    Raises:
        HTTPException: 404 when no call document exists for ``call_id``.

    Returns:
        ``{"ok": True, "call_id": call_id}`` once the task has been queued.
    """
    # NOTE(review): unlike the other mutating endpoints in this router, this
    # one declares no require_admin_token dependency — confirm that is intended.
    record = await fstore.doc_get("calls", call_id)
    if not record:
        raise HTTPException(404, f"Call '{call_id}' not found.")

    # Imported at call time rather than at module level (presumably to avoid a
    # circular import with the upload router — TODO confirm).
    from app.routers.upload import _run_intelligence_pipeline, _public_url_to_gcs_uri

    # Calls without a stored audio URL are queued with gcs_uri=None.
    audio_url = record.get("audio_url")
    if audio_url:
        gcs_uri = _public_url_to_gcs_uri(audio_url)
    else:
        gcs_uri = None

    # Call metadata is passed through exactly as stored (None when absent).
    passthrough_fields = ("node_id", "system_id", "talkgroup_id", "talkgroup_name")
    call_context = {field: record.get(field) for field in passthrough_fields}

    background_tasks.add_task(
        _run_intelligence_pipeline,
        call_id=call_id,
        **call_context,
        gcs_uri=gcs_uri,
    )
    return {"ok": True, "call_id": call_id}
|
|
|
|
|
|
@router.post("/close-stale")
async def close_stale_calls(
    older_than_minutes: int = Query(30, ge=1, le=1440, description="Close active calls started more than this many minutes ago."),
    dry_run: bool = Query(False, description="If true, return what would be closed without writing."),
    _: dict = Depends(require_admin_token),
):
    """
    Find and close calls stuck in 'active' status — e.g. because a node rebooted
    before sending an end-call event. Returns the list of affected call IDs.

    A call is considered stale when its ``started_at`` is earlier than
    "now minus ``older_than_minutes``" (UTC). Calls whose ``started_at`` is
    missing or cannot be parsed are skipped rather than closed.

    Unless ``dry_run`` is true, every stale call is written back with
    ``status="ended"`` and ``ended_at`` set to the current UTC time (ISO-8601).

    Returns:
        A dict with ``dry_run``, ``older_than_minutes``, ``count`` and
        ``call_ids`` describing the calls that were (or would be) closed.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=older_than_minutes)
    active_calls = await fstore.collection_list("calls", status="active")

    stale = []
    for call in active_calls:
        started = _parse_started_at(call.get("started_at"))
        # None means "missing or unparseable": leave such calls untouched.
        if started is not None and started < cutoff:
            stale.append(call)

    if not dry_run:
        # One shared timestamp so every call closed by this request matches.
        now_iso = datetime.now(timezone.utc).isoformat()
        for call in stale:
            await fstore.doc_set("calls", call["call_id"], {
                "status": "ended",
                "ended_at": now_iso,
            })

    return {
        "dry_run": dry_run,
        "older_than_minutes": older_than_minutes,
        "count": len(stale),
        "call_ids": [c["call_id"] for c in stale],
    }


def _parse_started_at(raw: object) -> Optional[datetime]:
    """Coerce a stored ``started_at`` value into a timezone-aware datetime.

    Accepts either a ``datetime`` or any value whose ``str()`` is an ISO-8601
    timestamp (a trailing "Z" is treated as "+00:00"). Values without a UTC
    offset are assumed to be UTC, so the result can always be compared against
    an aware cutoff without raising ``TypeError``.

    Returns:
        The aware datetime, or ``None`` when ``raw`` is falsy or unparseable.
    """
    if not raw:
        return None
    if isinstance(raw, datetime):
        parsed = raw
    else:
        try:
            parsed = datetime.fromisoformat(str(raw).replace("Z", "+00:00"))
        except ValueError:
            return None
    # fromisoformat() yields a naive datetime for strings without an offset;
    # normalise those (and naive datetime inputs) to UTC.
    if parsed.utcoffset() is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
|
|
|
|
|
|
@router.patch("/{call_id}/transcript")
async def patch_transcript(
    call_id: str,
    body: TranscriptUpdate,
    background_tasks: BackgroundTasks,
    _: dict = Depends(require_admin_token),
):
    """Overwrite a call's transcript and re-run intelligence extraction.

    Steps, in order:
      1. Store the correction and reset the derived intelligence fields.
      2. Remove this call from every incident it is linked to.
      3. Clear the call's own incident links.
      4. Feed the (previous text → corrected text) pair to the vocabulary
         learner, when the call has a system id and a previous text.
      5. Schedule the extraction pipeline as a background task.

    Raises:
        HTTPException: 404 when no call document exists for ``call_id``.

    Returns:
        ``{"ok": True, "call_id": call_id}`` once the task has been queued.
    """
    # Snapshot of the call as stored *before* any of the writes below.
    existing = await fstore.doc_get("calls", call_id)
    if not existing:
        raise HTTPException(404, f"Call '{call_id}' not found.")

    corrected_text = body.transcript

    # The correction goes into transcript_corrected; the "transcript" field is
    # not part of this write (presumably doc_set merges, leaving it intact —
    # TODO confirm). Derived fields are reset so re-extraction starts fresh.
    reset_fields = {
        "transcript_corrected": corrected_text,
        "tags": [],
        "severity": "unknown",
        "location": None,
        "units": [],
        "vehicles": [],
        "embedding": None,
    }
    await fstore.doc_set("calls", call_id, reset_fields)

    # Gather every incident this call is linked to: prefer the incident_ids
    # list and fall back to the older single incident_id field.
    linked_incident_ids = existing.get("incident_ids")
    if not linked_incident_ids:
        legacy_incident_id = existing.get("incident_id")
        linked_incident_ids = [legacy_incident_id] if legacy_incident_id else []

    for incident_id in linked_incident_ids:
        incident = await fstore.doc_get("incidents", incident_id)
        if not incident:
            continue
        remaining = [cid for cid in (incident.get("call_ids") or []) if cid != call_id]
        incident_patch = {"call_ids": remaining}
        if not remaining:
            # No calls are left on the incident, so it is marked resolved.
            incident_patch["status"] = "resolved"
        incident_patch["summary_stale"] = True
        await fstore.doc_set("incidents", incident_id, incident_patch)
    # Clear both the list and the legacy single-id link on the call itself.
    # NOTE(review): executed once, after all incidents were processed —
    # confirm this matches the intended placement relative to the loop.
    await fstore.doc_set("calls", call_id, {"incident_ids": [], "incident_id": None})

    # Learn from the correction. The baseline is the text stored before this
    # request: an earlier correction if there was one, else the transcript.
    system_id = existing.get("system_id")
    previous_text = existing.get("transcript_corrected") or existing.get("transcript") or ""
    if system_id and previous_text:
        from app.internal.vocabulary_learner import learn_from_correction
        await learn_from_correction(system_id, previous_text, corrected_text)

    from app.routers.upload import _run_extraction_pipeline

    # Call metadata is passed through exactly as stored (None when absent).
    passthrough_fields = ("node_id", "system_id", "talkgroup_id", "talkgroup_name")
    call_context = {field: existing.get(field) for field in passthrough_fields}

    background_tasks.add_task(
        _run_extraction_pipeline,
        call_id=call_id,
        **call_context,
        transcript=corrected_text,
        segments=existing.get("segments"),
        preserve_transcript_correction=True,
    )
    return {"ok": True, "call_id": call_id}
|