commit 2f0597c81b2d0099eb4a77317a4e77be60e30c6e Author: Logan Date: Sun Apr 5 19:01:39 2026 -0400 Initial commit — DRB server stack Includes c2-core (FastAPI/MQTT/Firestore), discord-bot (slash commands), frontend (Next.js admin UI), and mosquitto config. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..add193e --- /dev/null +++ b/.gitignore @@ -0,0 +1,31 @@ +# Environment / secrets +.env +drb-c2-core/.env +drb-server-discord-bot/.env +drb-frontend/.env +drb-c2-core/gcp-key.json + +# Python +__pycache__/ +*.py[cod] +*.pyo +.venv/ +venv/ + +# Node / Next.js +node_modules/ +.next/ +*.tsbuildinfo + +# Logs and debug captures +*.log +logs/ +*.har + +# Docker volumes / runtime data +mosquitto/data/ +recordings/ + +# OS +.DS_Store +Thumbs.db diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..089f094 --- /dev/null +++ b/Makefile @@ -0,0 +1,28 @@ +.PHONY: setup up down build logs logs-c2 logs-bot logs-frontend + +setup: + @[ -f drb-c2-core/.env ] && echo "drb-c2-core/.env already exists, skipping." || (cp drb-c2-core/.env.example drb-c2-core/.env && echo "Created drb-c2-core/.env") + @[ -f drb-server-discord-bot/.env ] && echo "drb-server-discord-bot/.env already exists, skipping." || (cp drb-server-discord-bot/.env.example drb-server-discord-bot/.env && echo "Created drb-server-discord-bot/.env") + @[ -f drb-frontend/.env ] && echo "drb-frontend/.env already exists, skipping." || (cp drb-frontend/.env.example drb-frontend/.env && echo "Created drb-frontend/.env") + @echo "Done. Fill in any secrets before running 'make up'." + +up: + docker compose up -d + +down: + docker compose down + +build: + docker compose build + +logs: + docker compose logs -f + +logs-c2: + docker compose logs -f c2-core + +logs-bot: + docker compose logs -f discord-bot + +logs-frontend: + docker compose logs -f frontend diff --git a/README.md b/README.md new file mode 100644 index 0000000..ac87fe2 --- /dev/null +++ b/README.md @@ -0,0 +1,107 @@ +# DRB Server + +The server-side stack for the Discord Radio Bot system. Handles command-and-control, Firestore sync, Discord slash commands, and the web frontend. + +## Services + +| Service | Description | Port | +|---|---|---| +| `mosquitto` | MQTT broker — receives telemetry from edge nodes | 1883 | +| `c2-core` | FastAPI C2 API — processes MQTT, writes to Firestore, manages nodes/systems/tokens | 8888 | +| `discord-bot` | Discord bot — `/join`, `/leave`, `/status` slash commands | — | +| `frontend` | Next.js admin UI — real-time dashboard, node management, call history | 3000 | + +## Prerequisites + +- Docker + Docker Compose +- A Firebase project with Firestore enabled (Native mode) +- A GCP service account key with Firestore and Firebase Auth permissions +- A Discord bot token (for the server bot that handles slash commands) +- One or more Discord bot tokens in the token pool (for edge nodes to join voice channels) + +## Setup + +```bash +# 1. Copy env files +make setup + +# 2. Fill in secrets +# drb-c2-core/.env — MQTT, Firestore database name, GCS bucket +# drb-server-discord-bot/.env — Discord bot token, C2 URL +# drb-frontend/.env — Firebase config (NEXT_PUBLIC_*), C2 URL + +# 3. Place your GCP service account key +cp /path/to/your-key.json drb-c2-core/gcp-key.json + +# 4. Build and start +make build +make up +``` + +## Environment Variables + +### `drb-c2-core/.env` +| Variable | Description | +|---|---| +| `MQTT_BROKER` | Hostname of the MQTT broker (default: `mosquitto`) | +| `FIRESTORE_DATABASE` | Firestore database name (default: `(default)`) | +| `GCP_CREDENTIALS_PATH` | Path to the GCP key file inside the container | +| `GCS_BUCKET` | GCS bucket name for audio uploads (optional) | + +### `drb-server-discord-bot/.env` +| Variable | Description | +|---|---| +| `DISCORD_TOKEN` | Bot token for the server-side command bot | +| `C2_URL` | Internal URL of c2-core (default: `http://c2-core:8000`) | +| `DEV_GUILD_ID` | Optional guild ID to sync slash commands instantly during dev | + +### `drb-frontend/.env` +| Variable | Description | +|---|---| +| `NEXT_PUBLIC_FIREBASE_*` | Firebase project config (from Firebase console) | +| `NEXT_PUBLIC_FIRESTORE_DATABASE` | Firestore database name (must match c2-core) | +| `NEXT_PUBLIC_C2_URL` | C2 API URL reachable from the browser | + +## Admin Setup + +After the stack is running, grant admin access to your Firebase user: + +```bash +cd drb-c2-core +python scripts/set_admin.py grant your@email.com +``` + +Then sign out and back in to the frontend. + +## Makefile Targets + +``` +make setup — copy .env.example files +make build — docker compose build +make up — docker compose up -d +make down — docker compose down +make logs — follow all logs +make logs-c2 — c2-core logs only +make logs-bot — discord-bot logs only +make logs-frontend — frontend logs only +``` + +## Architecture + +``` +Edge Node (client machine) + │ + ├── MQTT checkin/status/metadata ──► mosquitto ──► c2-core ──► Firestore + │ + └── MQTT commands ◄─────────────────────────────── c2-core + │ + └── discord_join ──► edge node joins Discord voice + streams Icecast + +Discord User + │ + └── /join /leave /status ──► discord-bot ──► c2-core ──► MQTT ──► edge node + +Browser (admin) + └── frontend ──► Firestore (real-time reads) + └── c2-core REST API (writes/commands) +``` diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..6e8c654 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,35 @@ +services: + mosquitto: + image: eclipse-mosquitto:2 + restart: unless-stopped + ports: + - "1883:1883" + volumes: + - ./drb-c2-core/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf + + c2-core: + build: ./drb-c2-core + restart: unless-stopped + ports: + - "8888:8000" + env_file: ./drb-c2-core/.env + volumes: + - ./drb-c2-core/gcp-key.json:/app/gcp-key.json:ro + depends_on: + - mosquitto + + discord-bot: + build: ./drb-server-discord-bot + restart: unless-stopped + env_file: ./drb-server-discord-bot/.env + depends_on: + - c2-core + + frontend: + build: ./drb-frontend + restart: unless-stopped + ports: + - "3000:3000" + env_file: ./drb-frontend/.env + depends_on: + - c2-core diff --git a/drb-c2-core/.dockerignore b/drb-c2-core/.dockerignore new file mode 100644 index 0000000..0068393 --- /dev/null +++ b/drb-c2-core/.dockerignore @@ -0,0 +1,7 @@ +__pycache__ +*.pyc +*.pyo +.git +*.log +.env +.pytest_cache diff --git a/drb-c2-core/.env.example b/drb-c2-core/.env.example new file mode 100644 index 0000000..12b86da --- /dev/null +++ b/drb-c2-core/.env.example @@ -0,0 +1,21 @@ +# MQTT broker (usually the mosquitto container on this host) +MQTT_BROKER=mosquitto +MQTT_PORT=1883 +MQTT_USER= +MQTT_PASS= + +# GCP — path to service account JSON inside the container +GCP_CREDENTIALS_PATH=/app/gcp-key.json + +# Firestore database name (use "(default)" if you didn't create a named database) +FIRESTORE_DATABASE=c2-server + +# GCS bucket for audio storage +GCS_BUCKET=your-bucket-name + +# How long (seconds) before a node is marked offline if no checkin received +NODE_OFFLINE_THRESHOLD=90 + +# Auth — static key that edge nodes send as Bearer token on /upload +# Generate with: openssl rand -hex 32 +NODE_API_KEY= diff --git a/drb-c2-core/Dockerfile b/drb-c2-core/Dockerfile new file mode 100644 index 0000000..26030f9 --- /dev/null +++ b/drb-c2-core/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY app/ ./app/ +COPY tests/ ./tests/ + +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/drb-c2-core/app/__init__.py b/drb-c2-core/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/drb-c2-core/app/config.py b/drb-c2-core/app/config.py new file mode 100644 index 0000000..cd589d1 --- /dev/null +++ b/drb-c2-core/app/config.py @@ -0,0 +1,24 @@ +from pydantic_settings import BaseSettings +from typing import Optional + + +class Settings(BaseSettings): + # MQTT + mqtt_broker: str = "localhost" + mqtt_port: int = 1883 + mqtt_user: Optional[str] = None + mqtt_pass: Optional[str] = None + + # GCP + gcp_credentials_path: Optional[str] = None # None → uses ADC + gcs_bucket: Optional[str] = None # None → audio upload disabled + firestore_database: str = "(default)" + + # Node health + node_offline_threshold: int = 90 # seconds without checkin before marking offline + + class Config: + env_file = ".env" + + +settings = Settings() diff --git a/drb-c2-core/app/internal/__init__.py b/drb-c2-core/app/internal/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/drb-c2-core/app/internal/auth.py b/drb-c2-core/app/internal/auth.py new file mode 100644 index 0000000..9f89ca8 --- /dev/null +++ b/drb-c2-core/app/internal/auth.py @@ -0,0 +1,28 @@ +from typing import Optional +from fastapi import HTTPException, Security +from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials +from firebase_admin import auth as firebase_auth + +_bearer = HTTPBearer(auto_error=False) + + +async def require_firebase_token( + credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer), +) -> dict: + """Verify a Firebase ID token from the Authorization: Bearer header.""" + if not credentials: + raise HTTPException(status_code=401, detail="Missing authorization token") + try: + return firebase_auth.verify_id_token(credentials.credentials) + except Exception: + raise HTTPException(status_code=401, detail="Invalid or expired token") + + +async def require_admin_token( + credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer), +) -> dict: + """Verify a Firebase ID token AND require the admin custom claim.""" + decoded = await require_firebase_token(credentials) + if not decoded.get("admin"): + raise HTTPException(status_code=403, detail="Admin access required") + return decoded diff --git a/drb-c2-core/app/internal/firestore.py b/drb-c2-core/app/internal/firestore.py new file mode 100644 index 0000000..66e26ee --- /dev/null +++ b/drb-c2-core/app/internal/firestore.py @@ -0,0 +1,62 @@ +import asyncio +from typing import Optional, Any +import firebase_admin +from firebase_admin import credentials, firestore as fs +from app.config import settings +from app.internal.logger import logger + + +def _init_firebase(): + if firebase_admin._apps: + return firestore.client() + + if settings.gcp_credentials_path: + cred = credentials.Certificate(settings.gcp_credentials_path) + else: + cred = credentials.ApplicationDefault() + + firebase_admin.initialize_app(cred) + logger.info("Firebase initialised.") + + +_init_firebase() +db = fs.client(database_id=settings.firestore_database) + + +# --------------------------------------------------------------------------- +# Thin async wrappers — firebase-admin is synchronous, run in thread executor +# --------------------------------------------------------------------------- + +async def doc_set(collection: str, doc_id: str, data: dict, merge: bool = True) -> None: + ref = db.collection(collection).document(doc_id) + await asyncio.to_thread(ref.set, data, merge=merge) + + +async def doc_get(collection: str, doc_id: str) -> Optional[dict]: + ref = db.collection(collection).document(doc_id) + snap = await asyncio.to_thread(ref.get) + return snap.to_dict() if snap.exists else None + + +async def doc_update(collection: str, doc_id: str, data: dict) -> None: + ref = db.collection(collection).document(doc_id) + await asyncio.to_thread(ref.update, data) + + +async def collection_list(collection: str, **filters) -> list[dict]: + """ + List all documents in a collection. + Optional keyword filters: field=value pairs passed as equality where-clauses. + """ + def _query(): + ref = db.collection(collection) + for field, value in filters.items(): + ref = ref.where(field, "==", value) + return [doc.to_dict() for doc in ref.stream()] + + return await asyncio.to_thread(_query) + + +async def doc_delete(collection: str, doc_id: str) -> None: + ref = db.collection(collection).document(doc_id) + await asyncio.to_thread(ref.delete) diff --git a/drb-c2-core/app/internal/logger.py b/drb-c2-core/app/internal/logger.py new file mode 100644 index 0000000..13ead5c --- /dev/null +++ b/drb-c2-core/app/internal/logger.py @@ -0,0 +1,10 @@ +import logging +import sys + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + handlers=[logging.StreamHandler(sys.stdout)], +) + +logger = logging.getLogger("drb-c2-core") diff --git a/drb-c2-core/app/internal/mqtt_handler.py b/drb-c2-core/app/internal/mqtt_handler.py new file mode 100644 index 0000000..8a60110 --- /dev/null +++ b/drb-c2-core/app/internal/mqtt_handler.py @@ -0,0 +1,244 @@ +import asyncio +import json +from datetime import datetime, timezone +from typing import Optional +import paho.mqtt.client as mqtt +from app.config import settings +from app.internal.logger import logger +from app.internal import firestore as fstore + + +class MQTTHandler: + def __init__(self): + self._client: Optional[mqtt.Client] = None + self._loop: Optional[asyncio.AbstractEventLoop] = None + self._connected = False + + def _build_client(self) -> mqtt.Client: + client = mqtt.Client( + callback_api_version=mqtt.CallbackAPIVersion.VERSION2, + client_id="drb-c2-core", + ) + if settings.mqtt_user: + client.username_pw_set(settings.mqtt_user, settings.mqtt_pass) + + client.on_connect = self._on_connect + client.on_disconnect = self._on_disconnect + client.on_message = self._on_message + return client + + def _on_connect(self, client, userdata, flags, reason_code, properties): + if reason_code == 0: + self._connected = True + client.subscribe("nodes/+/checkin", qos=1) + client.subscribe("nodes/+/status", qos=1) + client.subscribe("nodes/+/metadata", qos=1) + logger.info("MQTT connected — subscribed to node topics.") + else: + logger.error(f"MQTT connect refused: {reason_code}") + + def _on_disconnect(self, client, userdata, disconnect_flags, reason_code, properties): + self._connected = False + logger.warning(f"MQTT disconnected: {reason_code}") + + def _on_message(self, client, userdata, msg): + try: + payload = json.loads(msg.payload.decode()) + except Exception: + logger.warning(f"Non-JSON MQTT message on {msg.topic}") + return + + asyncio.run_coroutine_threadsafe( + self._dispatch(msg.topic, payload), self._loop + ) + + async def _dispatch(self, topic: str, payload: dict): + parts = topic.split("/") + # Expected: nodes/{node_id}/{type} + if len(parts) != 3 or parts[0] != "nodes": + return + + node_id = parts[1] + msg_type = parts[2] + + try: + if msg_type == "checkin": + await self._handle_checkin(node_id, payload) + elif msg_type == "status": + await self._handle_status(node_id, payload) + elif msg_type == "metadata": + await self._handle_metadata(node_id, payload) + except Exception as e: + logger.error(f"MQTT dispatch error [{msg_type}] from {node_id}: {e}") + + # ------------------------------------------------------------------ + # Checkin — upsert node; flag new unconfigured nodes + # ------------------------------------------------------------------ + + async def _handle_checkin(self, node_id: str, payload: dict): + existing = await fstore.doc_get("nodes", node_id) + now = datetime.now(timezone.utc) + + if not existing: + # First time we've seen this node — create it as unconfigured, pending approval + doc = { + "node_id": node_id, + "name": payload.get("name", node_id), + "lat": payload.get("lat", 0.0), + "lon": payload.get("lon", 0.0), + "status": "unconfigured", + "configured": False, + "last_seen": now.isoformat(), + "assigned_system_id": None, + "approval_status": "pending", + } + await fstore.doc_set("nodes", node_id, doc, merge=False) + logger.info(f"New node registered: {node_id} — pending admin approval.") + else: + updates = { + "last_seen": now.isoformat(), + "name": payload.get("name", existing.get("name", node_id)), + "lat": payload.get("lat", existing.get("lat", 0.0)), + "lon": payload.get("lon", existing.get("lon", 0.0)), + } + # Only promote to online if already configured (don't overwrite explicit status) + if existing.get("configured") and existing.get("status") not in ("recording",): + updates["status"] = "online" + await fstore.doc_update("nodes", node_id, updates) + + # ------------------------------------------------------------------ + # Status update + # ------------------------------------------------------------------ + + async def _handle_status(self, node_id: str, payload: dict): + status = payload.get("status") + if not status: + return + await fstore.doc_update("nodes", node_id, { + "status": status, + "last_seen": datetime.now(timezone.utc).isoformat(), + }) + + # ------------------------------------------------------------------ + # Metadata — call_start / call_end events + # ------------------------------------------------------------------ + + async def _handle_metadata(self, node_id: str, payload: dict): + event = payload.get("event") + if event == "call_start": + await self._on_call_start(node_id, payload) + elif event == "call_end": + await self._on_call_end(node_id, payload) + + async def _on_call_start(self, node_id: str, payload: dict): + call_id = payload.get("call_id") + if not call_id: + return + + # Look up assigned system for this node + node = await fstore.doc_get("nodes", node_id) + system_id = node.get("assigned_system_id") if node else None + + started_at_raw = payload.get("started_at") + started_at = ( + datetime.fromisoformat(started_at_raw) + if started_at_raw + else datetime.now(timezone.utc) + ) + + doc = { + "call_id": call_id, + "node_id": node_id, + "system_id": system_id, + "talkgroup_id": payload.get("tgid"), + "talkgroup_name": payload.get("tgid_name") or "", + "freq": payload.get("freq"), + "srcaddr": payload.get("srcaddr"), + "started_at": started_at, + "ended_at": None, + "audio_url": None, + "transcript": None, + "incident_id": None, + "location": None, + "tags": [], + "status": "active", + } + await fstore.doc_set("calls", call_id, doc, merge=False) + logger.info(f"Call start: {call_id} (node={node_id}, tgid={payload.get('tgid')})") + + async def _on_call_end(self, node_id: str, payload: dict): + call_id = payload.get("call_id") + if not call_id: + return + + ended_at_raw = payload.get("ended_at") + ended_at = ( + datetime.fromisoformat(ended_at_raw) + if ended_at_raw + else datetime.now(timezone.utc) + ) + + updates = { + "ended_at": ended_at, + "status": "ended", + } + if payload.get("audio_url"): + updates["audio_url"] = payload["audio_url"] + + await fstore.doc_update("calls", call_id, updates) + logger.info(f"Call end: {call_id}") + + # ------------------------------------------------------------------ + # Outbound — send a command to a specific node + # ------------------------------------------------------------------ + + def send_command(self, node_id: str, payload: dict): + topic = f"nodes/{node_id}/commands" + if self._client and self._connected: + self._client.publish(topic, json.dumps(payload), qos=1) + logger.info(f"Command sent to {node_id}: {payload.get('action')}") + else: + logger.warning(f"MQTT not connected — could not send command to {node_id}") + + def push_config(self, node_id: str, system_config: dict): + topic = f"nodes/{node_id}/config" + if self._client and self._connected: + self._client.publish(topic, json.dumps(system_config), qos=1) + logger.info(f"Config pushed to {node_id}") + else: + logger.warning(f"MQTT not connected — could not push config to {node_id}") + + def publish_node_key(self, node_id: str, api_key: str): + """Publish the provisioned API key to the node (retained so it survives reconnects).""" + topic = f"nodes/{node_id}/api_key" + if self._client and self._connected: + self._client.publish(topic, json.dumps({"api_key": api_key}), qos=2, retain=True) + logger.info(f"API key provisioned to {node_id}") + else: + logger.warning(f"MQTT not connected — could not provision key to {node_id}") + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def connect(self): + self._loop = asyncio.get_event_loop() + self._client = self._build_client() + try: + self._client.connect(settings.mqtt_broker, settings.mqtt_port, keepalive=60) + self._client.loop_start() + logger.info(f"MQTT connecting to {settings.mqtt_broker}:{settings.mqtt_port}") + except Exception as e: + logger.error(f"MQTT connection error: {e}") + + async def disconnect(self): + if self._client: + self._client.loop_stop() + self._client.disconnect() + + @property + def is_connected(self) -> bool: + return self._connected + + +mqtt_handler = MQTTHandler() diff --git a/drb-c2-core/app/internal/node_sweeper.py b/drb-c2-core/app/internal/node_sweeper.py new file mode 100644 index 0000000..f6f51a6 --- /dev/null +++ b/drb-c2-core/app/internal/node_sweeper.py @@ -0,0 +1,55 @@ +import asyncio +from datetime import datetime, timezone, timedelta +from app.config import settings +from app.internal.logger import logger +from app.internal import firestore as fstore + +SWEEP_INTERVAL = 30 # seconds + + +async def sweeper_loop(): + """ + Periodically check for nodes that haven't checked in recently + and mark them offline in Firestore. + """ + logger.info("Node sweeper started.") + while True: + await asyncio.sleep(SWEEP_INTERVAL) + try: + await _sweep() + except Exception as e: + logger.error(f"Sweeper error: {e}") + + +async def _sweep(): + threshold = datetime.now(timezone.utc) - timedelta(seconds=settings.node_offline_threshold) + + def _query(): + from app.internal.firestore import db + return [ + doc.to_dict() + for doc in db.collection("nodes").stream() + ] + + nodes = await asyncio.to_thread(_query) + for node in nodes: + status = node.get("status", "offline") + if status == "offline": + continue + + last_seen_raw = node.get("last_seen") + if not last_seen_raw: + continue + + # last_seen may be a Firestore Timestamp, a datetime, or an ISO string + if isinstance(last_seen_raw, str): + last_seen = datetime.fromisoformat(last_seen_raw) + else: + last_seen = last_seen_raw + if last_seen.tzinfo is None: + last_seen = last_seen.replace(tzinfo=timezone.utc) + + if last_seen < threshold: + node_id = node.get("node_id") + await fstore.doc_update("nodes", node_id, {"status": "offline"}) + logger.info(f"Node {node_id} marked offline (last seen: {last_seen.isoformat()})") diff --git a/drb-c2-core/app/internal/storage.py b/drb-c2-core/app/internal/storage.py new file mode 100644 index 0000000..3e9290a --- /dev/null +++ b/drb-c2-core/app/internal/storage.py @@ -0,0 +1,28 @@ +import asyncio +from typing import Optional +from app.config import settings +from app.internal.logger import logger + + +async def upload_audio(data: bytes, filename: str) -> Optional[str]: + """Upload audio bytes to GCS and return the public URL, or None if disabled.""" + if not settings.gcs_bucket: + logger.info("GCS_BUCKET not configured — skipping audio upload.") + return None + + def _upload() -> str: + from google.cloud import storage + client = storage.Client() + bucket = client.bucket(settings.gcs_bucket) + blob = bucket.blob(f"calls/{filename}") + blob.upload_from_string(data, content_type="audio/mpeg") + blob.make_public() + return blob.public_url + + try: + url = await asyncio.to_thread(_upload) + logger.info(f"Audio uploaded: {url}") + return url + except Exception as e: + logger.error(f"GCS upload failed: {e}") + return None diff --git a/drb-c2-core/app/main.py b/drb-c2-core/app/main.py new file mode 100644 index 0000000..c333bd7 --- /dev/null +++ b/drb-c2-core/app/main.py @@ -0,0 +1,44 @@ +import asyncio +from contextlib import asynccontextmanager +from fastapi import FastAPI, Depends +from fastapi.middleware.cors import CORSMiddleware +from app.internal.logger import logger +from app.internal.mqtt_handler import mqtt_handler +from app.internal.node_sweeper import sweeper_loop +from app.internal.auth import require_firebase_token +from app.routers import nodes, systems, calls, upload, tokens + + +@asynccontextmanager +async def lifespan(app: FastAPI): + logger.info("DRB C2 Core starting.") + + await mqtt_handler.connect() + sweeper_task = asyncio.create_task(sweeper_loop()) + + yield # --- app running --- + + logger.info("DRB C2 Core shutting down.") + sweeper_task.cancel() + await mqtt_handler.disconnect() + + +app = FastAPI(title="DRB C2 Core", lifespan=lifespan) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["*"], + allow_headers=["*"], +) + +app.include_router(nodes.router, dependencies=[Depends(require_firebase_token)]) +app.include_router(systems.router, dependencies=[Depends(require_firebase_token)]) +app.include_router(calls.router, dependencies=[Depends(require_firebase_token)]) +app.include_router(tokens.router, dependencies=[Depends(require_firebase_token)]) +app.include_router(upload.router) # auth is per-node, handled inline + + +@app.get("/health") +async def health(): + return {"ok": True, "mqtt_connected": mqtt_handler.is_connected} diff --git a/drb-c2-core/app/models.py b/drb-c2-core/app/models.py new file mode 100644 index 0000000..651195f --- /dev/null +++ b/drb-c2-core/app/models.py @@ -0,0 +1,80 @@ +from pydantic import BaseModel, Field +from typing import Optional, List, Dict, Any +from datetime import datetime + + +# --------------------------------------------------------------------------- +# Nodes +# --------------------------------------------------------------------------- + +class NodeRecord(BaseModel): + node_id: str + name: str + lat: float = 0.0 + lon: float = 0.0 + status: str = "offline" # online / offline / recording / unconfigured + configured: bool = False + last_seen: Optional[datetime] = None + assigned_system_id: Optional[str] = None + + +class CommandPayload(BaseModel): + action: str # discord_join / discord_leave / op25_restart + guild_id: Optional[str] = None + channel_id: Optional[str] = None + + +# --------------------------------------------------------------------------- +# Systems +# --------------------------------------------------------------------------- + +class SystemRecord(BaseModel): + system_id: str + name: str + type: str # P25 / DMR / NBFM + config: Dict[str, Any] = {} # OP25-compatible config blob + + +class SystemCreate(BaseModel): + name: str + type: str + config: Dict[str, Any] = {} + + +# --------------------------------------------------------------------------- +# Calls +# --------------------------------------------------------------------------- + +class CallRecord(BaseModel): + call_id: str + node_id: str + system_id: Optional[str] = None + talkgroup_id: Optional[int] = None + talkgroup_name: Optional[str] = None + freq: Optional[float] = None + srcaddr: Optional[str] = None + started_at: datetime + ended_at: Optional[datetime] = None + audio_url: Optional[str] = None + transcript: Optional[str] = None # populated later by STT + incident_id: Optional[str] = None # populated later by intelligence layer + location: Optional[Dict[str, float]] = None # {lat, lng} + tags: List[str] = [] + status: str = "active" # active / ended + + +# --------------------------------------------------------------------------- +# Incidents +# --------------------------------------------------------------------------- + +class IncidentRecord(BaseModel): + incident_id: str + title: Optional[str] = None + type: Optional[str] = None # fire / police / ems / etc. + status: str = "active" # active / resolved + location: Optional[Dict[str, float]] = None + call_ids: List[str] = [] + started_at: datetime + updated_at: datetime + summary: Optional[str] = None + tags: List[str] = [] diff --git a/drb-c2-core/app/routers/__init__.py b/drb-c2-core/app/routers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/drb-c2-core/app/routers/calls.py b/drb-c2-core/app/routers/calls.py new file mode 100644 index 0000000..d815a14 --- /dev/null +++ b/drb-c2-core/app/routers/calls.py @@ -0,0 +1,29 @@ +from fastapi import APIRouter, HTTPException, Query +from typing import Optional +from app.internal import firestore as fstore + +router = APIRouter(prefix="/calls", tags=["calls"]) + + +@router.get("") +async def list_calls( + node_id: Optional[str] = Query(None), + status: Optional[str] = Query(None), + system_id: Optional[str] = Query(None), +): + filters = {} + if node_id: + filters["node_id"] = node_id + if status: + filters["status"] = status + if system_id: + filters["system_id"] = system_id + return await fstore.collection_list("calls", **filters) + + +@router.get("/{call_id}") +async def get_call(call_id: str): + call = await fstore.doc_get("calls", call_id) + if not call: + raise HTTPException(404, f"Call '{call_id}' not found.") + return call diff --git a/drb-c2-core/app/routers/nodes.py b/drb-c2-core/app/routers/nodes.py new file mode 100644 index 0000000..4bd1032 --- /dev/null +++ b/drb-c2-core/app/routers/nodes.py @@ -0,0 +1,91 @@ +import secrets +from fastapi import APIRouter, HTTPException, Depends +from app.models import CommandPayload +from app.internal import firestore as fstore +from app.internal.mqtt_handler import mqtt_handler +from app.internal.auth import require_admin_token +from app.routers.tokens import assign_token, release_token + +router = APIRouter(prefix="/nodes", tags=["nodes"]) + + +@router.get("") +async def list_nodes(): + return await fstore.collection_list("nodes") + + +@router.get("/{node_id}") +async def get_node(node_id: str): + node = await fstore.doc_get("nodes", node_id) + if not node: + raise HTTPException(404, f"Node '{node_id}' not found.") + return node + + +@router.post("/{node_id}/approve") +async def approve_node(node_id: str, _: dict = Depends(require_admin_token)): + node = await fstore.doc_get("nodes", node_id) + if not node: + raise HTTPException(404, f"Node '{node_id}' not found.") + + api_key = secrets.token_hex(32) + await fstore.doc_set("node_keys", node_id, {"node_id": node_id, "api_key": api_key}, merge=False) + await fstore.doc_update("nodes", node_id, {"approval_status": "approved"}) + mqtt_handler.publish_node_key(node_id, api_key) + return {"ok": True} + + +@router.post("/{node_id}/reject") +async def reject_node(node_id: str, _: dict = Depends(require_admin_token)): + node = await fstore.doc_get("nodes", node_id) + if not node: + raise HTTPException(404, f"Node '{node_id}' not found.") + await fstore.doc_update("nodes", node_id, {"approval_status": "rejected"}) + return {"ok": True} + + +@router.post("/{node_id}/command") +async def send_command(node_id: str, cmd: CommandPayload): + node = await fstore.doc_get("nodes", node_id) + if not node: + raise HTTPException(404, f"Node '{node_id}' not found.") + + payload = cmd.model_dump(exclude_none=True) + + if cmd.action == "discord_join": + token = await assign_token(node_id) + if not token: + raise HTTPException(503, "No Discord bot tokens available in the pool.") + payload["token"] = token + + elif cmd.action == "discord_leave": + await release_token(node_id) + + mqtt_handler.send_command(node_id, payload) + return {"ok": True} + + +@router.post("/{node_id}/config/{system_id}") +async def assign_system(node_id: str, system_id: str): + """ + Assign a system to a node. Fetches the system config from Firestore + and pushes it to the node via MQTT, then marks the node as configured. + """ + node = await fstore.doc_get("nodes", node_id) + if not node: + raise HTTPException(404, f"Node '{node_id}' not found.") + + system = await fstore.doc_get("systems", system_id) + if not system: + raise HTTPException(404, f"System '{system_id}' not found.") + + # Push config to the node via MQTT + mqtt_handler.push_config(node_id, system) + + # Update Firestore + await fstore.doc_update("nodes", node_id, { + "assigned_system_id": system_id, + "configured": True, + }) + + return {"ok": True} diff --git a/drb-c2-core/app/routers/systems.py b/drb-c2-core/app/routers/systems.py new file mode 100644 index 0000000..fd37f88 --- /dev/null +++ b/drb-c2-core/app/routers/systems.py @@ -0,0 +1,44 @@ +import uuid +from fastapi import APIRouter, HTTPException +from app.models import SystemCreate, SystemRecord +from app.internal import firestore as fstore + +router = APIRouter(prefix="/systems", tags=["systems"]) + + +@router.get("") +async def list_systems(): + return await fstore.collection_list("systems") + + +@router.get("/{system_id}") +async def get_system(system_id: str): + system = await fstore.doc_get("systems", system_id) + if not system: + raise HTTPException(404, f"System '{system_id}' not found.") + return system + + +@router.post("", status_code=201) +async def create_system(body: SystemCreate): + system_id = str(uuid.uuid4()) + doc = SystemRecord(system_id=system_id, **body.model_dump()) + await fstore.doc_set("systems", system_id, doc.model_dump(), merge=False) + return doc + + +@router.put("/{system_id}") +async def update_system(system_id: str, body: SystemCreate): + existing = await fstore.doc_get("systems", system_id) + if not existing: + raise HTTPException(404, f"System '{system_id}' not found.") + await fstore.doc_update("systems", system_id, body.model_dump()) + return {**existing, **body.model_dump()} + + +@router.delete("/{system_id}", status_code=204) +async def delete_system(system_id: str): + existing = await fstore.doc_get("systems", system_id) + if not existing: + raise HTTPException(404, f"System '{system_id}' not found.") + await fstore.doc_delete("systems", system_id) diff --git a/drb-c2-core/app/routers/tokens.py b/drb-c2-core/app/routers/tokens.py new file mode 100644 index 0000000..489f68f --- /dev/null +++ b/drb-c2-core/app/routers/tokens.py @@ -0,0 +1,103 @@ +import uuid +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel +from typing import Optional +from datetime import datetime, timezone +from app.internal import firestore as fstore + +router = APIRouter(prefix="/tokens", tags=["tokens"]) + + +class TokenCreate(BaseModel): + name: str # friendly label e.g. "DRB Bot 1" + token: str # the actual Discord bot token + + +# --------------------------------------------------------------------------- +# CRUD +# --------------------------------------------------------------------------- + +@router.get("") +async def list_tokens(): + """List all tokens. The actual token string is masked for safety.""" + tokens = await fstore.collection_list("bot_tokens") + return [ + {**t, "token": t["token"][:10] + "…" + t["token"][-4:]} + for t in tokens + ] + + +@router.post("", status_code=201) +async def add_token(body: TokenCreate): + token_id = str(uuid.uuid4()) + doc = { + "token_id": token_id, + "name": body.name, + "token": body.token, + "in_use": False, + "assigned_node_id": None, + "assigned_at": None, + } + await fstore.doc_set("bot_tokens", token_id, doc, merge=False) + return {"token_id": token_id, "name": body.name} + + +@router.delete("/{token_id}", status_code=204) +async def delete_token(token_id: str): + existing = await fstore.doc_get("bot_tokens", token_id) + if not existing: + raise HTTPException(404, "Token not found.") + if existing.get("in_use"): + raise HTTPException(409, "Token is currently in use by a node.") + await fstore.doc_delete("bot_tokens", token_id) + + +# --------------------------------------------------------------------------- +# Internal helpers — used by the nodes router, not exposed via HTTP +# --------------------------------------------------------------------------- + +async def assign_token(node_id: str) -> Optional[str]: + """ + Find a free token, mark it as in-use, return the token string. + Returns None if no tokens are available. + """ + def _find_free(): + from app.internal.firestore import db + docs = db.collection("bot_tokens").where("in_use", "==", False).limit(1).stream() + return [d for d in docs] + + import asyncio + results = await asyncio.to_thread(_find_free) + if not results: + return None + + doc = results[0] + token_id = doc.id + token_value = doc.to_dict()["token"] + + await fstore.doc_update("bot_tokens", token_id, { + "in_use": True, + "assigned_node_id": node_id, + "assigned_at": datetime.now(timezone.utc), + }) + return token_value + + +async def release_token(node_id: str) -> None: + """Free whichever token is currently assigned to this node.""" + def _find_assigned(): + from app.internal.firestore import db + return [ + d for d in db.collection("bot_tokens") + .where("assigned_node_id", "==", node_id) + .stream() + ] + + import asyncio + results = await asyncio.to_thread(_find_assigned) + for doc in results: + await fstore.doc_update("bot_tokens", doc.id, { + "in_use": False, + "assigned_node_id": None, + "assigned_at": None, + }) diff --git a/drb-c2-core/app/routers/upload.py b/drb-c2-core/app/routers/upload.py new file mode 100644 index 0000000..84eacb7 --- /dev/null +++ b/drb-c2-core/app/routers/upload.py @@ -0,0 +1,44 @@ +from typing import Optional +from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Security +from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials +from app.internal.storage import upload_audio +from app.internal import firestore as fstore +from app.internal.logger import logger + +router = APIRouter(tags=["upload"]) + +_bearer = HTTPBearer(auto_error=False) + + +@router.post("/upload") +async def upload_call_audio( + file: UploadFile = File(...), + call_id: str = Form(...), + node_id: str = Form(...), + credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer), +): + """ + Receive an audio recording from an edge node. + Upload to GCS, update the call document in Firestore with the audio URL. + """ + # Verify the per-node API key + if not credentials: + raise HTTPException(401, "Missing authorization") + key_doc = await fstore.doc_get("node_keys", node_id) + if not key_doc or key_doc.get("api_key") != credentials.credentials: + raise HTTPException(401, "Invalid node API key") + + data = await file.read() + if not data: + raise HTTPException(400, "Empty file.") + + filename = f"{call_id}_{file.filename}" + audio_url = await upload_audio(data, filename) + + if audio_url: + try: + await fstore.doc_update("calls", call_id, {"audio_url": audio_url}) + except Exception as e: + logger.warning(f"Could not update call {call_id} with audio_url: {e}") + + return {"url": audio_url} diff --git a/drb-c2-core/mosquitto/mosquitto.conf b/drb-c2-core/mosquitto/mosquitto.conf new file mode 100644 index 0000000..b5d4eb0 --- /dev/null +++ b/drb-c2-core/mosquitto/mosquitto.conf @@ -0,0 +1,8 @@ +listener 1883 +allow_anonymous true + +# Persist messages across restarts +persistence true +persistence_location /mosquitto/data/ + +log_dest stdout diff --git a/drb-c2-core/pytest.ini b/drb-c2-core/pytest.ini new file mode 100644 index 0000000..78c5011 --- /dev/null +++ b/drb-c2-core/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +asyncio_mode = auto +testpaths = tests diff --git a/drb-c2-core/requirements.txt b/drb-c2-core/requirements.txt new file mode 100644 index 0000000..7d33a3c --- /dev/null +++ b/drb-c2-core/requirements.txt @@ -0,0 +1,9 @@ +fastapi +uvicorn[standard] +pydantic-settings +paho-mqtt>=2.0.0 +firebase-admin +google-cloud-storage +python-multipart +pytest +pytest-asyncio diff --git a/drb-c2-core/scripts/set_admin.py b/drb-c2-core/scripts/set_admin.py new file mode 100644 index 0000000..79c3833 --- /dev/null +++ b/drb-c2-core/scripts/set_admin.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 +""" +Set or remove the 'admin' custom claim on a Firebase user. + +Usage (run from drb-c2-core directory): + python scripts/set_admin.py grant user@example.com + python scripts/set_admin.py revoke user@example.com + +Requires GCP_CREDENTIALS_PATH or Application Default Credentials. +The user must sign out and back in (or wait up to 1 hour) for the +new claim to take effect in their ID token. +""" +import sys +import os +import firebase_admin +from firebase_admin import credentials, auth + +def main(): + if len(sys.argv) != 3 or sys.argv[1] not in ("grant", "revoke"): + print(__doc__) + sys.exit(1) + + action, email = sys.argv[1], sys.argv[2] + + creds_path = os.getenv("GCP_CREDENTIALS_PATH", "gcp-key.json") + cred = credentials.Certificate(creds_path) + firebase_admin.initialize_app(cred) + + try: + user = auth.get_user_by_email(email) + except auth.UserNotFoundError: + print(f"No Firebase user found for {email!r}") + sys.exit(1) + + existing = user.custom_claims or {} + if action == "grant": + updated = {**existing, "admin": True} + auth.set_custom_user_claims(user.uid, updated) + print(f"Admin granted to {email} ({user.uid})") + else: + updated = {k: v for k, v in existing.items() if k != "admin"} + auth.set_custom_user_claims(user.uid, updated) + print(f"Admin revoked from {email} ({user.uid})") + + print("The user must sign out and back in for the change to take effect.") + +if __name__ == "__main__": + main() diff --git a/drb-c2-core/tests/__init__.py b/drb-c2-core/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/drb-c2-core/tests/conftest.py b/drb-c2-core/tests/conftest.py new file mode 100644 index 0000000..b923308 --- /dev/null +++ b/drb-c2-core/tests/conftest.py @@ -0,0 +1,2 @@ +# All C2 core settings have defaults — no env setup needed. +# Add any shared fixtures here if required in the future. diff --git a/drb-c2-core/tests/test_mqtt_handler.py b/drb-c2-core/tests/test_mqtt_handler.py new file mode 100644 index 0000000..e2fc6f9 --- /dev/null +++ b/drb-c2-core/tests/test_mqtt_handler.py @@ -0,0 +1,286 @@ +""" +Unit tests for MQTTHandler — topic dispatch and Firestore write logic. +Firestore is mocked throughout; no MQTT broker or DB required. +""" +import pytest +from unittest.mock import AsyncMock, patch, call +from app.internal.mqtt_handler import MQTTHandler + + +@pytest.fixture +def handler(): + return MQTTHandler() + + +# --------------------------------------------------------------------------- +# Topic dispatch +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_dispatch_routes_checkin(handler): + with patch.object(handler, "_handle_checkin", new=AsyncMock()) as m: + await handler._dispatch("nodes/node-01/checkin", {"name": "Pi"}) + m.assert_called_once_with("node-01", {"name": "Pi"}) + + +@pytest.mark.asyncio +async def test_dispatch_routes_status(handler): + with patch.object(handler, "_handle_status", new=AsyncMock()) as m: + await handler._dispatch("nodes/node-01/status", {"status": "online"}) + m.assert_called_once_with("node-01", {"status": "online"}) + + +@pytest.mark.asyncio +async def test_dispatch_routes_metadata(handler): + with patch.object(handler, "_handle_metadata", new=AsyncMock()) as m: + await handler._dispatch("nodes/node-01/metadata", {"event": "call_start"}) + m.assert_called_once_with("node-01", {"event": "call_start"}) + + +@pytest.mark.asyncio +async def test_dispatch_ignores_wrong_prefix(handler): + with patch.object(handler, "_handle_checkin", new=AsyncMock()) as m: + await handler._dispatch("other/topic/here", {}) + m.assert_not_called() + + +@pytest.mark.asyncio +async def test_dispatch_ignores_malformed_topic(handler): + with patch.object(handler, "_handle_checkin", new=AsyncMock()) as m: + await handler._dispatch("nodes/checkin", {}) # only 2 parts + m.assert_not_called() + + +# --------------------------------------------------------------------------- +# Checkin — new node +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_checkin_creates_new_node(handler): + with patch("app.internal.mqtt_handler.fstore") as mock_fstore: + mock_fstore.doc_get = AsyncMock(return_value=None) + mock_fstore.doc_set = AsyncMock() + + await handler._handle_checkin( + "new-node", + {"name": "Pi Zero W", "lat": 40.7, "lon": -74.0}, + ) + + mock_fstore.doc_set.assert_called_once() + _, _, doc, _ = mock_fstore.doc_set.call_args[0] + assert doc["node_id"] == "new-node" + assert doc["name"] == "Pi Zero W" + assert doc["status"] == "unconfigured" + assert doc["configured"] is False + assert doc["lat"] == 40.7 + + +@pytest.mark.asyncio +async def test_checkin_new_node_defaults_lat_lon(handler): + """Missing lat/lon in payload should default to 0.0.""" + with patch("app.internal.mqtt_handler.fstore") as mock_fstore: + mock_fstore.doc_get = AsyncMock(return_value=None) + mock_fstore.doc_set = AsyncMock() + + await handler._handle_checkin("new-node", {}) + + _, _, doc, _ = mock_fstore.doc_set.call_args[0] + assert doc["lat"] == 0.0 + assert doc["lon"] == 0.0 + + +# --------------------------------------------------------------------------- +# Checkin — existing node +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_checkin_updates_existing_configured_node(handler): + existing = { + "node_id": "node-01", + "name": "Old Name", + "lat": 0.0, + "lon": 0.0, + "status": "online", + "configured": True, + } + with patch("app.internal.mqtt_handler.fstore") as mock_fstore: + mock_fstore.doc_get = AsyncMock(return_value=existing) + mock_fstore.doc_update = AsyncMock() + + await handler._handle_checkin("node-01", {"name": "New Name", "lat": 1.1, "lon": 2.2}) + + updates = mock_fstore.doc_update.call_args[0][2] + assert updates["name"] == "New Name" + assert updates["lat"] == 1.1 + assert updates["status"] == "online" + assert "last_seen" in updates + + +@pytest.mark.asyncio +async def test_checkin_does_not_promote_unconfigured_to_online(handler): + existing = { + "node_id": "node-02", + "name": "Node", + "lat": 0.0, + "lon": 0.0, + "status": "unconfigured", + "configured": False, + } + with patch("app.internal.mqtt_handler.fstore") as mock_fstore: + mock_fstore.doc_get = AsyncMock(return_value=existing) + mock_fstore.doc_update = AsyncMock() + + await handler._handle_checkin("node-02", {}) + + updates = mock_fstore.doc_update.call_args[0][2] + assert "status" not in updates + + +@pytest.mark.asyncio +async def test_checkin_does_not_override_recording_status(handler): + existing = { + "node_id": "node-03", + "name": "Node", + "lat": 0.0, + "lon": 0.0, + "status": "recording", + "configured": True, + } + with patch("app.internal.mqtt_handler.fstore") as mock_fstore: + mock_fstore.doc_get = AsyncMock(return_value=existing) + mock_fstore.doc_update = AsyncMock() + + await handler._handle_checkin("node-03", {}) + + updates = mock_fstore.doc_update.call_args[0][2] + assert "status" not in updates + + +# --------------------------------------------------------------------------- +# Status update +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_handle_status_updates_firestore(handler): + with patch("app.internal.mqtt_handler.fstore") as mock_fstore: + mock_fstore.doc_update = AsyncMock() + + await handler._handle_status("node-01", {"status": "recording"}) + + updates = mock_fstore.doc_update.call_args[0][2] + assert updates["status"] == "recording" + assert "last_seen" in updates + + +@pytest.mark.asyncio +async def test_handle_status_ignores_empty_payload(handler): + with patch("app.internal.mqtt_handler.fstore") as mock_fstore: + mock_fstore.doc_update = AsyncMock() + + await handler._handle_status("node-01", {}) + + mock_fstore.doc_update.assert_not_called() + + +# --------------------------------------------------------------------------- +# Metadata — call_start / call_end +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_call_start_creates_call_doc(handler): + node = {"node_id": "node-01", "assigned_system_id": "sys-001"} + payload = { + "event": "call_start", + "call_id": "call-abc123", + "tgid": 1234, + "tgid_name": "Police Dispatch", + "started_at": "2026-01-01T00:00:00+00:00", + "freq": 851012500, + "srcaddr": 42, + } + + with patch("app.internal.mqtt_handler.fstore") as mock_fstore: + mock_fstore.doc_get = AsyncMock(return_value=node) + mock_fstore.doc_set = AsyncMock() + + await handler._on_call_start("node-01", payload) + + mock_fstore.doc_set.assert_called_once() + _, _, doc, _ = mock_fstore.doc_set.call_args[0] + assert doc["call_id"] == "call-abc123" + assert doc["node_id"] == "node-01" + assert doc["system_id"] == "sys-001" + assert doc["talkgroup_id"] == 1234 + assert doc["talkgroup_name"] == "Police Dispatch" + assert doc["status"] == "active" + assert doc["audio_url"] is None + + +@pytest.mark.asyncio +async def test_call_start_handles_missing_call_id(handler): + with patch("app.internal.mqtt_handler.fstore") as mock_fstore: + mock_fstore.doc_get = AsyncMock(return_value={}) + mock_fstore.doc_set = AsyncMock() + + await handler._on_call_start("node-01", {"event": "call_start"}) + + mock_fstore.doc_set.assert_not_called() + + +@pytest.mark.asyncio +async def test_call_start_uses_now_when_started_at_missing(handler): + node = {"node_id": "node-01", "assigned_system_id": None} + payload = {"call_id": "call-xyz", "tgid": 99} + + with patch("app.internal.mqtt_handler.fstore") as mock_fstore: + mock_fstore.doc_get = AsyncMock(return_value=node) + mock_fstore.doc_set = AsyncMock() + + await handler._on_call_start("node-01", payload) + + _, _, doc, _ = mock_fstore.doc_set.call_args[0] + assert doc["started_at"] is not None + + +@pytest.mark.asyncio +async def test_call_end_updates_status_and_times(handler): + payload = { + "call_id": "call-abc123", + "ended_at": "2026-01-01T00:05:00+00:00", + } + + with patch("app.internal.mqtt_handler.fstore") as mock_fstore: + mock_fstore.doc_update = AsyncMock() + + await handler._on_call_end("node-01", payload) + + updates = mock_fstore.doc_update.call_args[0][2] + assert updates["status"] == "ended" + assert updates["ended_at"] is not None + + +@pytest.mark.asyncio +async def test_call_end_sets_audio_url_when_present(handler): + payload = { + "call_id": "call-abc123", + "ended_at": "2026-01-01T00:05:00+00:00", + "audio_url": "https://storage.example.com/call.mp3", + } + + with patch("app.internal.mqtt_handler.fstore") as mock_fstore: + mock_fstore.doc_update = AsyncMock() + + await handler._on_call_end("node-01", payload) + + updates = mock_fstore.doc_update.call_args[0][2] + assert updates["audio_url"] == "https://storage.example.com/call.mp3" + + +@pytest.mark.asyncio +async def test_call_end_ignores_missing_call_id(handler): + with patch("app.internal.mqtt_handler.fstore") as mock_fstore: + mock_fstore.doc_update = AsyncMock() + + await handler._on_call_end("node-01", {"ended_at": "2026-01-01T00:05:00+00:00"}) + + mock_fstore.doc_update.assert_not_called() diff --git a/drb-c2-core/tests/test_node_sweeper.py b/drb-c2-core/tests/test_node_sweeper.py new file mode 100644 index 0000000..caeab98 --- /dev/null +++ b/drb-c2-core/tests/test_node_sweeper.py @@ -0,0 +1,150 @@ +""" +Unit tests for node_sweeper — datetime comparison logic and Firestore update gating. +Firestore and asyncio.to_thread are mocked — no real DB required. +""" +import pytest +import asyncio +from datetime import datetime, timezone, timedelta +from unittest.mock import AsyncMock, patch, MagicMock +from app.internal.node_sweeper import _sweep + + +def _node(node_id, status, age_seconds): + """Helper: build a node dict with last_seen age_seconds ago (tz-aware).""" + return { + "node_id": node_id, + "status": status, + "last_seen": datetime.now(timezone.utc) - timedelta(seconds=age_seconds), + } + + +def _node_naive(node_id, status, age_seconds): + """Helper: tz-naive last_seen (simulates some Firestore SDK versions).""" + return { + "node_id": node_id, + "status": status, + "last_seen": datetime.utcnow() - timedelta(seconds=age_seconds), + } + + +# --------------------------------------------------------------------------- +# Core sweep logic +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_stale_online_node_marked_offline(): + nodes = [_node("node-01", "online", age_seconds=120)] + + with patch("asyncio.to_thread", new=AsyncMock(return_value=nodes)), \ + patch("app.internal.node_sweeper.fstore") as mock_fstore: + mock_fstore.doc_update = AsyncMock() + await _sweep() + + mock_fstore.doc_update.assert_called_once_with( + "nodes", "node-01", {"status": "offline"} + ) + + +@pytest.mark.asyncio +async def test_stale_recording_node_marked_offline(): + nodes = [_node("node-02", "recording", age_seconds=200)] + + with patch("asyncio.to_thread", new=AsyncMock(return_value=nodes)), \ + patch("app.internal.node_sweeper.fstore") as mock_fstore: + mock_fstore.doc_update = AsyncMock() + await _sweep() + + mock_fstore.doc_update.assert_called_once_with( + "nodes", "node-02", {"status": "offline"} + ) + + +@pytest.mark.asyncio +async def test_fresh_node_not_touched(): + nodes = [_node("node-03", "online", age_seconds=10)] + + with patch("asyncio.to_thread", new=AsyncMock(return_value=nodes)), \ + patch("app.internal.node_sweeper.fstore") as mock_fstore: + mock_fstore.doc_update = AsyncMock() + await _sweep() + + mock_fstore.doc_update.assert_not_called() + + +@pytest.mark.asyncio +async def test_already_offline_node_skipped(): + """Offline nodes must be skipped even if last_seen is ancient.""" + nodes = [_node("node-04", "offline", age_seconds=9999)] + + with patch("asyncio.to_thread", new=AsyncMock(return_value=nodes)), \ + patch("app.internal.node_sweeper.fstore") as mock_fstore: + mock_fstore.doc_update = AsyncMock() + await _sweep() + + mock_fstore.doc_update.assert_not_called() + + +@pytest.mark.asyncio +async def test_node_with_no_last_seen_skipped(): + nodes = [{"node_id": "node-05", "status": "online", "last_seen": None}] + + with patch("asyncio.to_thread", new=AsyncMock(return_value=nodes)), \ + patch("app.internal.node_sweeper.fstore") as mock_fstore: + mock_fstore.doc_update = AsyncMock() + await _sweep() + + mock_fstore.doc_update.assert_not_called() + + +# --------------------------------------------------------------------------- +# Timezone edge cases +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_tz_naive_last_seen_is_handled(): + """Firestore may return tz-naive datetimes; sweeper must not crash.""" + nodes = [_node_naive("node-06", "online", age_seconds=120)] + + with patch("asyncio.to_thread", new=AsyncMock(return_value=nodes)), \ + patch("app.internal.node_sweeper.fstore") as mock_fstore: + mock_fstore.doc_update = AsyncMock() + await _sweep() + + mock_fstore.doc_update.assert_called_once_with( + "nodes", "node-06", {"status": "offline"} + ) + + +@pytest.mark.asyncio +async def test_tz_naive_fresh_node_not_touched(): + nodes = [_node_naive("node-07", "online", age_seconds=5)] + + with patch("asyncio.to_thread", new=AsyncMock(return_value=nodes)), \ + patch("app.internal.node_sweeper.fstore") as mock_fstore: + mock_fstore.doc_update = AsyncMock() + await _sweep() + + mock_fstore.doc_update.assert_not_called() + + +# --------------------------------------------------------------------------- +# Batch behaviour +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_only_stale_nodes_updated_in_batch(): + nodes = [ + _node("node-08", "online", age_seconds=200), # stale → offline + _node("node-09", "online", age_seconds=5), # fresh → skip + _node("node-10", "offline", age_seconds=500), # already offline → skip + _node("node-11", "recording", age_seconds=150), # stale recording → offline + ] + + with patch("asyncio.to_thread", new=AsyncMock(return_value=nodes)), \ + patch("app.internal.node_sweeper.fstore") as mock_fstore: + mock_fstore.doc_update = AsyncMock() + await _sweep() + + assert mock_fstore.doc_update.call_count == 2 + updated_ids = {call.args[1] for call in mock_fstore.doc_update.call_args_list} + assert updated_ids == {"node-08", "node-11"} diff --git a/drb-frontend/.dockerignore b/drb-frontend/.dockerignore new file mode 100644 index 0000000..ba2800b --- /dev/null +++ b/drb-frontend/.dockerignore @@ -0,0 +1,4 @@ +node_modules +.next +.git +*.log diff --git a/drb-frontend/.env.example b/drb-frontend/.env.example new file mode 100644 index 0000000..329e3a3 --- /dev/null +++ b/drb-frontend/.env.example @@ -0,0 +1,12 @@ +# Firebase — get these from your Firebase project settings +NEXT_PUBLIC_FIREBASE_API_KEY= +NEXT_PUBLIC_FIREBASE_AUTH_DOMAIN= +NEXT_PUBLIC_FIREBASE_PROJECT_ID= +NEXT_PUBLIC_FIREBASE_STORAGE_BUCKET= +NEXT_PUBLIC_FIREBASE_MESSAGING_SENDER_ID= +NEXT_PUBLIC_FIREBASE_APP_ID= +# Named Firestore database (omit or set to "(default)" if using the default database) +NEXT_PUBLIC_FIRESTORE_DATABASE= + +# C2 API — must be reachable from the browser (or a server-side proxy) +NEXT_PUBLIC_C2_URL=http://localhost:8888 diff --git a/drb-frontend/Dockerfile b/drb-frontend/Dockerfile new file mode 100644 index 0000000..e80dcbd --- /dev/null +++ b/drb-frontend/Dockerfile @@ -0,0 +1,17 @@ +FROM node:20-slim AS builder + +WORKDIR /app +COPY package.json ./ +RUN npm install +COPY . . +RUN npm run build + +FROM node:20-slim AS runner +WORKDIR /app +ENV NODE_ENV=production + +COPY --from=builder /app/.next/standalone ./ +COPY --from=builder /app/.next/static ./.next/static + +EXPOSE 3000 +CMD ["node", "server.js"] diff --git a/drb-frontend/app/calls/page.tsx b/drb-frontend/app/calls/page.tsx new file mode 100644 index 0000000..69c2906 --- /dev/null +++ b/drb-frontend/app/calls/page.tsx @@ -0,0 +1,93 @@ +"use client"; + +import { useState } from "react"; +import { useCalls } from "@/lib/useCalls"; +import { useSystems } from "@/lib/useSystems"; +import { CallRow } from "@/components/CallRow"; + +export default function CallsPage() { + const [limitCount, setLimitCount] = useState(100); + const { calls, loading } = useCalls(limitCount); + const { systems } = useSystems(); + const systemMap = Object.fromEntries(systems.map((s) => [s.system_id, s])); + + const active = calls.filter((c) => c.status === "active"); + const ended = calls.filter((c) => c.status === "ended"); + + return ( +
+
+

Calls

+ {calls.length} loaded +
+ + {active.length > 0 && ( +
+

+ Live ({active.length}) +

+
+ + + + + + + + + + + + + {active.map((c) => ( + + ))} + +
TimeTalkgroupSystemNodeDurationAudio
+
+
+ )} + +
+

+ History +

+ {loading ? ( +

Loading…

+ ) : ended.length === 0 ? ( +

No calls recorded yet.

+ ) : ( + <> +
+ + + + + + + + + + + + + {ended.map((c) => ( + + ))} + +
TimeTalkgroupSystemNodeDurationAudio
+
+ {ended.length >= limitCount && ( + + )} + + )} +
+
+ ); +} diff --git a/drb-frontend/app/dashboard/page.tsx b/drb-frontend/app/dashboard/page.tsx new file mode 100644 index 0000000..416202a --- /dev/null +++ b/drb-frontend/app/dashboard/page.tsx @@ -0,0 +1,118 @@ +"use client"; + +import { useNodes, useUnconfiguredNodes } from "@/lib/useNodes"; +import { useCalls, useActiveCalls } from "@/lib/useCalls"; +import { useSystems } from "@/lib/useSystems"; +import { NodeCard } from "@/components/NodeCard"; +import { CallRow } from "@/components/CallRow"; +import { NodeConfigModal } from "@/components/NodeConfigModal"; +import { useState } from "react"; +import type { NodeRecord } from "@/lib/types"; + +function StatCard({ label, value, accent }: { label: string; value: string | number; accent?: string }) { + return ( +
+

{label}

+

{value}

+
+ ); +} + +export default function DashboardPage() { + const { nodes, error: nodesError } = useNodes(); + const { nodes: pending } = useUnconfiguredNodes(); + const { calls, error: callsError } = useCalls(20); + const activeCalls = useActiveCalls(); + const { systems, error: systemsError } = useSystems(); + const [configNode, setConfigNode] = useState(null); + + const systemMap = Object.fromEntries(systems.map((s) => [s.system_id, s])); + const onlineCount = nodes.filter((n) => n.status !== "offline").length; + + const fsError = nodesError ?? callsError ?? systemsError; + + return ( +
+

Dashboard

+ + {fsError && ( +
+

Firestore error: {fsError}

+
+ )} + + {/* Pending config banner */} + {pending.length > 0 && ( +
+

+ {pending.length} new node{pending.length > 1 ? "s" : ""} connected and need{pending.length === 1 ? "s" : ""} configuration. +

+ +
+ )} + + {/* Stats */} +
+ + 0 ? "text-orange-400" : undefined} /> + + +
+ + {/* Nodes */} +
+

Nodes

+ {nodes.length === 0 ? ( +

No nodes registered yet.

+ ) : ( +
+ {nodes.map((n) => ( + + ))} +
+ )} +
+ + {/* Recent calls */} +
+

Recent Calls

+ {calls.length === 0 ? ( +

No calls recorded yet.

+ ) : ( +
+ + + + + + + + + + + + + {calls.map((c) => ( + + ))} + +
TimeTalkgroupSystemNodeDurationAudio
+
+ )} +
+ + {configNode && ( + setConfigNode(null)} + /> + )} +
+ ); +} diff --git a/drb-frontend/app/globals.css b/drb-frontend/app/globals.css new file mode 100644 index 0000000..aad9296 --- /dev/null +++ b/drb-frontend/app/globals.css @@ -0,0 +1,7 @@ +@tailwind base; +@tailwind components; +@tailwind utilities; + +html, body { + @apply bg-gray-950 text-gray-100 font-mono; +} diff --git a/drb-frontend/app/layout.tsx b/drb-frontend/app/layout.tsx new file mode 100644 index 0000000..29cdbf8 --- /dev/null +++ b/drb-frontend/app/layout.tsx @@ -0,0 +1,22 @@ +import type { Metadata } from "next"; +import { Nav } from "@/components/Nav"; +import { AuthProvider } from "@/components/AuthProvider"; +import "./globals.css"; + +export const metadata: Metadata = { + title: "DRB Portal", + description: "Distributed Radio Bot — Control & Monitoring", +}; + +export default function RootLayout({ children }: { children: React.ReactNode }) { + return ( + + + +