Add intelligence pipeline: STT transcription, keyword tagging, incident correlation, and alert dispatch; plus MQTT key re-delivery and configurable CORS

This commit is contained in:
Logan
2026-04-11 13:44:08 -04:00
parent fd6c2fd8bf
commit 3b3a136d04
31 changed files with 1919 additions and 94 deletions
+3
View File
@@ -20,6 +20,9 @@ class Settings(BaseSettings):
# Internal service key — allows server-side services (discord bot) to call C2 without Firebase
service_key: Optional[str] = None
# CORS — comma-separated list of allowed origins, or "*" for all
cors_origins: list[str] = ["*"]
class Config:
env_file = ".env"
+124
View File
@@ -0,0 +1,124 @@
"""
Alert dispatch engine.
Loads enabled alert rules from Firestore and checks each one against the call's
talkgroup ID, tags, and transcript. On a match:
1. Creates an AlertEvent document in Firestore.
2. Optionally POSTs a Discord webhook message if the rule has one configured.
Never raises — failures are logged as warnings so the pipeline always completes.
"""
import uuid
from datetime import datetime, timezone
from typing import Optional
from app.internal.logger import logger
from app.internal import firestore as fstore
async def check_and_dispatch(
    call_id: str,
    node_id: str,
    talkgroup_id: Optional[int],
    talkgroup_name: Optional[str],
    tags: list[str],
    transcript: Optional[str],
) -> None:
    """
    Evaluate every enabled alert rule against this call and fire the ones that match.

    For each matching rule an AlertEvent document is written to Firestore and,
    when the rule carries a Discord webhook URL, a notification is POSTed.
    Never raises — failures are logged as warnings so the pipeline completes.
    """
    try:
        rules = await fstore.collection_list("alert_rules", enabled=True)
    except Exception as e:
        logger.warning(f"Alerter: could not load rules: {e}")
        return

    # The snippet depends only on the transcript, so compute it once up front.
    snippet = _snippet(transcript)

    for rule in rules:
        hits = _match_rule(rule, talkgroup_id, tags, transcript)
        if not hits:
            continue

        event_id = str(uuid.uuid4())
        record = {
            "alert_id": event_id,
            "rule_id": rule.get("rule_id", ""),
            "rule_name": rule.get("name", ""),
            "call_id": call_id,
            "node_id": node_id,
            "talkgroup_id": talkgroup_id,
            "talkgroup_name": talkgroup_name or "",
            "matched_keywords": hits,
            "transcript_snippet": snippet,
            "triggered_at": datetime.now(timezone.utc).isoformat(),
            "acknowledged": False,
        }
        try:
            await fstore.doc_set("alert_events", event_id, record, merge=False)
            logger.info(
                f"Alert fired: rule='{rule.get('name')}' call={call_id} "
                f"keywords={hits}"
            )
        except Exception as e:
            # Skip the webhook when the event itself could not be persisted.
            logger.warning(f"Alerter: could not save alert event: {e}")
            continue

        webhook_url = rule.get("discord_webhook")
        if webhook_url:
            await _post_webhook(webhook_url, rule.get("name", ""), talkgroup_name, hits, snippet)
def _match_rule(
rule: dict,
talkgroup_id: Optional[int],
tags: list[str],
transcript: Optional[str],
) -> list[str]:
"""Return list of matched keywords/reasons, or empty list if no match."""
matched: list[str] = []
# Talkgroup ID match
rule_tg_ids = rule.get("talkgroup_ids", [])
if rule_tg_ids and talkgroup_id is not None and talkgroup_id in rule_tg_ids:
matched.append(f"talkgroup:{talkgroup_id}")
# Keyword match against tags + transcript
rule_keywords = [kw.lower() for kw in rule.get("keywords", [])]
for kw in rule_keywords:
if kw in tags:
matched.append(kw)
elif transcript and kw in transcript.lower():
matched.append(kw)
return matched
def _snippet(transcript: Optional[str], max_len: int = 200) -> Optional[str]:
if not transcript:
return None
return transcript[:max_len] + ("" if len(transcript) > max_len else "")
async def _post_webhook(
    url: str,
    rule_name: str,
    talkgroup_name: Optional[str],
    matched_keywords: list[str],
    snippet: Optional[str],
) -> None:
    """POST a formatted alert message to a Discord webhook.

    Best-effort: any failure — including httpx being unavailable — is logged
    as a warning and never raised.
    """
    try:
        import httpx

        lines = [
            f"**Alert: {rule_name}**",
            f"Talkgroup: {talkgroup_name or 'Unknown'}",
            f"Matched: {', '.join(matched_keywords)}",
        ]
        if snippet:
            # Render the transcript excerpt as a Discord block quote.
            lines.append(f"> {snippet}")
        async with httpx.AsyncClient(timeout=5.0) as client:
            await client.post(url, json={"content": "\n".join(lines)})
    except Exception as e:
        logger.warning(f"Alerter: Discord webhook POST failed: {e}")
@@ -0,0 +1,109 @@
"""
Incident correlation engine.
After a call is transcribed and tagged, this module attempts to link it to an
existing open incident (same type, same node/system, within a 30-minute
window). If no match is found, a new incident is auto-created.
The result is written back to Firestore on both the call document
(call.incident_id) and the incident document (incident.call_ids).
"""
import uuid
from datetime import datetime, timezone, timedelta
from typing import Optional
from app.internal.logger import logger
from app.internal import firestore as fstore
_CORRELATION_WINDOW = timedelta(minutes=30)
async def correlate_call(
    call_id: str,
    node_id: str,
    system_id: Optional[str],
    talkgroup_name: Optional[str],
    tags: list[str],
    incident_type: Optional[str],
) -> Optional[str]:
    """
    Link call_id to an existing incident or create a new one.

    An existing incident matches when it is the same type, still "active",
    was updated within the 30-minute correlation window, and at least one of
    its recently linked calls came from the same node.

    Args:
        call_id: ID of the call being processed.
        node_id: Edge node that recorded the call.
        system_id: Radio system ID (may be None).
        talkgroup_name: Human-readable talkgroup name for auto-title generation.
        tags: Tags extracted by intelligence.py.
        incident_type: Primary incident category (fire/police/ems/accident) or None.

    Returns:
        The incident_id that was linked, or None if skipped.
    """
    # NOTE(review): system_id is accepted but never used below — confirm whether
    # correlation should also be scoped by radio system.
    if not incident_type:
        return None
    now = datetime.now(timezone.utc)
    # NOTE(review): `cutoff` is computed but never used — the window check below
    # recomputes `now - _CORRELATION_WINDOW` directly. Dead local; confirm and remove.
    cutoff = (now - _CORRELATION_WINDOW).isoformat()
    # Fetch active incidents of the same type
    candidates = await fstore.collection_list("incidents", status="active", type=incident_type)
    # Filter to incidents updated within the correlation window and on this node
    matched_incident: Optional[dict] = None
    for inc in candidates:
        updated_raw = inc.get("updated_at", "")
        try:
            # Accept both "...Z" and "+00:00" ISO suffixes; treat naive stamps as UTC.
            updated_dt = datetime.fromisoformat(str(updated_raw).replace("Z", "+00:00"))
            if updated_dt.tzinfo is None:
                updated_dt = updated_dt.replace(tzinfo=timezone.utc)
        except Exception:
            # Unparseable timestamp — skip this candidate rather than fail the pipeline.
            continue
        if updated_dt < (now - _CORRELATION_WINDOW):
            continue
        # Check whether any call in this incident came from the same node
        linked_call_ids = inc.get("call_ids", [])
        if linked_call_ids:
            for linked_id in linked_call_ids[:5]:  # check last 5 calls to avoid slow queries
                linked_call = await fstore.doc_get("calls", linked_id)
                if linked_call and linked_call.get("node_id") == node_id:
                    matched_incident = inc
                    break
        if matched_incident:
            break
    if matched_incident:
        incident_id = matched_incident["incident_id"]
        existing_ids = matched_incident.get("call_ids", [])
        if call_id not in existing_ids:
            existing_ids.append(call_id)
        # Bump updated_at so this incident stays inside the correlation window.
        await fstore.doc_update("incidents", incident_id, {
            "call_ids": existing_ids,
            "updated_at": now.isoformat(),
        })
        logger.info(f"Correlator: linked call {call_id} to existing incident {incident_id}")
    else:
        # Create a new incident
        incident_id = str(uuid.uuid4())
        tg_label = talkgroup_name or "Unknown Talkgroup"
        # NOTE(review): renders as e.g. "Auto: FireDispatch" with no separator
        # between type and talkgroup label — possibly a lost delimiter; confirm format.
        title = f"Auto: {incident_type.title()}{tg_label}"
        doc = {
            "incident_id": incident_id,
            "title": title,
            "type": incident_type,
            "status": "active",
            "location": None,
            "call_ids": [call_id],
            "summary": None,
            "tags": tags,
            "started_at": now.isoformat(),
            "updated_at": now.isoformat(),
        }
        await fstore.doc_set("incidents", incident_id, doc, merge=False)
        logger.info(f"Correlator: created new incident {incident_id} for call {call_id} ({incident_type})")
    # Back-link the call
    await fstore.doc_update("calls", call_id, {"incident_id": incident_id})
    return incident_id
+106
View File
@@ -0,0 +1,106 @@
"""
Rules-based intelligence extraction from call transcripts.
Scans a transcript for known incident keywords, categorises the call, and
extracts rough location hints (street/intersection mentions).
No external ML dependencies — fast and always available even when STT is
disabled. Designed to run as part of the post-upload background pipeline.
"""
import re
from typing import Optional
from app.internal.logger import logger
from app.internal import firestore as fstore
# ---------------------------------------------------------------------------
# Keyword taxonomy
# ---------------------------------------------------------------------------
# Category -> lowercase trigger keywords/phrases searched for in transcripts.
# Category names double as the call's primary incident type.
INCIDENT_KEYWORDS: dict[str, list[str]] = {
    "fire": [
        "fire", "smoke", "flames", "burning", "structure fire", "brush fire",
        "wildfire", "arson", "working fire", "fully involved",
    ],
    "ems": [
        "cardiac", "unconscious", "breathing", "overdose", "trauma",
        "injury", "ambulance", "ems", "medic", "chest pain", "stroke",
        "unresponsive", "fall", "laceration",
    ],
    "police": [
        "pursuit", "chase", "shots fired", "weapon", "suspect", "robbery",
        "assault", "burglary", "stolen", "fleeing", "armed", "shooting",
        "stabbing", "domestic",
    ],
    "accident": [
        "accident", "collision", "crash", "mvr", "vehicle", "rollover",
        "hit and run", "ped", "pedestrian", "pi", "property damage",
    ],
}
# NOTE(review): very short tokens ("pi", "ped", "ems", "fall") are prone to
# substring false positives if matched with a plain `in` test — confirm the
# matching strategy used by the consumer.

# Street suffix patterns for location extraction.
# Matches an optional house number, one or more capitalized words, then a
# street-type suffix. NOTE: re.IGNORECASE also relaxes the leading [A-Z]
# classes, so lowercase street names match too.
_STREET_RE = re.compile(
    r'\b(?:\d+\s+)?[A-Z][a-zA-Z]+(?: [A-Z][a-zA-Z]+)*'
    r'\s+(?:Street|St|Avenue|Ave|Boulevard|Blvd|Drive|Dr|Road|Rd|Lane|Ln'
    r'|Court|Ct|Place|Pl|Way|Circle|Cir|Highway|Hwy|Route|Rt)\b',
    re.IGNORECASE,
)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
async def extract_tags(
    call_id: str,
    transcript: str,
) -> tuple[list[str], Optional[str]]:
    """
    Extract incident tags from a transcript.

    Fix: keywords are now matched on word boundaries instead of raw
    substrings, eliminating false positives such as "ems" firing on
    "systems" or "pi" on "pickup". Multi-word phrases ("structure fire",
    "hit and run") still match as before.

    Returns:
        (tags, primary_type) — e.g. (["fire", "structure fire"], "fire")
        primary_type is the category with the most keyword hits, or None.

    Side-effect: updates calls/{call_id}.tags in Firestore (best-effort —
    a failed write is logged, not raised).
    """
    lower = transcript.lower()

    # Category -> keywords actually present in the transcript.
    matched: dict[str, list[str]] = {}
    for category, keywords in INCIDENT_KEYWORDS.items():
        hits = [kw for kw in keywords if _contains_keyword(lower, kw)]
        if hits:
            matched[category] = hits

    # Flatten: category name first, then its hits (skipping a hit that is
    # literally the category name, to avoid an immediate duplicate).
    tags: list[str] = []
    for category, hits in matched.items():
        tags.append(category)
        tags.extend(h for h in hits if h != category)

    # Deduplicate while preserving order.
    seen: set[str] = set()
    unique_tags: list[str] = []
    for t in tags:
        if t not in seen:
            seen.add(t)
            unique_tags.append(t)

    # Primary type = category with most keyword hits.
    primary_type: Optional[str] = None
    if matched:
        primary_type = max(matched, key=lambda c: len(matched[c]))

    if unique_tags:
        try:
            await fstore.doc_update("calls", call_id, {"tags": unique_tags})
        except Exception as e:
            logger.warning(f"Could not save tags for call {call_id}: {e}")
    logger.info(f"Intelligence: call {call_id} → tags={unique_tags}, type={primary_type}")
    return unique_tags, primary_type


def _contains_keyword(text: str, keyword: str) -> bool:
    """True when keyword occurs in text as whole word(s), not as a substring."""
    return re.search(rf"\b{re.escape(keyword)}\b", text) is not None
def extract_location_hint(transcript: str) -> Optional[str]:
    """Return the first street-level location mention found in the transcript, or None."""
    for hit in _STREET_RE.finditer(transcript):
        return hit.group(0)
    return None
+19 -3
View File
@@ -30,9 +30,10 @@ class MQTTHandler:
def _on_connect(self, client, userdata, flags, reason_code, properties):
if reason_code == 0:
self._connected = True
client.subscribe("nodes/+/checkin", qos=1)
client.subscribe("nodes/+/status", qos=1)
client.subscribe("nodes/+/metadata", qos=1)
client.subscribe("nodes/+/checkin", qos=1)
client.subscribe("nodes/+/status", qos=1)
client.subscribe("nodes/+/metadata", qos=1)
client.subscribe("nodes/+/key_request", qos=1)
logger.info("MQTT connected — subscribed to node topics.")
else:
logger.error(f"MQTT connect refused: {reason_code}")
@@ -68,6 +69,8 @@ class MQTTHandler:
await self._handle_status(node_id, payload)
elif msg_type == "metadata":
await self._handle_metadata(node_id, payload)
elif msg_type == "key_request":
await self._handle_key_request(node_id)
except Exception as e:
logger.error(f"MQTT dispatch error [{msg_type}] from {node_id}: {e}")
@@ -188,6 +191,19 @@ class MQTTHandler:
await fstore.doc_update("calls", call_id, updates)
logger.info(f"Call end: {call_id}")
# ------------------------------------------------------------------
# Key request — re-deliver an existing approved key to a node that
# lost its credentials (e.g. after a directory move / fresh volume)
# ------------------------------------------------------------------
async def _handle_key_request(self, node_id: str):
    """Re-send a previously approved API key to a node that lost its credentials."""
    record = await fstore.doc_get("node_keys", node_id)
    api_key = (record or {}).get("api_key")
    if not api_key:
        # No stored key — the node has likely never been approved.
        logger.warning(f"Key request from {node_id} but no key found in Firestore — node may not be approved yet.")
        return
    self.publish_node_key(node_id, api_key)
    logger.info(f"Re-delivered API key to {node_id} on request.")
# ------------------------------------------------------------------
# Outbound — send a command to a specific node
# ------------------------------------------------------------------
+70
View File
@@ -0,0 +1,70 @@
"""
Speech-to-text transcription for recorded calls.
Uses Google Cloud Speech-to-Text v1 (authenticated via the same ADC / service
account used by firebase-admin and google-cloud-storage).
Triggered as a background task from the upload endpoint after a call audio
file has been successfully stored in GCS.
"""
import asyncio
from typing import Optional
from app.internal.logger import logger
from app.internal import firestore as fstore
async def transcribe_call(call_id: str, gcs_uri: str) -> Optional[str]:
    """
    Transcribe audio at the given GCS URI and store the result in Firestore.

    Args:
        call_id: Firestore document ID in the 'calls' collection.
        gcs_uri: GCS URI of the audio file, e.g. gs://bucket/calls/xyz.mp3

    Returns:
        The transcript string, or None if transcription failed / was skipped.
        A failed Firestore write is logged but does not discard the transcript.
    """
    # Skip anything that is not a gs:// URI — STT cannot fetch it.
    if not (gcs_uri and gcs_uri.startswith("gs://")):
        return None

    try:
        # The Google client is blocking; run it off the event loop.
        transcript = await asyncio.to_thread(_sync_transcribe, gcs_uri)
    except Exception as e:
        logger.warning(f"Transcription failed for call {call_id}: {e}")
        return None

    if not transcript:
        return transcript

    try:
        await fstore.doc_update("calls", call_id, {"transcript": transcript})
        logger.info(f"Transcript saved for call {call_id} ({len(transcript)} chars)")
    except Exception as e:
        logger.warning(f"Could not save transcript for {call_id}: {e}")
    return transcript
def _sync_transcribe(gcs_uri: str) -> Optional[str]:
    """Blocking Google Speech-to-Text call — execute via asyncio.to_thread."""
    from google.cloud import speech

    client = speech.SpeechClient()
    recognition_config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.MP3,
        sample_rate_hertz=22050,
        language_code="en-US",
        enable_automatic_punctuation=True,
        model="latest_long",
    )
    # Use long_running_recognize for reliability; it handles both short and long audio
    operation = client.long_running_recognize(
        config=recognition_config,
        audio=speech.RecognitionAudio(uri=gcs_uri),
    )
    response = operation.result(timeout=120)

    pieces: list[str] = []
    for result in response.results:
        if result.alternatives:
            pieces.append(result.alternatives[0].transcript)
    # Empty transcription collapses to None so callers can treat it as "no result".
    return " ".join(pieces).strip() or None
+11 -7
View File
@@ -5,8 +5,9 @@ from fastapi.middleware.cors import CORSMiddleware
from app.internal.logger import logger
from app.internal.mqtt_handler import mqtt_handler
from app.internal.node_sweeper import sweeper_loop
from app.config import settings
from app.internal.auth import require_firebase_token, require_service_or_firebase_token
from app.routers import nodes, systems, calls, upload, tokens
from app.routers import nodes, systems, calls, upload, tokens, incidents, alerts
@asynccontextmanager
@@ -27,16 +28,19 @@ app = FastAPI(title="DRB C2 Core", lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_origins=settings.cors_origins,
allow_methods=["*"],
allow_headers=["*"],
allow_credentials=True,
)
app.include_router(nodes.router, dependencies=[Depends(require_service_or_firebase_token)])
app.include_router(systems.router, dependencies=[Depends(require_service_or_firebase_token)])
app.include_router(calls.router, dependencies=[Depends(require_service_or_firebase_token)])
app.include_router(tokens.router, dependencies=[Depends(require_service_or_firebase_token)])
app.include_router(upload.router) # auth is per-node, handled inline
app.include_router(nodes.router, dependencies=[Depends(require_service_or_firebase_token)])
app.include_router(systems.router, dependencies=[Depends(require_service_or_firebase_token)])
app.include_router(calls.router, dependencies=[Depends(require_service_or_firebase_token)])
app.include_router(tokens.router, dependencies=[Depends(require_service_or_firebase_token)])
app.include_router(incidents.router, dependencies=[Depends(require_service_or_firebase_token)])
app.include_router(alerts.router, dependencies=[Depends(require_service_or_firebase_token)])
app.include_router(upload.router) # auth is per-node, handled inline
@app.get("/health")
+54
View File
@@ -78,3 +78,57 @@ class IncidentRecord(BaseModel):
updated_at: datetime
summary: Optional[str] = None
tags: List[str] = []
class IncidentCreate(BaseModel):
    """Request body for creating an incident (POST /incidents)."""

    title: str
    # Incident category; defaults to "other" when unspecified.
    type: str = "other"
    # Lifecycle state; new incidents default to "active".
    status: str = "active"
    # Optional point location as a str->float mapping — TODO confirm expected keys.
    location: Optional[Dict[str, float]] = None
    # Call IDs linked at creation time.
    call_ids: List[str] = []
    summary: Optional[str] = None
    tags: List[str] = []
class IncidentUpdate(BaseModel):
    """Partial update for an incident (PUT /incidents/{id}).

    Fields left as None are not modified (the router strips them with
    exclude_none before writing).
    """

    title: Optional[str] = None
    type: Optional[str] = None
    status: Optional[str] = None
    location: Optional[Dict[str, float]] = None
    summary: Optional[str] = None
    tags: Optional[List[str]] = None
# ---------------------------------------------------------------------------
# Alerts
# ---------------------------------------------------------------------------
class AlertRule(BaseModel):
    """A configurable alert trigger: keywords and/or talkgroup IDs to watch."""

    # Server-assigned UUID on create; clients may omit.
    rule_id: Optional[str] = None
    name: str
    # Keywords matched against call tags and transcript text by the alerter.
    keywords: List[str] = []
    # Talkgroup IDs that trigger this rule directly (no transcript needed).
    talkgroup_ids: List[int] = []
    # Disabled rules are skipped by the dispatch engine.
    enabled: bool = True
    discord_webhook: Optional[str] = None  # POST here when rule fires
class AlertRuleUpdate(BaseModel):
    """Partial update for an alert rule (PUT /alert-rules/{id}).

    Fields left as None are not modified (router uses exclude_none) — note
    this means discord_webhook cannot be cleared through this model.
    """

    name: Optional[str] = None
    keywords: Optional[List[str]] = None
    talkgroup_ids: Optional[List[int]] = None
    enabled: Optional[bool] = None
    discord_webhook: Optional[str] = None
class AlertEvent(BaseModel):
    """A fired alert: records which rule matched which call, and why."""

    # Server-assigned UUID.
    alert_id: Optional[str] = None
    rule_id: str
    rule_name: str
    call_id: str
    node_id: str
    talkgroup_id: Optional[int] = None
    talkgroup_name: Optional[str] = None
    # Keywords / match reasons (the alerter also emits "talkgroup:<id>" entries).
    matched_keywords: List[str] = []
    # Truncated transcript excerpt captured at dispatch time.
    transcript_snippet: Optional[str] = None
    triggered_at: Optional[datetime] = None
    # Flipped to True via POST /alerts/{alert_id}/acknowledge.
    acknowledged: bool = False
+74
View File
@@ -0,0 +1,74 @@
import uuid
from datetime import datetime, timezone
from typing import Optional
from fastapi import APIRouter, HTTPException, Depends
from app.models import AlertRule, AlertRuleUpdate
from app.internal import firestore as fstore
from app.internal.auth import require_admin_token
router = APIRouter(tags=["alerts"])
# ---------------------------------------------------------------------------
# Alert events (triggered alerts)
# ---------------------------------------------------------------------------
@router.get("/alerts")
async def list_alerts(acknowledged: Optional[bool] = None):
    """List alert events, optionally filtered by acknowledged state."""
    if acknowledged is None:
        return await fstore.collection_list("alert_events")
    return await fstore.collection_list("alert_events", acknowledged=acknowledged)
@router.post("/alerts/{alert_id}/acknowledge")
async def acknowledge_alert(alert_id: str):
    """Mark an alert event as acknowledged; 404 when it doesn't exist."""
    if not await fstore.doc_get("alert_events", alert_id):
        raise HTTPException(404, f"Alert '{alert_id}' not found.")
    await fstore.doc_update("alert_events", alert_id, {"acknowledged": True})
    return {"ok": True}
# ---------------------------------------------------------------------------
# Alert rules
# ---------------------------------------------------------------------------
@router.get("/alert-rules")
async def list_alert_rules():
    """Return every alert rule, enabled or not."""
    return await fstore.collection_list("alert_rules")
@router.post("/alert-rules")
async def create_alert_rule(body: AlertRule, _: dict = Depends(require_admin_token)):
    """Create an alert rule (admin only). The server assigns rule_id and created_at."""
    created = datetime.now(timezone.utc).isoformat()
    new_id = str(uuid.uuid4())
    record = {
        "rule_id": new_id,
        "name": body.name,
        "keywords": body.keywords,
        "talkgroup_ids": body.talkgroup_ids,
        "enabled": body.enabled,
        "discord_webhook": body.discord_webhook,
        "created_at": created,
    }
    await fstore.doc_set("alert_rules", new_id, record, merge=False)
    return record
@router.put("/alert-rules/{rule_id}")
async def update_alert_rule(rule_id: str, body: AlertRuleUpdate, _: dict = Depends(require_admin_token)):
    """Partially update an alert rule (admin only).

    None fields are left untouched — note this means a discord_webhook
    cannot be cleared via this route.
    """
    existing = await fstore.doc_get("alert_rules", rule_id)
    if not existing:
        raise HTTPException(404, f"Alert rule '{rule_id}' not found.")
    changes = body.model_dump(exclude_none=True)
    await fstore.doc_update("alert_rules", rule_id, changes)
    return {**existing, **changes}
@router.delete("/alert-rules/{rule_id}")
async def delete_alert_rule(rule_id: str, _: dict = Depends(require_admin_token)):
    """Delete an alert rule (admin only); 404 when it doesn't exist."""
    if not await fstore.doc_get("alert_rules", rule_id):
        raise HTTPException(404, f"Alert rule '{rule_id}' not found.")
    await fstore.doc_delete("alert_rules", rule_id)
    return {"ok": True}
+83
View File
@@ -0,0 +1,83 @@
import uuid
from datetime import datetime, timezone
from typing import Optional
from fastapi import APIRouter, HTTPException, Depends
from app.models import IncidentCreate, IncidentUpdate
from app.internal import firestore as fstore
from app.internal.auth import require_admin_token
router = APIRouter(prefix="/incidents", tags=["incidents"])
@router.get("")
async def list_incidents(status: Optional[str] = None, type: Optional[str] = None):
    """List incidents, optionally filtered by status and/or type."""
    filters = {key: val for key, val in (("status", status), ("type", type)) if val}
    return await fstore.collection_list("incidents", **filters)
@router.get("/{incident_id}")
async def get_incident(incident_id: str):
    """Fetch a single incident by ID; 404 when missing."""
    incident = await fstore.doc_get("incidents", incident_id)
    if not incident:
        raise HTTPException(404, f"Incident '{incident_id}' not found.")
    return incident
@router.post("")
async def create_incident(body: IncidentCreate, _: dict = Depends(require_admin_token)):
    """Create an incident (admin only). The server assigns the ID and timestamps."""
    timestamp = datetime.now(timezone.utc).isoformat()
    new_id = str(uuid.uuid4())
    record = {
        "incident_id": new_id,
        "title": body.title,
        "type": body.type,
        "status": body.status,
        "location": body.location,
        "call_ids": body.call_ids,
        "summary": body.summary,
        "tags": body.tags,
        # A fresh incident starts with identical started/updated stamps.
        "started_at": timestamp,
        "updated_at": timestamp,
    }
    await fstore.doc_set("incidents", new_id, record, merge=False)
    return record
@router.put("/{incident_id}")
async def update_incident(incident_id: str, body: IncidentUpdate, _: dict = Depends(require_admin_token)):
    """Partially update an incident (admin only); None fields are left untouched."""
    existing = await fstore.doc_get("incidents", incident_id)
    if not existing:
        raise HTTPException(404, f"Incident '{incident_id}' not found.")
    changes = body.model_dump(exclude_none=True)
    changes["updated_at"] = datetime.now(timezone.utc).isoformat()
    await fstore.doc_update("incidents", incident_id, changes)
    return {**existing, **changes}
@router.delete("/{incident_id}")
async def delete_incident(incident_id: str, _: dict = Depends(require_admin_token)):
    """Delete an incident (admin only); 404 when it doesn't exist."""
    if not await fstore.doc_get("incidents", incident_id):
        raise HTTPException(404, f"Incident '{incident_id}' not found.")
    await fstore.doc_delete("incidents", incident_id)
    return {"ok": True}
@router.post("/{incident_id}/calls/{call_id}")
async def link_call_to_incident(incident_id: str, call_id: str, _: dict = Depends(require_admin_token)):
    """Manually attach a call to an incident and back-link it on the call doc."""
    incident = await fstore.doc_get("incidents", incident_id)
    if not incident:
        raise HTTPException(404, f"Incident '{incident_id}' not found.")
    linked = incident.get("call_ids", [])
    if call_id not in linked:
        await fstore.doc_update("incidents", incident_id, {
            "call_ids": linked + [call_id],
            "updated_at": datetime.now(timezone.utc).isoformat(),
        })
    # The back-link is written even when the call was already linked.
    await fstore.doc_update("calls", call_id, {"incident_id": incident_id})
    return {"ok": True}
+13
View File
@@ -66,6 +66,19 @@ async def send_command(node_id: str, cmd: CommandPayload):
return {"ok": True}
@router.post("/{node_id}/reissue-key")
async def reissue_node_key(node_id: str, _: dict = Depends(require_admin_token)):
    """Generate a new API key for the node and push it via MQTT (retained).

    Use this to rotate a key or recover a node whose key was lost.
    """
    if not await fstore.doc_get("nodes", node_id):
        raise HTTPException(404, f"Node '{node_id}' not found.")
    fresh_key = secrets.token_hex(32)
    # Overwrite (not merge) so any previous key is fully replaced.
    await fstore.doc_set("node_keys", node_id, {"node_id": node_id, "api_key": fresh_key}, merge=False)
    mqtt_handler.publish_node_key(node_id, fresh_key)
    return {"ok": True}
@router.post("/{node_id}/config/{system_id}")
async def assign_system(node_id: str, system_id: str):
"""
+92 -3
View File
@@ -1,5 +1,5 @@
from typing import Optional
from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Security
from fastapi import APIRouter, BackgroundTasks, UploadFile, File, Form, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from app.internal.storage import upload_audio
from app.internal import firestore as fstore
@@ -12,20 +12,32 @@ _bearer = HTTPBearer(auto_error=False)
@router.post("/upload")
async def upload_call_audio(
background_tasks: BackgroundTasks,
file: UploadFile = File(...),
call_id: str = Form(...),
node_id: str = Form(...),
talkgroup_id: Optional[int] = Form(None),
talkgroup_name: Optional[str] = Form(None),
system_id: Optional[str] = Form(None),
credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer),
):
"""
Receive an audio recording from an edge node.
Upload to GCS, update the call document in Firestore with the audio URL.
Upload to GCS, update the call document in Firestore with the audio URL,
then kick off the intelligence pipeline as a background task.
"""
# Verify the per-node API key
if not credentials:
raise HTTPException(401, "Missing authorization")
key_doc = await fstore.doc_get("node_keys", node_id)
if not key_doc or key_doc.get("api_key") != credentials.credentials:
if not key_doc:
logger.warning(f"Upload 401: no key_doc in Firestore for node_id={node_id!r}")
raise HTTPException(401, "Invalid node API key")
if key_doc.get("api_key") != credentials.credentials:
logger.warning(
f"Upload 401: key mismatch for node_id={node_id!r} "
f"(received prefix: {credentials.credentials[:8]}...)"
)
raise HTTPException(401, "Invalid node API key")
data = await file.read()
@@ -41,4 +53,81 @@ async def upload_call_audio(
except Exception as e:
logger.warning(f"Could not update call {call_id} with audio_url: {e}")
# Convert public GCS URL to gs:// URI for Speech-to-Text
gcs_uri = _public_url_to_gcs_uri(audio_url)
background_tasks.add_task(
_run_intelligence_pipeline,
call_id=call_id,
node_id=node_id,
system_id=system_id,
talkgroup_id=talkgroup_id,
talkgroup_name=talkgroup_name,
gcs_uri=gcs_uri,
)
return {"url": audio_url}
def _public_url_to_gcs_uri(url: str) -> Optional[str]:
"""
Convert a public GCS URL like
https://storage.googleapis.com/bucket/calls/file.mp3
to a gs:// URI usable by Speech-to-Text.
Returns None if the URL doesn't look like a GCS URL.
"""
prefix = "https://storage.googleapis.com/"
if url and url.startswith(prefix):
return "gs://" + url[len(prefix):]
return None
async def _run_intelligence_pipeline(
    call_id: str,
    node_id: str,
    system_id: Optional[str],
    talkgroup_id: Optional[int],
    talkgroup_name: Optional[str],
    gcs_uri: Optional[str],
) -> None:
    """
    Post-upload intelligence pipeline (background task):
      1. transcribe → 2. tag → 3. correlate → 4. alert.

    Earlier steps degrade gracefully: no URI means no transcript, no
    transcript means no tags, no incident type means no correlation.
    Alert dispatch always runs because talkgroup-ID rules need no transcript.
    """
    from app.internal import transcription, intelligence, incident_correlator, alerter

    # Step 1: Transcription (skipped when the audio never made it to GCS).
    transcript: Optional[str] = None
    if gcs_uri:
        transcript = await transcription.transcribe_call(call_id, gcs_uri)

    # Step 2: Intelligence extraction.
    tags: list[str] = []
    incident_type: Optional[str] = None
    if transcript:
        tags, incident_type = await intelligence.extract_tags(call_id, transcript)

    # Step 3: Incident correlation.
    if incident_type:
        await incident_correlator.correlate_call(
            call_id=call_id,
            node_id=node_id,
            system_id=system_id,
            talkgroup_name=talkgroup_name,
            tags=tags,
            incident_type=incident_type,
        )

    # Step 4: Alert dispatch.
    await alerter.check_and_dispatch(
        call_id=call_id,
        node_id=node_id,
        talkgroup_id=talkgroup_id,
        talkgroup_name=talkgroup_name,
        tags=tags,
        transcript=transcript,
    )