From 3b3a136d0475b7f883d51c0cc52daeb085612f9b Mon Sep 17 00:00:00 2001 From: Logan Date: Sat, 11 Apr 2026 13:44:08 -0400 Subject: [PATCH] Massive update --- .env.example | 17 ++ docker-compose.yml | 14 +- drb-c2-core/.env.example | 6 +- drb-c2-core/app/config.py | 3 + drb-c2-core/app/internal/alerter.py | 124 ++++++++ .../app/internal/incident_correlator.py | 109 +++++++ drb-c2-core/app/internal/intelligence.py | 106 +++++++ drb-c2-core/app/internal/mqtt_handler.py | 22 +- drb-c2-core/app/internal/transcription.py | 70 +++++ drb-c2-core/app/main.py | 18 +- drb-c2-core/app/models.py | 54 ++++ drb-c2-core/app/routers/alerts.py | 74 +++++ drb-c2-core/app/routers/incidents.py | 83 +++++ drb-c2-core/app/routers/nodes.py | 13 + drb-c2-core/app/routers/upload.py | 95 +++++- drb-c2-core/mosquitto/acl.conf | 19 ++ drb-c2-core/mosquitto/entrypoint.sh | 32 ++ drb-c2-core/mosquitto/mosquitto.conf | 11 +- drb-c2-core/requirements.txt | 2 + drb-frontend/app/alerts/page.tsx | 289 ++++++++++++++++++ drb-frontend/app/calls/page.tsx | 1 + drb-frontend/app/incidents/page.tsx | 285 +++++++++++++++++ drb-frontend/app/login/page.tsx | 96 ++++-- drb-frontend/app/map/page.tsx | 8 +- drb-frontend/components/CallRow.tsx | 127 ++++++-- drb-frontend/components/MapView.tsx | 60 +++- drb-frontend/components/Nav.tsx | 27 +- drb-frontend/lib/c2api.ts | 34 +++ drb-frontend/lib/types.ts | 24 ++ drb-frontend/lib/useAlerts.ts | 95 ++++++ drb-frontend/lib/useIncidents.ts | 95 ++++++ 31 files changed, 1919 insertions(+), 94 deletions(-) create mode 100644 .env.example create mode 100644 drb-c2-core/app/internal/alerter.py create mode 100644 drb-c2-core/app/internal/incident_correlator.py create mode 100644 drb-c2-core/app/internal/intelligence.py create mode 100644 drb-c2-core/app/internal/transcription.py create mode 100644 drb-c2-core/app/routers/alerts.py create mode 100644 drb-c2-core/app/routers/incidents.py create mode 100644 drb-c2-core/mosquitto/acl.conf create mode 100644 
drb-c2-core/mosquitto/entrypoint.sh create mode 100644 drb-frontend/app/alerts/page.tsx create mode 100644 drb-frontend/app/incidents/page.tsx create mode 100644 drb-frontend/lib/useAlerts.ts create mode 100644 drb-frontend/lib/useIncidents.ts diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..40f8613 --- /dev/null +++ b/.env.example @@ -0,0 +1,17 @@ +# Top-level docker-compose environment variables +# Copy to .env and fill in values before running `docker compose up` + +# ----------------------------------------------------------------------- +# MQTT broker credentials +# These are injected into the mosquitto container at startup to build the +# password file. Use different values in production — do NOT reuse defaults. +# ----------------------------------------------------------------------- + +# C2-core service account (full broker access) +MQTT_C2_USER=drb-c2-core +MQTT_C2_PASS=change-me-c2 + +# Shared credential for all edge nodes (ACL scopes each node to its own +# nodes//# namespace via the MQTT client ID) +MQTT_NODE_USER=drb-node +MQTT_NODE_PASS=change-me-node diff --git a/docker-compose.yml b/docker-compose.yml index 6e8c654..f1f3c48 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,8 +4,17 @@ services: restart: unless-stopped ports: - "1883:1883" + entrypoint: ["/mosquitto/config/entrypoint.sh"] + environment: + - MQTT_C2_USER=${MQTT_C2_USER} + - MQTT_C2_PASS=${MQTT_C2_PASS} + - MQTT_NODE_USER=${MQTT_NODE_USER} + - MQTT_NODE_PASS=${MQTT_NODE_PASS} volumes: - - ./drb-c2-core/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf + - ./drb-c2-core/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro + - ./drb-c2-core/mosquitto/acl.conf:/mosquitto/config/acl.conf:ro + - ./drb-c2-core/mosquitto/entrypoint.sh:/mosquitto/config/entrypoint.sh:ro + - mosquitto_data:/mosquitto/data c2-core: build: ./drb-c2-core @@ -33,3 +42,6 @@ services: env_file: ./drb-frontend/.env depends_on: - c2-core + +volumes: + 
mosquitto_data: diff --git a/drb-c2-core/.env.example b/drb-c2-core/.env.example index 12b86da..e950e00 100644 --- a/drb-c2-core/.env.example +++ b/drb-c2-core/.env.example @@ -1,8 +1,10 @@ # MQTT broker (usually the mosquitto container on this host) MQTT_BROKER=mosquitto MQTT_PORT=1883 -MQTT_USER= -MQTT_PASS= +# Use the c2-core credential — must match MQTT_C2_USER/MQTT_C2_PASS in the +# top-level .env (which is passed to the mosquitto entrypoint) +MQTT_USER=drb-c2-core +MQTT_PASS=change-me-c2 # GCP — path to service account JSON inside the container GCP_CREDENTIALS_PATH=/app/gcp-key.json diff --git a/drb-c2-core/app/config.py b/drb-c2-core/app/config.py index 740e373..0502e4f 100644 --- a/drb-c2-core/app/config.py +++ b/drb-c2-core/app/config.py @@ -20,6 +20,9 @@ class Settings(BaseSettings): # Internal service key — allows server-side services (discord bot) to call C2 without Firebase service_key: Optional[str] = None + # CORS — comma-separated list of allowed origins, or "*" for all + cors_origins: list[str] = ["*"] + class Config: env_file = ".env" diff --git a/drb-c2-core/app/internal/alerter.py b/drb-c2-core/app/internal/alerter.py new file mode 100644 index 0000000..ce447dc --- /dev/null +++ b/drb-c2-core/app/internal/alerter.py @@ -0,0 +1,124 @@ +""" +Alert dispatch engine. + +Loads enabled alert rules from Firestore and checks each one against the call's +talkgroup ID, tags, and transcript. On a match: + 1. Creates an AlertEvent document in Firestore. + 2. Optionally POSTs a Discord webhook message if the rule has one configured. + +Never raises — failures are logged as warnings so the pipeline always completes. 
+""" +import uuid +from datetime import datetime, timezone +from typing import Optional +from app.internal.logger import logger +from app.internal import firestore as fstore + + +async def check_and_dispatch( + call_id: str, + node_id: str, + talkgroup_id: Optional[int], + talkgroup_name: Optional[str], + tags: list[str], + transcript: Optional[str], +) -> None: + """ + Check all enabled alert rules and fire events for any that match this call. + """ + try: + rules = await fstore.collection_list("alert_rules", enabled=True) + except Exception as e: + logger.warning(f"Alerter: could not load rules: {e}") + return + + for rule in rules: + matched_keywords = _match_rule(rule, talkgroup_id, tags, transcript) + if not matched_keywords: + continue + + alert_id = str(uuid.uuid4()) + snippet = _snippet(transcript) + now = datetime.now(timezone.utc).isoformat() + event = { + "alert_id": alert_id, + "rule_id": rule.get("rule_id", ""), + "rule_name": rule.get("name", ""), + "call_id": call_id, + "node_id": node_id, + "talkgroup_id": talkgroup_id, + "talkgroup_name": talkgroup_name or "", + "matched_keywords": matched_keywords, + "transcript_snippet": snippet, + "triggered_at": now, + "acknowledged": False, + } + + try: + await fstore.doc_set("alert_events", alert_id, event, merge=False) + logger.info( + f"Alert fired: rule='{rule.get('name')}' call={call_id} " + f"keywords={matched_keywords}" + ) + except Exception as e: + logger.warning(f"Alerter: could not save alert event: {e}") + continue + + webhook_url = rule.get("discord_webhook") + if webhook_url: + await _post_webhook(webhook_url, rule.get("name", ""), talkgroup_name, matched_keywords, snippet) + + +def _match_rule( + rule: dict, + talkgroup_id: Optional[int], + tags: list[str], + transcript: Optional[str], +) -> list[str]: + """Return list of matched keywords/reasons, or empty list if no match.""" + matched: list[str] = [] + + # Talkgroup ID match + rule_tg_ids = rule.get("talkgroup_ids", []) + if rule_tg_ids and 
talkgroup_id is not None and talkgroup_id in rule_tg_ids: + matched.append(f"talkgroup:{talkgroup_id}") + + # Keyword match against tags + transcript + rule_keywords = [kw.lower() for kw in rule.get("keywords", [])] + for kw in rule_keywords: + if kw in tags: + matched.append(kw) + elif transcript and kw in transcript.lower(): + matched.append(kw) + + return matched + + +def _snippet(transcript: Optional[str], max_len: int = 200) -> Optional[str]: + if not transcript: + return None + return transcript[:max_len] + ("…" if len(transcript) > max_len else "") + + +async def _post_webhook( + url: str, + rule_name: str, + talkgroup_name: Optional[str], + matched_keywords: list[str], + snippet: Optional[str], +) -> None: + try: + import httpx + tg_label = talkgroup_name or "Unknown" + kw_str = ", ".join(matched_keywords) + body = ( + f"**Alert: {rule_name}**\n" + f"Talkgroup: {tg_label}\n" + f"Matched: {kw_str}" + ) + if snippet: + body += f"\n> {snippet}" + async with httpx.AsyncClient(timeout=5.0) as client: + await client.post(url, json={"content": body}) + except Exception as e: + logger.warning(f"Alerter: Discord webhook POST failed: {e}") diff --git a/drb-c2-core/app/internal/incident_correlator.py b/drb-c2-core/app/internal/incident_correlator.py new file mode 100644 index 0000000..4748fa6 --- /dev/null +++ b/drb-c2-core/app/internal/incident_correlator.py @@ -0,0 +1,109 @@ +""" +Incident correlation engine. + +After a call is transcribed and tagged, this module attempts to link it to an +existing open incident (same type, same node/system, within a 30-minute +window). If no match is found, a new incident is auto-created. + +The result is written back to Firestore on both the call document +(call.incident_id) and the incident document (incident.call_ids). 
+""" +import uuid +from datetime import datetime, timezone, timedelta +from typing import Optional +from app.internal.logger import logger +from app.internal import firestore as fstore + + +_CORRELATION_WINDOW = timedelta(minutes=30) + + +async def correlate_call( + call_id: str, + node_id: str, + system_id: Optional[str], + talkgroup_name: Optional[str], + tags: list[str], + incident_type: Optional[str], +) -> Optional[str]: + """ + Link call_id to an existing incident or create a new one. + + Args: + call_id: ID of the call being processed. + node_id: Edge node that recorded the call. + system_id: Radio system ID (may be None). + talkgroup_name: Human-readable talkgroup name for auto-title generation. + tags: Tags extracted by intelligence.py. + incident_type: Primary incident category (fire/police/ems/accident) or None. + + Returns: + The incident_id that was linked, or None if skipped. + """ + if not incident_type: + return None + + now = datetime.now(timezone.utc) + cutoff = (now - _CORRELATION_WINDOW).isoformat() + + # Fetch active incidents of the same type + candidates = await fstore.collection_list("incidents", status="active", type=incident_type) + + # Filter to incidents updated within the correlation window and on this node + matched_incident: Optional[dict] = None + for inc in candidates: + updated_raw = inc.get("updated_at", "") + try: + updated_dt = datetime.fromisoformat(str(updated_raw).replace("Z", "+00:00")) + if updated_dt.tzinfo is None: + updated_dt = updated_dt.replace(tzinfo=timezone.utc) + except Exception: + continue + + if updated_dt < (now - _CORRELATION_WINDOW): + continue + + # Check whether any call in this incident came from the same node + linked_call_ids = inc.get("call_ids", []) + if linked_call_ids: + for linked_id in linked_call_ids[:5]: # check last 5 calls to avoid slow queries + linked_call = await fstore.doc_get("calls", linked_id) + if linked_call and linked_call.get("node_id") == node_id: + matched_incident = inc + break + 
if matched_incident: + break + + if matched_incident: + incident_id = matched_incident["incident_id"] + existing_ids = matched_incident.get("call_ids", []) + if call_id not in existing_ids: + existing_ids.append(call_id) + await fstore.doc_update("incidents", incident_id, { + "call_ids": existing_ids, + "updated_at": now.isoformat(), + }) + logger.info(f"Correlator: linked call {call_id} to existing incident {incident_id}") + else: + # Create a new incident + incident_id = str(uuid.uuid4()) + tg_label = talkgroup_name or "Unknown Talkgroup" + title = f"Auto: {incident_type.title()} — {tg_label}" + doc = { + "incident_id": incident_id, + "title": title, + "type": incident_type, + "status": "active", + "location": None, + "call_ids": [call_id], + "summary": None, + "tags": tags, + "started_at": now.isoformat(), + "updated_at": now.isoformat(), + } + await fstore.doc_set("incidents", incident_id, doc, merge=False) + logger.info(f"Correlator: created new incident {incident_id} for call {call_id} ({incident_type})") + + # Back-link the call + await fstore.doc_update("calls", call_id, {"incident_id": incident_id}) + return incident_id diff --git a/drb-c2-core/app/internal/intelligence.py b/drb-c2-core/app/internal/intelligence.py new file mode 100644 index 0000000..1510e3b --- /dev/null +++ b/drb-c2-core/app/internal/intelligence.py @@ -0,0 +1,106 @@ +""" +Rules-based intelligence extraction from call transcripts. + +Scans a transcript for known incident keywords, categorises the call, and +extracts rough location hints (street/intersection mentions). + +No external ML dependencies — fast and always available even when STT is +disabled. Designed to run as part of the post-upload background pipeline. 
+""" +import re +from typing import Optional +from app.internal.logger import logger +from app.internal import firestore as fstore + + +# --------------------------------------------------------------------------- +# Keyword taxonomy +# --------------------------------------------------------------------------- + +INCIDENT_KEYWORDS: dict[str, list[str]] = { + "fire": [ + "fire", "smoke", "flames", "burning", "structure fire", "brush fire", + "wildfire", "arson", "working fire", "fully involved", + ], + "ems": [ + "cardiac", "unconscious", "breathing", "overdose", "trauma", + "injury", "ambulance", "ems", "medic", "chest pain", "stroke", + "unresponsive", "fall", "laceration", + ], + "police": [ + "pursuit", "chase", "shots fired", "weapon", "suspect", "robbery", + "assault", "burglary", "stolen", "fleeing", "armed", "shooting", + "stabbing", "domestic", + ], + "accident": [ + "accident", "collision", "crash", "mvr", "vehicle", "rollover", + "hit and run", "ped", "pedestrian", "pi", "property damage", + ], +} + +# Street suffix patterns for location extraction +_STREET_RE = re.compile( + r'\b(?:\d+\s+)?[A-Z][a-zA-Z]+(?: [A-Z][a-zA-Z]+)*' + r'\s+(?:Street|St|Avenue|Ave|Boulevard|Blvd|Drive|Dr|Road|Rd|Lane|Ln' + r'|Court|Ct|Place|Pl|Way|Circle|Cir|Highway|Hwy|Route|Rt)\b', + re.IGNORECASE, +) + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +async def extract_tags( + call_id: str, + transcript: str, +) -> tuple[list[str], Optional[str]]: + """ + Extract incident tags from a transcript. + + Returns: + (tags, primary_type) — e.g. (["fire", "structure fire"], "fire") + primary_type is the category with the most keyword hits, or None. + + Side-effect: updates calls/{call_id}.tags in Firestore. 
+ """ + lower = transcript.lower() + matched: dict[str, list[str]] = {} + + for category, keywords in INCIDENT_KEYWORDS.items(): + hits = [kw for kw in keywords if kw in lower] + if hits: + matched[category] = hits + + tags: list[str] = [] + for category, hits in matched.items(): + tags.append(category) + tags.extend(h for h in hits if h != category) + + # Deduplicate while preserving order + seen: set[str] = set() + unique_tags: list[str] = [] + for t in tags: + if t not in seen: + seen.add(t) + unique_tags.append(t) + + # Primary type = category with most keyword hits + primary_type: Optional[str] = None + if matched: + primary_type = max(matched, key=lambda c: len(matched[c])) + + if unique_tags: + try: + await fstore.doc_update("calls", call_id, {"tags": unique_tags}) + except Exception as e: + logger.warning(f"Could not save tags for call {call_id}: {e}") + + logger.info(f"Intelligence: call {call_id} → tags={unique_tags}, type={primary_type}") + return unique_tags, primary_type + + +def extract_location_hint(transcript: str) -> Optional[str]: + """Return the first street-level location mention found in the transcript, or None.""" + match = _STREET_RE.search(transcript) + return match.group(0) if match else None diff --git a/drb-c2-core/app/internal/mqtt_handler.py b/drb-c2-core/app/internal/mqtt_handler.py index 8a60110..f1c1a2e 100644 --- a/drb-c2-core/app/internal/mqtt_handler.py +++ b/drb-c2-core/app/internal/mqtt_handler.py @@ -30,9 +30,10 @@ class MQTTHandler: def _on_connect(self, client, userdata, flags, reason_code, properties): if reason_code == 0: self._connected = True - client.subscribe("nodes/+/checkin", qos=1) - client.subscribe("nodes/+/status", qos=1) - client.subscribe("nodes/+/metadata", qos=1) + client.subscribe("nodes/+/checkin", qos=1) + client.subscribe("nodes/+/status", qos=1) + client.subscribe("nodes/+/metadata", qos=1) + client.subscribe("nodes/+/key_request", qos=1) logger.info("MQTT connected — subscribed to node topics.") else: 
logger.error(f"MQTT connect refused: {reason_code}") @@ -68,6 +69,8 @@ class MQTTHandler: await self._handle_status(node_id, payload) elif msg_type == "metadata": await self._handle_metadata(node_id, payload) + elif msg_type == "key_request": + await self._handle_key_request(node_id) except Exception as e: logger.error(f"MQTT dispatch error [{msg_type}] from {node_id}: {e}") @@ -188,6 +191,19 @@ class MQTTHandler: await fstore.doc_update("calls", call_id, updates) logger.info(f"Call end: {call_id}") + # ------------------------------------------------------------------ + # Key request — re-deliver an existing approved key to a node that + # lost its credentials (e.g. after a directory move / fresh volume) + # ------------------------------------------------------------------ + + async def _handle_key_request(self, node_id: str): + key_doc = await fstore.doc_get("node_keys", node_id) + if not key_doc or not key_doc.get("api_key"): + logger.warning(f"Key request from {node_id} but no key found in Firestore — node may not be approved yet.") + return + self.publish_node_key(node_id, key_doc["api_key"]) + logger.info(f"Re-delivered API key to {node_id} on request.") + # ------------------------------------------------------------------ # Outbound — send a command to a specific node # ------------------------------------------------------------------ diff --git a/drb-c2-core/app/internal/transcription.py b/drb-c2-core/app/internal/transcription.py new file mode 100644 index 0000000..7a898a5 --- /dev/null +++ b/drb-c2-core/app/internal/transcription.py @@ -0,0 +1,70 @@ +""" +Speech-to-text transcription for recorded calls. + +Uses Google Cloud Speech-to-Text v1 (authenticated via the same ADC / service +account used by firebase-admin and google-cloud-storage). + +Triggered as a background task from the upload endpoint after a call audio +file has been successfully stored in GCS. 
+""" +import asyncio +from typing import Optional +from app.internal.logger import logger +from app.internal import firestore as fstore + + +async def transcribe_call(call_id: str, gcs_uri: str) -> Optional[str]: + """ + Transcribe audio at the given GCS URI and store the result in Firestore. + + Args: + call_id: Firestore document ID in the 'calls' collection. + gcs_uri: GCS URI of the audio file, e.g. gs://bucket/calls/xyz.mp3 + + Returns: + The transcript string, or None if transcription failed / was skipped. + """ + if not gcs_uri or not gcs_uri.startswith("gs://"): + return None + + try: + transcript = await asyncio.to_thread(_sync_transcribe, gcs_uri) + except Exception as e: + logger.warning(f"Transcription failed for call {call_id}: {e}") + return None + + if transcript: + try: + await fstore.doc_update("calls", call_id, {"transcript": transcript}) + logger.info(f"Transcript saved for call {call_id} ({len(transcript)} chars)") + except Exception as e: + logger.warning(f"Could not save transcript for {call_id}: {e}") + + return transcript + + +def _sync_transcribe(gcs_uri: str) -> Optional[str]: + """Synchronous STT call — run in a thread via asyncio.to_thread.""" + from google.cloud import speech + + client = speech.SpeechClient() + + audio = speech.RecognitionAudio(uri=gcs_uri) + config = speech.RecognitionConfig( + encoding=speech.RecognitionConfig.AudioEncoding.MP3, + sample_rate_hertz=22050, + language_code="en-US", + enable_automatic_punctuation=True, + model="latest_long", + ) + + # Use long_running_recognize for reliability; it handles both short and long audio + operation = client.long_running_recognize(config=config, audio=audio) + response = operation.result(timeout=120) + + parts = [ + result.alternatives[0].transcript + for result in response.results + if result.alternatives + ] + return " ".join(parts).strip() or None diff --git a/drb-c2-core/app/main.py b/drb-c2-core/app/main.py index b2a4734..ff5403f 100644 --- a/drb-c2-core/app/main.py +++ 
b/drb-c2-core/app/main.py @@ -5,8 +5,9 @@ from fastapi.middleware.cors import CORSMiddleware from app.internal.logger import logger from app.internal.mqtt_handler import mqtt_handler from app.internal.node_sweeper import sweeper_loop +from app.config import settings from app.internal.auth import require_firebase_token, require_service_or_firebase_token -from app.routers import nodes, systems, calls, upload, tokens +from app.routers import nodes, systems, calls, upload, tokens, incidents, alerts @asynccontextmanager @@ -27,16 +28,19 @@ app = FastAPI(title="DRB C2 Core", lifespan=lifespan) app.add_middleware( CORSMiddleware, - allow_origins=["*"], + allow_origins=settings.cors_origins, allow_methods=["*"], allow_headers=["*"], + allow_credentials=True, ) -app.include_router(nodes.router, dependencies=[Depends(require_service_or_firebase_token)]) -app.include_router(systems.router, dependencies=[Depends(require_service_or_firebase_token)]) -app.include_router(calls.router, dependencies=[Depends(require_service_or_firebase_token)]) -app.include_router(tokens.router, dependencies=[Depends(require_service_or_firebase_token)]) -app.include_router(upload.router) # auth is per-node, handled inline +app.include_router(nodes.router, dependencies=[Depends(require_service_or_firebase_token)]) +app.include_router(systems.router, dependencies=[Depends(require_service_or_firebase_token)]) +app.include_router(calls.router, dependencies=[Depends(require_service_or_firebase_token)]) +app.include_router(tokens.router, dependencies=[Depends(require_service_or_firebase_token)]) +app.include_router(incidents.router, dependencies=[Depends(require_service_or_firebase_token)]) +app.include_router(alerts.router, dependencies=[Depends(require_service_or_firebase_token)]) +app.include_router(upload.router) # auth is per-node, handled inline @app.get("/health") diff --git a/drb-c2-core/app/models.py b/drb-c2-core/app/models.py index 651195f..b2b72e8 100644 --- a/drb-c2-core/app/models.py +++ 
b/drb-c2-core/app/models.py @@ -78,3 +78,57 @@ class IncidentRecord(BaseModel): updated_at: datetime summary: Optional[str] = None tags: List[str] = [] + + +class IncidentCreate(BaseModel): + title: str + type: str = "other" + status: str = "active" + location: Optional[Dict[str, float]] = None + call_ids: List[str] = [] + summary: Optional[str] = None + tags: List[str] = [] + + +class IncidentUpdate(BaseModel): + title: Optional[str] = None + type: Optional[str] = None + status: Optional[str] = None + location: Optional[Dict[str, float]] = None + summary: Optional[str] = None + tags: Optional[List[str]] = None + + +# --------------------------------------------------------------------------- +# Alerts +# --------------------------------------------------------------------------- + +class AlertRule(BaseModel): + rule_id: Optional[str] = None + name: str + keywords: List[str] = [] + talkgroup_ids: List[int] = [] + enabled: bool = True + discord_webhook: Optional[str] = None # POST here when rule fires + + +class AlertRuleUpdate(BaseModel): + name: Optional[str] = None + keywords: Optional[List[str]] = None + talkgroup_ids: Optional[List[int]] = None + enabled: Optional[bool] = None + discord_webhook: Optional[str] = None + + +class AlertEvent(BaseModel): + alert_id: Optional[str] = None + rule_id: str + rule_name: str + call_id: str + node_id: str + talkgroup_id: Optional[int] = None + talkgroup_name: Optional[str] = None + matched_keywords: List[str] = [] + transcript_snippet: Optional[str] = None + triggered_at: Optional[datetime] = None + acknowledged: bool = False diff --git a/drb-c2-core/app/routers/alerts.py b/drb-c2-core/app/routers/alerts.py new file mode 100644 index 0000000..cff6275 --- /dev/null +++ b/drb-c2-core/app/routers/alerts.py @@ -0,0 +1,74 @@ +import uuid +from datetime import datetime, timezone +from typing import Optional +from fastapi import APIRouter, HTTPException, Depends +from app.models import AlertRule, AlertRuleUpdate +from app.internal 
import firestore as fstore +from app.internal.auth import require_admin_token + +router = APIRouter(tags=["alerts"]) + + +# --------------------------------------------------------------------------- +# Alert events (triggered alerts) +# --------------------------------------------------------------------------- + +@router.get("/alerts") +async def list_alerts(acknowledged: Optional[bool] = None): + filters = {} + if acknowledged is not None: + filters["acknowledged"] = acknowledged + return await fstore.collection_list("alert_events", **filters) + + +@router.post("/alerts/{alert_id}/acknowledge") +async def acknowledge_alert(alert_id: str): + doc = await fstore.doc_get("alert_events", alert_id) + if not doc: + raise HTTPException(404, f"Alert '{alert_id}' not found.") + await fstore.doc_update("alert_events", alert_id, {"acknowledged": True}) + return {"ok": True} + + +# --------------------------------------------------------------------------- +# Alert rules +# --------------------------------------------------------------------------- + +@router.get("/alert-rules") +async def list_alert_rules(): + return await fstore.collection_list("alert_rules") + + +@router.post("/alert-rules") +async def create_alert_rule(body: AlertRule, _: dict = Depends(require_admin_token)): + rule_id = str(uuid.uuid4()) + doc = { + "rule_id": rule_id, + "name": body.name, + "keywords": body.keywords, + "talkgroup_ids": body.talkgroup_ids, + "enabled": body.enabled, + "discord_webhook": body.discord_webhook, + "created_at": datetime.now(timezone.utc).isoformat(), + } + await fstore.doc_set("alert_rules", rule_id, doc, merge=False) + return doc + + +@router.put("/alert-rules/{rule_id}") +async def update_alert_rule(rule_id: str, body: AlertRuleUpdate, _: dict = Depends(require_admin_token)): + doc = await fstore.doc_get("alert_rules", rule_id) + if not doc: + raise HTTPException(404, f"Alert rule '{rule_id}' not found.") + updates = body.model_dump(exclude_none=True) + await 
fstore.doc_update("alert_rules", rule_id, updates) + return {**doc, **updates} + + +@router.delete("/alert-rules/{rule_id}") +async def delete_alert_rule(rule_id: str, _: dict = Depends(require_admin_token)): + doc = await fstore.doc_get("alert_rules", rule_id) + if not doc: + raise HTTPException(404, f"Alert rule '{rule_id}' not found.") + await fstore.doc_delete("alert_rules", rule_id) + return {"ok": True} diff --git a/drb-c2-core/app/routers/incidents.py b/drb-c2-core/app/routers/incidents.py new file mode 100644 index 0000000..9e6db43 --- /dev/null +++ b/drb-c2-core/app/routers/incidents.py @@ -0,0 +1,83 @@ +import uuid +from datetime import datetime, timezone +from typing import Optional +from fastapi import APIRouter, HTTPException, Depends +from app.models import IncidentCreate, IncidentUpdate +from app.internal import firestore as fstore +from app.internal.auth import require_admin_token + +router = APIRouter(prefix="/incidents", tags=["incidents"]) + + +@router.get("") +async def list_incidents(status: Optional[str] = None, type: Optional[str] = None): + filters = {} + if status: + filters["status"] = status + if type: + filters["type"] = type + return await fstore.collection_list("incidents", **filters) + + +@router.get("/{incident_id}") +async def get_incident(incident_id: str): + doc = await fstore.doc_get("incidents", incident_id) + if not doc: + raise HTTPException(404, f"Incident '{incident_id}' not found.") + return doc + + +@router.post("") +async def create_incident(body: IncidentCreate, _: dict = Depends(require_admin_token)): + now = datetime.now(timezone.utc).isoformat() + incident_id = str(uuid.uuid4()) + doc = { + "incident_id": incident_id, + "title": body.title, + "type": body.type, + "status": body.status, + "location": body.location, + "call_ids": body.call_ids, + "summary": body.summary, + "tags": body.tags, + "started_at": now, + "updated_at": now, + } + await fstore.doc_set("incidents", incident_id, doc, merge=False) + return doc + + 
+@router.put("/{incident_id}") +async def update_incident(incident_id: str, body: IncidentUpdate, _: dict = Depends(require_admin_token)): + doc = await fstore.doc_get("incidents", incident_id) + if not doc: + raise HTTPException(404, f"Incident '{incident_id}' not found.") + updates = body.model_dump(exclude_none=True) + updates["updated_at"] = datetime.now(timezone.utc).isoformat() + await fstore.doc_update("incidents", incident_id, updates) + return {**doc, **updates} + + +@router.delete("/{incident_id}") +async def delete_incident(incident_id: str, _: dict = Depends(require_admin_token)): + doc = await fstore.doc_get("incidents", incident_id) + if not doc: + raise HTTPException(404, f"Incident '{incident_id}' not found.") + await fstore.doc_delete("incidents", incident_id) + return {"ok": True} + + +@router.post("/{incident_id}/calls/{call_id}") +async def link_call_to_incident(incident_id: str, call_id: str, _: dict = Depends(require_admin_token)): + doc = await fstore.doc_get("incidents", incident_id) + if not doc: + raise HTTPException(404, f"Incident '{incident_id}' not found.") + call_ids = doc.get("call_ids", []) + if call_id not in call_ids: + call_ids.append(call_id) + await fstore.doc_update("incidents", incident_id, { + "call_ids": call_ids, + "updated_at": datetime.now(timezone.utc).isoformat(), + }) + await fstore.doc_update("calls", call_id, {"incident_id": incident_id}) + return {"ok": True} diff --git a/drb-c2-core/app/routers/nodes.py b/drb-c2-core/app/routers/nodes.py index b484cbf..cd9d617 100644 --- a/drb-c2-core/app/routers/nodes.py +++ b/drb-c2-core/app/routers/nodes.py @@ -66,6 +66,19 @@ async def send_command(node_id: str, cmd: CommandPayload): return {"ok": True} +@router.post("/{node_id}/reissue-key") +async def reissue_node_key(node_id: str, _: dict = Depends(require_admin_token)): + """Generate a new API key for the node and push it via MQTT (retained). 
+ Use this to rotate a key or recover a node whose key was lost.""" + node = await fstore.doc_get("nodes", node_id) + if not node: + raise HTTPException(404, f"Node '{node_id}' not found.") + api_key = secrets.token_hex(32) + await fstore.doc_set("node_keys", node_id, {"node_id": node_id, "api_key": api_key}, merge=False) + mqtt_handler.publish_node_key(node_id, api_key) + return {"ok": True} + + @router.post("/{node_id}/config/{system_id}") async def assign_system(node_id: str, system_id: str): """ diff --git a/drb-c2-core/app/routers/upload.py b/drb-c2-core/app/routers/upload.py index 84eacb7..34009c4 100644 --- a/drb-c2-core/app/routers/upload.py +++ b/drb-c2-core/app/routers/upload.py @@ -1,5 +1,5 @@ from typing import Optional -from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Security +from fastapi import APIRouter, BackgroundTasks, UploadFile, File, Form, HTTPException, Security from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from app.internal.storage import upload_audio from app.internal import firestore as fstore @@ -12,20 +12,32 @@ _bearer = HTTPBearer(auto_error=False) @router.post("/upload") async def upload_call_audio( + background_tasks: BackgroundTasks, file: UploadFile = File(...), call_id: str = Form(...), node_id: str = Form(...), + talkgroup_id: Optional[int] = Form(None), + talkgroup_name: Optional[str] = Form(None), + system_id: Optional[str] = Form(None), credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer), ): """ Receive an audio recording from an edge node. - Upload to GCS, update the call document in Firestore with the audio URL. + Upload to GCS, update the call document in Firestore with the audio URL, + then kick off the intelligence pipeline as a background task. 
""" # Verify the per-node API key if not credentials: raise HTTPException(401, "Missing authorization") key_doc = await fstore.doc_get("node_keys", node_id) - if not key_doc or key_doc.get("api_key") != credentials.credentials: + if not key_doc: + logger.warning(f"Upload 401: no key_doc in Firestore for node_id={node_id!r}") + raise HTTPException(401, "Invalid node API key") + if key_doc.get("api_key") != credentials.credentials: + logger.warning( + f"Upload 401: key mismatch for node_id={node_id!r} " + f"(received prefix: {credentials.credentials[:8]}...)" + ) raise HTTPException(401, "Invalid node API key") data = await file.read() @@ -41,4 +53,81 @@ async def upload_call_audio( except Exception as e: logger.warning(f"Could not update call {call_id} with audio_url: {e}") + # Convert public GCS URL to gs:// URI for Speech-to-Text + gcs_uri = _public_url_to_gcs_uri(audio_url) + + background_tasks.add_task( + _run_intelligence_pipeline, + call_id=call_id, + node_id=node_id, + system_id=system_id, + talkgroup_id=talkgroup_id, + talkgroup_name=talkgroup_name, + gcs_uri=gcs_uri, + ) + return {"url": audio_url} + + +def _public_url_to_gcs_uri(url: str) -> Optional[str]: + """ + Convert a public GCS URL like + https://storage.googleapis.com/bucket/calls/file.mp3 + to a gs:// URI usable by Speech-to-Text. + Returns None if the URL doesn't look like a GCS URL. + """ + prefix = "https://storage.googleapis.com/" + if url and url.startswith(prefix): + return "gs://" + url[len(prefix):] + return None + + +async def _run_intelligence_pipeline( + call_id: str, + node_id: str, + system_id: Optional[str], + talkgroup_id: Optional[int], + talkgroup_name: Optional[str], + gcs_uri: Optional[str], +) -> None: + """ + Post-upload intelligence pipeline (runs as a background task): + 1. Transcribe audio via Google STT + 2. Extract tags/incident type from transcript + 3. Correlate with existing incidents (or create new one) + 4. 
Check alert rules and dispatch notifications + """ + from app.internal import transcription, intelligence, incident_correlator, alerter + + transcript: Optional[str] = None + + # Step 1: Transcription + if gcs_uri: + transcript = await transcription.transcribe_call(call_id, gcs_uri) + + # Step 2: Intelligence extraction + tags: list[str] = [] + incident_type: Optional[str] = None + if transcript: + tags, incident_type = await intelligence.extract_tags(call_id, transcript) + + # Step 3: Incident correlation + if incident_type: + await incident_correlator.correlate_call( + call_id=call_id, + node_id=node_id, + system_id=system_id, + talkgroup_name=talkgroup_name, + tags=tags, + incident_type=incident_type, + ) + + # Step 4: Alert dispatch (always runs — talkgroup ID rules don't need a transcript) + await alerter.check_and_dispatch( + call_id=call_id, + node_id=node_id, + talkgroup_id=talkgroup_id, + talkgroup_name=talkgroup_name, + tags=tags, + transcript=transcript, + ) diff --git a/drb-c2-core/mosquitto/acl.conf b/drb-c2-core/mosquitto/acl.conf new file mode 100644 index 0000000..c92cb27 --- /dev/null +++ b/drb-c2-core/mosquitto/acl.conf @@ -0,0 +1,19 @@ +# ----------------------------------------------------------------------- +# Mosquitto ACL — DRB C2 Server +# ----------------------------------------------------------------------- +# Two principals: +# drb-c2-core — the backend service; needs full broker access +# drb-node — shared credential for all edge nodes; scoped to their +# own namespace via MQTT client ID (%c = NODE_ID) +# ----------------------------------------------------------------------- + +# C2-core service — full read/write on every topic +user drb-c2-core +topic readwrite # + +# Edge nodes — each node may only read/write topics under nodes/<node_id>/ +# Mosquitto substitutes %c with the connecting client's MQTT client ID at +# runtime.
Edge nodes set client_id = NODE_ID in mqtt_manager.py, so this +effectively prevents node-A from publishing to nodes/node-B/api_key +or any other node's namespace. +pattern readwrite nodes/%c/# diff --git a/drb-c2-core/mosquitto/entrypoint.sh b/drb-c2-core/mosquitto/entrypoint.sh new file mode 100644 index 0000000..2551622 --- /dev/null +++ b/drb-c2-core/mosquitto/entrypoint.sh @@ -0,0 +1,32 @@ +#!/bin/sh +# Mosquitto entrypoint — generates /mosquitto/config/passwd from env vars +# before handing off to the broker process. +# +# Required environment variables (set in docker-compose.yml): +# MQTT_C2_USER — username for the drb-c2-core service +# MQTT_C2_PASS — password for the drb-c2-core service +# MQTT_NODE_USER — shared username for all edge nodes +# MQTT_NODE_PASS — shared password for all edge nodes +set -e + +PASSWD_FILE=/mosquitto/config/passwd + +# Remove any stale file so we start clean on every container start +rm -f "$PASSWD_FILE" + +if [ -z "$MQTT_C2_USER" ] || [ -z "$MQTT_C2_PASS" ]; then + echo "ERROR: MQTT_C2_USER and MQTT_C2_PASS must be set" >&2 + exit 1 +fi + +if [ -z "$MQTT_NODE_USER" ] || [ -z "$MQTT_NODE_PASS" ]; then + echo "ERROR: MQTT_NODE_USER and MQTT_NODE_PASS must be set" >&2 + exit 1 +fi + +mosquitto_passwd -c -b "$PASSWD_FILE" "$MQTT_C2_USER" "$MQTT_C2_PASS" +mosquitto_passwd -b "$PASSWD_FILE" "$MQTT_NODE_USER" "$MQTT_NODE_PASS" + +echo "Mosquitto: password file written for users: $MQTT_C2_USER, $MQTT_NODE_USER" + +exec /usr/sbin/mosquitto -c /mosquitto/config/mosquitto.conf diff --git a/drb-c2-core/mosquitto/mosquitto.conf b/drb-c2-core/mosquitto/mosquitto.conf index b5d4eb0..3624d93 100644 --- a/drb-c2-core/mosquitto/mosquitto.conf +++ b/drb-c2-core/mosquitto/mosquitto.conf @@ -1,8 +1,15 @@ listener 1883 -allow_anonymous true +allow_anonymous false -# Persist messages across restarts +# Credentials and ACLs are generated/mounted at container startup +password_file /mosquitto/config/passwd +acl_file /mosquitto/config/acl.conf +
+# Persist retained messages (e.g. api_key, node status) across broker restarts persistence true persistence_location /mosquitto/data/ log_dest stdout +log_type error +log_type warning +log_type notice diff --git a/drb-c2-core/requirements.txt b/drb-c2-core/requirements.txt index 7d33a3c..160b080 100644 --- a/drb-c2-core/requirements.txt +++ b/drb-c2-core/requirements.txt @@ -4,6 +4,8 @@ pydantic-settings paho-mqtt>=2.0.0 firebase-admin google-cloud-storage +google-cloud-speech +httpx python-multipart pytest pytest-asyncio diff --git a/drb-frontend/app/alerts/page.tsx b/drb-frontend/app/alerts/page.tsx new file mode 100644 index 0000000..c3159cb --- /dev/null +++ b/drb-frontend/app/alerts/page.tsx @@ -0,0 +1,289 @@ +"use client"; + +import { useState } from "react"; +import { useAuth } from "@/components/AuthProvider"; +import { useAlerts } from "@/lib/useAlerts"; +import { c2api } from "@/lib/c2api"; +import type { AlertRule } from "@/lib/types"; + +function fmtTime(iso: string) { + try { return new Date(iso).toLocaleString(); } catch { return iso; } +} + +function RulesTab({ isAdmin }: { isAdmin: boolean }) { + const [rules, setRules] = useState([]); + const [loaded, setLoaded] = useState(false); + const [name, setName] = useState(""); + const [keywords, setKeywords] = useState(""); + const [tgIds, setTgIds] = useState(""); + const [webhook, setWebhook] = useState(""); + const [saving, setSaving] = useState(false); + const [error, setError] = useState(null); + + async function load() { + if (loaded) return; + try { + const data = await c2api.getAlertRules() as AlertRule[]; + setRules(data); + setLoaded(true); + } catch (e) { + console.error(e); + } + } + + // Load on first render of this tab + if (!loaded) { load(); } + + async function handleCreate(e: React.FormEvent) { + e.preventDefault(); + setSaving(true); + setError(null); + try { + const body = { + name, + keywords: keywords.split(",").map((k) => k.trim()).filter(Boolean), + talkgroup_ids: 
tgIds.split(",").map((s) => parseInt(s.trim(), 10)).filter((n) => !isNaN(n)), + enabled: true, + discord_webhook: webhook || null, + }; + const created = await c2api.createAlertRule(body) as AlertRule; + setRules((prev) => [created, ...prev]); + setName(""); setKeywords(""); setTgIds(""); setWebhook(""); + } catch { + setError("Failed to create rule."); + } finally { + setSaving(false); + } + } + + async function handleToggle(rule: AlertRule) { + try { + await c2api.updateAlertRule(rule.rule_id, { enabled: !rule.enabled }); + setRules((prev) => prev.map((r) => r.rule_id === rule.rule_id ? { ...r, enabled: !r.enabled } : r)); + } catch (e) { console.error(e); } + } + + async function handleDelete(id: string) { + try { + await c2api.deleteAlertRule(id); + setRules((prev) => prev.filter((r) => r.rule_id !== id)); + } catch (e) { console.error(e); } + } + + return ( +
+ {isAdmin && ( +
+

New Alert Rule

+
+
+ + setName(e.target.value)} + placeholder="e.g. Structure Fire" + className="w-full bg-gray-800 border border-gray-700 rounded-lg px-3 py-2 text-white text-sm focus:outline-none focus:border-indigo-500" + /> +
+
+ + setKeywords(e.target.value)} + placeholder="fire, smoke, structure" + className="w-full bg-gray-800 border border-gray-700 rounded-lg px-3 py-2 text-white text-sm focus:outline-none focus:border-indigo-500" + /> +
+
+ + setTgIds(e.target.value)} + placeholder="9048, 9600" + className="w-full bg-gray-800 border border-gray-700 rounded-lg px-3 py-2 text-white text-sm focus:outline-none focus:border-indigo-500" + /> +
+
+ + setWebhook(e.target.value)} + placeholder="https://discord.com/api/webhooks/…" + className="w-full bg-gray-800 border border-gray-700 rounded-lg px-3 py-2 text-white text-sm focus:outline-none focus:border-indigo-500" + /> +
+
+ {error &&

{error}

} + +
+ )} + +
+ {rules.length === 0 ? ( +

No alert rules configured.

+ ) : ( + + + + + + + + + {isAdmin && } + + + + {rules.map((rule) => ( + + + + + + + {isAdmin && ( + + )} + + ))} + +
NameKeywordsTalkgroupsWebhookEnabled
{rule.name}{rule.keywords.join(", ") || "—"}{rule.talkgroup_ids.join(", ") || "—"} + {rule.discord_webhook ? ( + configured + ) : "—"} + + {isAdmin ? ( + + ) : ( + + {rule.enabled ? "enabled" : "disabled"} + + )} + + +
+ )} +
+
+ ); +} + +export default function AlertsPage() { + const { isAdmin } = useAuth(); + const { alerts, loading } = useAlerts(); + const [tab, setTab] = useState<"events" | "rules">("events"); + + async function handleAcknowledge(id: string) { + try { + await c2api.acknowledgeAlert(id); + } catch (e) { console.error(e); } + } + + const unacked = alerts.filter((a) => !a.acknowledged); + + return ( +
+
+

Alerts

+ {unacked.length > 0 && ( + + {unacked.length} unacknowledged + + )} +
+ + {/* Tabs */} +
+ {(["events", ...(isAdmin ? ["rules"] : [])] as const).map((t) => ( + + ))} +
+ + {tab === "events" && ( + loading ? ( +

Loading…

+ ) : alerts.length === 0 ? ( +

No alerts triggered yet.

+ ) : ( +
+ + + + + + + + + + + + + {alerts.map((alert) => ( + + + + + + + + + ))} + +
RuleTalkgroupMatchedSnippetTimeStatus
{alert.rule_name} + {alert.talkgroup_name || alert.talkgroup_id || "—"} + +
+ {alert.matched_keywords.map((kw) => ( + + {kw} + + ))} +
+
+ {alert.transcript_snippet || "—"} + {fmtTime(alert.triggered_at)} + {alert.acknowledged ? ( + acked + ) : ( + + )} +
+
+ ) + )} + + {tab === "rules" && } +
+ ); +} diff --git a/drb-frontend/app/calls/page.tsx b/drb-frontend/app/calls/page.tsx index 69c2906..d804f31 100644 --- a/drb-frontend/app/calls/page.tsx +++ b/drb-frontend/app/calls/page.tsx @@ -36,6 +36,7 @@ export default function CallsPage() { Node Duration Audio + diff --git a/drb-frontend/app/incidents/page.tsx b/drb-frontend/app/incidents/page.tsx new file mode 100644 index 0000000..7a91c8a --- /dev/null +++ b/drb-frontend/app/incidents/page.tsx @@ -0,0 +1,285 @@ +"use client"; + +import { useState } from "react"; +import { useAuth } from "@/components/AuthProvider"; +import { useIncidents } from "@/lib/useIncidents"; +import { c2api } from "@/lib/c2api"; +import type { IncidentRecord } from "@/lib/types"; + +const TYPE_COLORS: Record = { + fire: "bg-red-900 text-red-300", + police: "bg-blue-900 text-blue-300", + ems: "bg-yellow-900 text-yellow-300", + accident: "bg-orange-900 text-orange-300", + other: "bg-gray-800 text-gray-300", +}; + +function typeBadge(type: string | null) { + const cls = TYPE_COLORS[type ?? "other"] ?? TYPE_COLORS.other; + return ( + + {type ?? "other"} + + ); +} + +function fmtTime(iso: string) { + try { return new Date(iso).toLocaleString(); } catch { return iso; } +} + +function IncidentRow({ incident, isAdmin, onResolve }: { + incident: IncidentRecord; + isAdmin: boolean; + onResolve: (id: string) => void; +}) { + const [expanded, setExpanded] = useState(false); + + return ( + <> + setExpanded((v) => !v)} + > + {typeBadge(incident.type)} + {incident.title ?? "—"} + + + {incident.status} + + + {incident.call_ids.length} + {fmtTime(incident.started_at)} + {fmtTime(incident.updated_at)} + + {isAdmin && incident.status === "active" && ( + + )} + + + {expanded && ( + + + {incident.summary && ( +

{incident.summary}

+ )} + {incident.tags.length > 0 && ( +
+ {incident.tags.map((t) => ( + {t} + ))} +
+ )} +
+ Linked calls: + {incident.call_ids.length === 0 ? "none" : incident.call_ids.map((id, i) => ( + + {id.slice(0, 8)}… + {i < incident.call_ids.length - 1 && ", "} + + ))} +
+ + + )} + + ); +} + +function CreateModal({ onClose, onCreate }: { + onClose: () => void; + onCreate: (body: object) => Promise; +}) { + const [title, setTitle] = useState(""); + const [type, setType] = useState("other"); + const [summary, setSummary] = useState(""); + const [saving, setSaving] = useState(false); + + async function handleSubmit(e: React.FormEvent) { + e.preventDefault(); + setSaving(true); + try { + await onCreate({ title, type, summary: summary || null, status: "active" }); + onClose(); + } finally { + setSaving(false); + } + } + + return ( +
+
+

Create Incident

+
+ + setTitle(e.target.value)} + className="w-full bg-gray-800 border border-gray-700 rounded-lg px-3 py-2 text-white text-sm focus:outline-none focus:border-indigo-500" + /> +
+
+ + +
+
+ +