Files
server-26/drb-c2-core/app/routers/upload.py
Logan 18d96193ab Security fixes
auth.py

secrets.compare_digest replaces == for service key comparison (timing-safe)
Added require_service_key — bot-only endpoints (trip/event join/leave)
Added require_service_key_or_admin — node commands/config (bot via service key OR dashboard admin via Firebase)
Added _RateLimiter with three shared instances: trip_chat_limiter (20/5min per user), summarize_limiter (5/10min per incident), bootstrap_limiter (2/hr per system)
nodes.py

send_command and assign_system now require require_service_key_or_admin — the Discord bot can still call them via service key, but regular Firebase users are blocked
tokens.py

add_token, flush_tokens, set_preferred_system, delete_token all require require_admin_token
Token masking changed from token[:10] + "…" + token[-4:] to "•••" + token[-4:]
systems.py

All write endpoints (create, update, delete, ai-flags, ten-codes, vocabulary writes, bootstrap) now require require_admin_token
bootstrap_vocabulary also calls bootstrap_limiter.check(system_id)
incidents.py

POST /incidents/summarize (bulk) now requires require_admin_token
POST /incidents/{id}/summarize now calls summarize_limiter.check(incident_id)
trips.py

join_trip, leave_trip, join_event, leave_event require require_service_key — only the Discord bot can set Discord attendee identity
delete_trip, delete_event require require_service_key_or_admin
trip_chat rate-limited per caller UID, history stripped to user/assistant roles only, user message truncated to 2000 chars, Maps query strings capped at 200 chars
upload.py

Rejects files larger than settings.upload_max_bytes (default 100MB) with 413
storage.py

_safe_audio_filename() derives GCS object name from call_id + allowlisted extension, completely ignoring the client-supplied filename
config.py

Added upload_max_bytes: int = 100 * 1024 * 1024
Both Dockerfiles — python:3.14-slim → python:3.12-slim
2026-06-21 13:40:08 -04:00

336 lines
13 KiB
Python

from typing import Optional
from fastapi import APIRouter, BackgroundTasks, UploadFile, File, Form, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from app.internal.storage import upload_audio
from app.internal import firestore as fstore
from app.internal.logger import logger
from app.config import settings
# Router that carries the edge-node audio upload endpoint defined below.
router = APIRouter(tags=["upload"])
# auto_error=False: a missing/malformed Authorization header is handed to the
# endpoint as None (instead of FastAPI rejecting the request itself), so the
# handler can raise its own 401 with its own message.
_bearer = HTTPBearer(auto_error=False)
@router.post("/upload")
async def upload_call_audio(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    call_id: str = Form(...),
    node_id: str = Form(...),
    talkgroup_id: Optional[int] = Form(None),
    talkgroup_name: Optional[str] = Form(None),
    system_id: Optional[str] = Form(None),
    credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer),
):
    """
    Receive an audio recording from an edge node.

    Authenticates the caller against the per-node API key stored in the
    ``node_keys`` collection, uploads the audio via ``upload_audio``, records
    the resulting URL on the ``calls`` document, then schedules the
    intelligence pipeline as a background task.

    Returns:
        ``{"url": audio_url}`` where ``audio_url`` is whatever ``upload_audio``
        returned (it may be falsy; the pipeline is scheduled either way).

    Raises:
        HTTPException(401): missing bearer credentials, unknown ``node_id``,
            or the presented key does not match the stored key.
        HTTPException(400): the uploaded file is empty.
        HTTPException(413): the file exceeds ``settings.upload_max_bytes``.
    """
    # Function-scope import, matching this module's habit of importing
    # inside functions; keeps the block self-contained.
    import secrets

    # Verify the per-node API key
    if not credentials:
        raise HTTPException(401, "Missing authorization")

    key_doc = await fstore.doc_get("node_keys", node_id)
    if not key_doc:
        logger.warning(f"Upload 401: no key_doc in Firestore for node_id={node_id!r}")
        raise HTTPException(401, "Invalid node API key")

    stored_key = key_doc.get("api_key")
    presented_key = credentials.credentials
    # Constant-time comparison so response timing does not leak how much of
    # the key matched. Compare as bytes: compare_digest() raises TypeError on
    # non-ASCII str input, and the bearer token is attacker-controlled.
    # A missing or non-string stored key never authenticates.
    key_matches = isinstance(stored_key, str) and secrets.compare_digest(
        stored_key.encode("utf-8"), presented_key.encode("utf-8")
    )
    if not key_matches:
        # Deliberately log no part of the presented credential: it may be a
        # valid key for a different node, and logs are not a secret store.
        logger.warning(f"Upload 401: key mismatch for node_id={node_id!r}")
        raise HTTPException(401, "Invalid node API key")

    # Read at most one byte past the limit: enough to detect an oversized
    # upload without copying an arbitrarily large body into memory.
    max_bytes = settings.upload_max_bytes
    data = await file.read(max_bytes + 1)
    if not data:
        raise HTTPException(400, "Empty file.")
    if len(data) > max_bytes:
        raise HTTPException(413, f"File too large (max {max_bytes // (1024*1024)} MB).")

    audio_url = await upload_audio(data, file.filename or "", call_id=call_id)

    if audio_url:
        # Best-effort: a failed Firestore write must not fail the upload.
        try:
            await fstore.doc_set("calls", call_id, {"audio_url": audio_url})
        except Exception as e:
            logger.warning(f"Could not update call {call_id} with audio_url: {e}")

    # Convert public GCS URL to gs:// URI for Speech-to-Text
    # (None when there is no usable URL; the pipeline then skips transcription).
    gcs_uri = _public_url_to_gcs_uri(audio_url)

    background_tasks.add_task(
        _run_intelligence_pipeline,
        call_id=call_id,
        node_id=node_id,
        system_id=system_id,
        talkgroup_id=talkgroup_id,
        talkgroup_name=talkgroup_name,
        gcs_uri=gcs_uri,
    )
    return {"url": audio_url}
def _public_url_to_gcs_uri(url: str) -> Optional[str]:
"""
Convert a public GCS URL (possibly signed) like
https://storage.googleapis.com/bucket/calls/file.mp3?Expires=...
to a gs:// URI usable by Speech-to-Text.
Returns None if the URL doesn't look like a GCS URL.
"""
prefix = "https://storage.googleapis.com/"
if url and url.startswith(prefix):
path = url[len(prefix):].split("?")[0] # strip signed-URL query params
return "gs://" + path
return None
async def _correlate_with_consensus(
    call_id: str,
    node_id: str,
    system_id: Optional[str],
    talkgroup_id: Optional[int],
    talkgroup_name: Optional[str],
    tags: list[str],
    incident_type: Optional[str],
    location: Optional[str],
    location_coords: Optional[dict],
    units: Optional[list] = None,
    vehicles: Optional[list] = None,
    cleared_units: Optional[list] = None,
    reassignment: bool = False,
) -> Optional[str]:
    """
    Correlate a call (or one scene of it) to an incident by consensus.

    The rules engine produces a preview decision, then the cheap LLM is asked
    for its own decision over the same context. Three outcomes:

      * ``decide`` returned None (per the inline note: LLM unavailable, thin
        call skipped, or errored) -> rules decision applied, tagged
        ``rules_only``.
      * Both decisions agree -> rules decision applied, tagged ``agreed``,
        with the LLM reasoning recorded alongside.
      * They disagree -> ``tiebreak`` produces the final decision, tagged
        ``tiebreak`` together with both competing actions.

    Returns whatever ``incident_correlator.apply_correlation`` returns.
    """
    from app.internal import incident_correlator, llm_correlator

    rules_preview = await incident_correlator.preview_correlation(
        call_id=call_id,
        node_id=node_id,
        system_id=system_id,
        talkgroup_id=talkgroup_id,
        talkgroup_name=talkgroup_name,
        tags=tags,
        incident_type=incident_type,
        location=location,
        location_coords=location_coords,
        units=units,
        vehicles=vehicles,
        cleared_units=cleared_units,
        reassignment=reassignment,
    )
    context = rules_preview["ctx"]
    by_rules = rules_preview["decision"]

    by_llm = await llm_correlator.decide(call_id, context)

    if by_llm is not None and not llm_correlator.decisions_agree(by_rules, by_llm):
        # Disagreement: hand both opinions to the smarter tiebreaker model.
        logger.info(
            f"Consensus disagreement for call {call_id}: "
            f"rules={by_rules['action']} vs llm={by_llm['action']} — tiebreak"
        )
        verdict = await llm_correlator.tiebreak(by_rules, by_llm, context)
        verdict["corr_debug"]["corr_consensus"] = "tiebreak"
        verdict["corr_debug"]["corr_rules_action"] = by_rules["action"]
        verdict["corr_debug"]["corr_llm_action"] = by_llm["action"]
        return await incident_correlator.apply_correlation(
            {"decision": verdict, "ctx": context}
        )

    # From here on the rules decision stands; only the debug tag differs.
    if by_llm is None:
        by_rules["corr_debug"]["corr_consensus"] = "rules_only"
    else:
        by_rules["corr_debug"]["corr_consensus"] = "agreed"
        by_rules["corr_debug"]["corr_llm_reasoning"] = by_llm.get("reasoning", "")
    return await incident_correlator.apply_correlation(rules_preview)
async def _run_extraction_pipeline(
    call_id: str,
    node_id: str,
    system_id: Optional[str],
    talkgroup_id: Optional[int],
    talkgroup_name: Optional[str],
    transcript: str,
    segments: Optional[list] = None,
    preserve_transcript_correction: bool = False,
) -> None:
    """
    Run pipeline steps 2-4 for a call whose transcript already exists.

    2. Extract scenes from the transcript (one per incident detected).
    3. Correlate every scene to an incident; mark the incident resolved when
       the scene is flagged ``resolved``; store the linked incident ids on
       the call document.
    4. Dispatch alerts once, using the de-duplicated tags of all scenes.
    """
    from app.internal import intelligence, incident_correlator, alerter

    # Step 2: scene detection + intelligence extraction.
    detected_scenes = await intelligence.extract_scenes(
        call_id,
        transcript,
        talkgroup_name,
        talkgroup_id=talkgroup_id,
        system_id=system_id,
        segments=segments,
        node_id=node_id,
        preserve_transcript_correction=preserve_transcript_correction,
    )

    # Step 3: each scene is correlated on its own.
    linked_incidents: list[str] = []
    merged_tags: list[str] = []

    for scene in detected_scenes:
        merged_tags += scene["tags"]

        # A reassignment pulls a unit onto a NEW call, so unit overlap is
        # suppressed to avoid chaining into the unit's previous incident.
        reassigned = bool(scene.get("reassignment"))
        units_for_matching = [] if reassigned else scene.get("units")

        linked_id = await _correlate_with_consensus(
            call_id=call_id,
            node_id=node_id,
            system_id=system_id,
            talkgroup_id=talkgroup_id,
            talkgroup_name=talkgroup_name,
            tags=scene["tags"],
            incident_type=scene["incident_type"],
            location=scene["location"],
            location_coords=scene["location_coords"],
            units=units_for_matching,
            vehicles=scene.get("vehicles"),
            cleared_units=scene.get("cleared_units"),
            reassignment=reassigned,
        )

        if linked_id and linked_id not in linked_incidents:
            linked_incidents.append(linked_id)

        if scene["resolved"] and linked_id:
            await fstore.doc_set("incidents", linked_id, {"status": "resolved"})
            await incident_correlator.maybe_resolve_parent(linked_id)
            logger.info(f"Auto-resolved incident {linked_id} (LLM closure detection)")

    if linked_incidents:
        await fstore.doc_set("calls", call_id, {"incident_ids": linked_incidents})

    # Step 4: one alert pass with order-preserving de-duplicated tags.
    unique_tags = list(dict.fromkeys(merged_tags))
    await alerter.check_and_dispatch(
        call_id=call_id,
        node_id=node_id,
        talkgroup_id=talkgroup_id,
        talkgroup_name=talkgroup_name,
        tags=unique_tags,
        transcript=transcript,
    )
async def _run_intelligence_pipeline(
    call_id: str,
    node_id: str,
    system_id: Optional[str],
    talkgroup_id: Optional[int],
    talkgroup_name: Optional[str],
    gcs_uri: Optional[str],
) -> None:
    """
    Post-upload intelligence pipeline (runs as a background task):
    1. Transcribe audio via Google STT
    2. Detect scenes + extract intelligence (one result per incident in recording)
    3. Correlate each scene with existing incidents (or create new ones)
    4. Check alert rules and dispatch notifications

    Steps 1-3 are gated by feature flags. A flag is effective only when the
    global flag is on AND the system's ``ai_flags`` override (if any) is not
    false. Step 4 always runs.

    Args:
        gcs_uri: gs:// URI of the uploaded audio, or None when no audio URL
            was available; transcription is skipped when it is None.
    """
    from app.internal import transcription, intelligence, incident_correlator, alerter
    from app.internal.feature_flags import get_flags

    flags = await get_flags()

    # Resolve per-system overrides: system flag=False beats global flag=True,
    # but global flag=False beats everything (master switch).
    system_ai_flags: dict = {}
    if system_id:
        sys_doc = await fstore.doc_get_cached("systems", system_id)
        system_ai_flags = (sys_doc or {}).get("ai_flags") or {}

    def _flag(name: str) -> bool:
        if not flags[name]:  # global master off
            return False
        return bool(system_ai_flags.get(name, True))  # system override, default inherit

    transcript: Optional[str] = None
    segments: list[dict] = []

    # Step 1: Transcription
    if gcs_uri:
        if _flag("stt_enabled"):
            transcript, segments = await transcription.transcribe_call(
                call_id, gcs_uri, talkgroup_name, system_id=system_id
            )
        else:
            scope = "globally" if not flags["stt_enabled"] else f"system {system_id}"
            logger.info(f"STT disabled ({scope}) — skipping transcription for call {call_id}")

    # Effective correlation switch, evaluated once and shared by steps 2 and 3
    # so that a per-system "off" disables BOTH extraction and correlation.
    # (Previously step 3 consulted only the global flag, so a system-level
    # override still ran the no-scenes correlator and could link the call.)
    correlation_on = _flag("correlation_enabled")

    # Step 2: Scene detection + intelligence extraction
    scenes: list[dict] = []
    if correlation_on:
        if transcript:
            scenes = await intelligence.extract_scenes(
                call_id, transcript, talkgroup_name,
                talkgroup_id=talkgroup_id, system_id=system_id, segments=segments,
                node_id=node_id,
            )
    else:
        scope = "globally" if not flags["correlation_enabled"] else f"system {system_id}"
        logger.info(f"Correlation disabled ({scope}) — skipping scene extraction and correlation for call {call_id}")

    # Step 3: Correlate each scene independently.
    # A single recording can produce multiple incidents on a busy channel.
    incident_ids: list[str] = []
    all_tags: list[str] = []

    if correlation_on:
        for scene in scenes:
            all_tags.extend(scene["tags"])
            # Reassignment = unit pulled to a NEW call: suppress unit overlap so
            # the scene doesn't chain into the unit's previous incident.
            is_reassignment = bool(scene.get("reassignment"))
            corr_units = [] if is_reassignment else scene.get("units")
            incident_id = await _correlate_with_consensus(
                call_id=call_id,
                node_id=node_id,
                system_id=system_id,
                talkgroup_id=talkgroup_id,
                talkgroup_name=talkgroup_name,
                tags=scene["tags"],
                incident_type=scene["incident_type"],
                location=scene["location"],
                location_coords=scene["location_coords"],
                units=corr_units,
                vehicles=scene.get("vehicles"),
                cleared_units=scene.get("cleared_units"),
                reassignment=is_reassignment,
            )
            if incident_id and incident_id not in incident_ids:
                incident_ids.append(incident_id)
            if scene["resolved"] and incident_id:
                await fstore.doc_set("incidents", incident_id, {"status": "resolved"})
                await incident_correlator.maybe_resolve_parent(incident_id)
                logger.info(f"Auto-resolved incident {incident_id} (LLM closure detection)")

        # Correlator also runs for calls with no scenes (unclassified) to attempt
        # talkgroup-based linking even when no transcript could be produced.
        # Skip when extraction flagged the call — garbage or too-short transcripts
        # carry no signal and would only attach spuriously via the thin path.
        if not scenes:
            _call_doc = await fstore.doc_get("calls", call_id)
            if not (_call_doc or {}).get("skip_reason"):
                incident_id = await _correlate_with_consensus(
                    call_id=call_id,
                    node_id=node_id,
                    system_id=system_id,
                    talkgroup_id=talkgroup_id,
                    talkgroup_name=talkgroup_name,
                    tags=[],
                    incident_type=None,
                    location=None,
                    location_coords=None,
                )
                if incident_id:
                    incident_ids.append(incident_id)

    if incident_ids:
        await fstore.doc_set("calls", call_id, {"incident_ids": incident_ids})

    # Step 4: Alert dispatch (always runs — talkgroup ID rules don't need a transcript)
    await alerter.check_and_dispatch(
        call_id=call_id,
        node_id=node_id,
        talkgroup_id=talkgroup_id,
        talkgroup_name=talkgroup_name,
        tags=list(dict.fromkeys(all_tags)),
        transcript=transcript,
    )