diff --git a/drb-c2-core/app/internal/feature_flags.py b/drb-c2-core/app/internal/feature_flags.py new file mode 100644 index 0000000..9fcb7e1 --- /dev/null +++ b/drb-c2-core/app/internal/feature_flags.py @@ -0,0 +1,62 @@ +""" +Global AI feature flags stored in Firestore at config/ai_features. + +Defaults to all-on when the document does not exist yet. Uses a short +in-memory TTL cache so flag reads don't add a Firestore round-trip to every +call upload. +""" +import time +from typing import Any +from app.internal.logger import logger +from app.internal import firestore as fstore + +_COLLECTION = "config" +_DOC_ID = "ai_features" +_TTL = 30.0 # seconds before re-reading from Firestore + +_DEFAULTS: dict[str, bool] = { + "stt_enabled": True, + "correlation_enabled": True, + "summaries_enabled": True, + "vocabulary_learning_enabled": True, +} + +_cache: dict[str, Any] = {} +_cache_ts: float = 0.0 + + +async def get_flags() -> dict[str, bool]: + """Return the current feature flags, using the TTL cache when fresh.""" + global _cache, _cache_ts + + now = time.monotonic() + if _cache and (now - _cache_ts) < _TTL: + return dict(_cache) + + try: + doc = await fstore.doc_get(_COLLECTION, _DOC_ID) + if doc: + merged = {**_DEFAULTS, **{k: bool(v) for k, v in doc.items() if k in _DEFAULTS}} + else: + merged = dict(_DEFAULTS) + except Exception as e: + logger.warning(f"Feature flags: could not read from Firestore ({e}), using defaults") + merged = dict(_DEFAULTS) + + _cache = merged + _cache_ts = now + return dict(_cache) + + +async def set_flags(updates: dict[str, bool]) -> dict[str, bool]: + """Write flag updates to Firestore and invalidate the cache.""" + global _cache, _cache_ts + + clean = {k: bool(v) for k, v in updates.items() if k in _DEFAULTS} + if not clean: + raise ValueError(f"No recognised flag keys in update: {list(updates)}") + + await fstore.doc_set(_COLLECTION, _DOC_ID, clean) + _cache_ts = 0.0 # force re-read on next get_flags() + logger.info(f"Feature flags updated: {clean}") + return await get_flags() diff --git a/drb-c2-core/app/internal/summarizer.py b/drb-c2-core/app/internal/summarizer.py index 46b61a2..8d2359b 100644 --- a/drb-c2-core/app/internal/summarizer.py +++ b/drb-c2-core/app/internal/summarizer.py @@ -16,13 +16,18 @@ from app.config import settings async def summarizer_loop() -> None: + from app.internal.feature_flags import get_flags interval = settings.summary_interval_minutes * 60 logger.info(f"Summarizer started — interval: {settings.summary_interval_minutes}m") while True: await asyncio.sleep(interval) try: - await _run_summary_pass() - await _resolve_stale_incidents() + flags = await get_flags() + if flags["summaries_enabled"]: + await _run_summary_pass() + await _resolve_stale_incidents() + else: + logger.info("Summaries disabled — skipping summary pass and stale incident sweep") except Exception as e: logger.error(f"Summarizer pass failed: {e}") diff --git a/drb-c2-core/app/internal/vocabulary_learner.py b/drb-c2-core/app/internal/vocabulary_learner.py index 2a71ba3..aa7e3d1 100644 --- a/drb-c2-core/app/internal/vocabulary_learner.py +++ b/drb-c2-core/app/internal/vocabulary_learner.py @@ -243,6 +243,7 @@ def build_gpt_vocab_block(vocabulary: list[str]) -> str: # ───────────────────────────────────────────────────────────────────────────── async def vocabulary_induction_loop() -> None: + from app.internal.feature_flags import get_flags interval = settings.vocabulary_induction_interval_hours * 3600 logger.info( f"Vocabulary induction loop started — " @@ -252,7 +253,11 @@ async def vocabulary_induction_loop() -> None: while True: await asyncio.sleep(interval) try: - await _run_induction_pass() + flags = await get_flags() + if flags["vocabulary_learning_enabled"]: + await _run_induction_pass() + else: + logger.info("Vocabulary learning disabled — skipping induction pass") except Exception as e: logger.error(f"Vocabulary induction pass failed: {e}") diff --git a/drb-c2-core/app/main.py b/drb-c2-core/app/main.py index 37471d6..e286db1 100644 --- a/drb-c2-core/app/main.py +++ b/drb-c2-core/app/main.py @@ -10,7 +10,7 @@ from app.internal.vocabulary_learner import vocabulary_induction_loop from app.internal.recorrelation_sweep import recorrelation_loop from app.config import settings from app.internal.auth import require_firebase_token, require_service_or_firebase_token -from app.routers import nodes, systems, calls, upload, tokens, incidents, alerts +from app.routers import nodes, systems, calls, upload, tokens, incidents, alerts, admin from app.internal import firestore as fstore @@ -69,6 +69,7 @@ app.include_router(tokens.router, dependencies=[Depends(require_service_or_fi app.include_router(incidents.router, dependencies=[Depends(require_service_or_firebase_token)]) app.include_router(alerts.router, dependencies=[Depends(require_service_or_firebase_token)]) app.include_router(upload.router) # auth is per-node, handled inline +app.include_router(admin.router) # auth is per-endpoint (read: firebase, write: admin) @app.get("/health") diff --git a/drb-c2-core/app/routers/admin.py b/drb-c2-core/app/routers/admin.py new file mode 100644 index 0000000..6cd2651 --- /dev/null +++ b/drb-c2-core/app/routers/admin.py @@ -0,0 +1,17 @@ +from fastapi import APIRouter, Depends +from app.internal.auth import require_admin_token, require_firebase_token +from app.internal.feature_flags import get_flags, set_flags + +router = APIRouter(prefix="/admin", tags=["admin"]) + + +@router.get("/features") +async def get_feature_flags(_=Depends(require_firebase_token)): + """Return the current AI feature flag state. Any authenticated user can read.""" + return await get_flags() + + +@router.put("/features") +async def update_feature_flags(body: dict, _=Depends(require_admin_token)): + """Update one or more AI feature flags. Admin only.""" + return await set_flags(body) diff --git a/drb-c2-core/app/routers/upload.py b/drb-c2-core/app/routers/upload.py index 790e727..bf82fbb 100644 --- a/drb-c2-core/app/routers/upload.py +++ b/drb-c2-core/app/routers/upload.py @@ -157,64 +157,74 @@ async def _run_intelligence_pipeline( 4. Check alert rules and dispatch notifications """ from app.internal import transcription, intelligence, incident_correlator, alerter + from app.internal.feature_flags import get_flags + + flags = await get_flags() transcript: Optional[str] = None segments: list[dict] = [] # Step 1: Transcription if gcs_uri: - transcript, segments = await transcription.transcribe_call( - call_id, gcs_uri, talkgroup_name, system_id=system_id - ) + if flags["stt_enabled"]: + transcript, segments = await transcription.transcribe_call( + call_id, gcs_uri, talkgroup_name, system_id=system_id + ) + else: + logger.info(f"STT disabled — skipping transcription for call {call_id}") # Step 2: Scene detection + intelligence extraction scenes: list[dict] = [] - if transcript: - scenes = await intelligence.extract_scenes( - call_id, transcript, talkgroup_name, - talkgroup_id=talkgroup_id, system_id=system_id, segments=segments, - node_id=node_id, - ) + if flags["correlation_enabled"]: + if transcript: + scenes = await intelligence.extract_scenes( + call_id, transcript, talkgroup_name, + talkgroup_id=talkgroup_id, system_id=system_id, segments=segments, + node_id=node_id, + ) + else: + logger.info(f"Correlation disabled — skipping scene extraction and correlation for call {call_id}") # Step 3: Correlate each scene independently. # A single recording can produce multiple incidents on a busy channel. incident_ids: list[str] = [] all_tags: list[str] = [] - for scene in scenes: - all_tags.extend(scene["tags"]) - incident_id = await incident_correlator.correlate_call( - call_id=call_id, - node_id=node_id, - system_id=system_id, - talkgroup_id=talkgroup_id, - talkgroup_name=talkgroup_name, - tags=scene["tags"], - incident_type=scene["incident_type"], - location=scene["location"], - location_coords=scene["location_coords"], - ) - if incident_id and incident_id not in incident_ids: - incident_ids.append(incident_id) - if scene["resolved"] and incident_id: - await fstore.doc_set("incidents", incident_id, {"status": "resolved"}) - logger.info(f"Auto-resolved incident {incident_id} (LLM closure detection)") + if flags["correlation_enabled"]: + for scene in scenes: + all_tags.extend(scene["tags"]) + incident_id = await incident_correlator.correlate_call( + call_id=call_id, + node_id=node_id, + system_id=system_id, + talkgroup_id=talkgroup_id, + talkgroup_name=talkgroup_name, + tags=scene["tags"], + incident_type=scene["incident_type"], + location=scene["location"], + location_coords=scene["location_coords"], + ) + if incident_id and incident_id not in incident_ids: + incident_ids.append(incident_id) + if scene["resolved"] and incident_id: + await fstore.doc_set("incidents", incident_id, {"status": "resolved"}) + logger.info(f"Auto-resolved incident {incident_id} (LLM closure detection)") - # Correlator also runs for calls with no scenes (unclassified) to attempt - # talkgroup-based linking even when no transcript could be produced. - if not scenes: - incident_id = await incident_correlator.correlate_call( - call_id=call_id, - node_id=node_id, - system_id=system_id, - talkgroup_id=talkgroup_id, - talkgroup_name=talkgroup_name, - tags=[], - incident_type=None, - location=None, - location_coords=None, - ) - if incident_id: - incident_ids.append(incident_id) + # Correlator also runs for calls with no scenes (unclassified) to attempt + # talkgroup-based linking even when no transcript could be produced. + if not scenes: + incident_id = await incident_correlator.correlate_call( + call_id=call_id, + node_id=node_id, + system_id=system_id, + talkgroup_id=talkgroup_id, + talkgroup_name=talkgroup_name, + tags=[], + incident_type=None, + location=None, + location_coords=None, + ) + if incident_id: + incident_ids.append(incident_id) if incident_ids: await fstore.doc_set("calls", call_id, {"incident_ids": incident_ids}) diff --git a/drb-frontend/app/admin/page.tsx b/drb-frontend/app/admin/page.tsx new file mode 100644 index 0000000..719f8e5 --- /dev/null +++ b/drb-frontend/app/admin/page.tsx @@ -0,0 +1,135 @@ +"use client"; + +import { useAuth } from "@/components/AuthProvider"; +import { c2api } from "@/lib/c2api"; +import { useEffect, useState } from "react"; +import { useRouter } from "next/navigation"; + +interface FeatureFlags { + stt_enabled: boolean; + correlation_enabled: boolean; + summaries_enabled: boolean; + vocabulary_learning_enabled: boolean; +} + +const FLAG_META: { key: keyof FeatureFlags; label: string; description: string }[] = [ + { + key: "stt_enabled", + label: "Speech-to-Text (Whisper)", + description: "Transcribe call audio via OpenAI Whisper. When off, calls are recorded and stored but no transcript is generated.", + }, + { + key: "correlation_enabled", + label: "Incident Correlation", + description: "Run scene extraction and incident correlation on each call. When off, calls are logged but not linked to incidents.", + }, + { + key: "summaries_enabled", + label: "Incident Summaries", + description: "Generate AI summaries for active incidents on each summarizer pass. Auto-resolve sweep is also paused when off.", + }, + { + key: "vocabulary_learning_enabled", + label: "Vocabulary Learning", + description: "Run the background vocabulary induction loop that proposes new STT terms from recent transcripts.", + }, +]; + +function Toggle({ + enabled, + onChange, + disabled, +}: { + enabled: boolean; + onChange: (val: boolean) => void; + disabled: boolean; +}) { + return ( + + ); +} + +export default function AdminPage() { + const { isAdmin } = useAuth(); + const router = useRouter(); + + const [flags, setFlags] = useState(null); + const [loading, setLoading] = useState(true); + const [saving, setSaving] = useState(null); + const [error, setError] = useState(null); + + useEffect(() => { + if (!isAdmin) { + router.replace("/dashboard"); + return; + } + c2api.getFeatureFlags() + .then((f) => setFlags(f as FeatureFlags)) + .catch((e) => setError(String(e))) + .finally(() => setLoading(false)); + }, [isAdmin, router]); + + async function handleToggle(key: keyof FeatureFlags, value: boolean) { + if (!flags) return; + setSaving(key); + setError(null); + try { + const updated = await c2api.setFeatureFlags({ [key]: value }); + setFlags(updated as FeatureFlags); + } catch (e) { + setError(String(e)); + } finally { + setSaving(null); + } + } + + if (!isAdmin) return null; + + return ( +
+

Admin

+ +
+

AI Features

+ + {error && ( +
+

{error}

+
+ )} + + {loading ? ( +

Loading…

+ ) : ( +
+ {FLAG_META.map(({ key, label, description }) => ( +
+
+

{label}

+

{description}

+
+ handleToggle(key, val)} + disabled={saving === key} + /> +
+ ))} +
+ )} +
+
+ ); +} diff --git a/drb-frontend/components/Nav.tsx b/drb-frontend/components/Nav.tsx index aa97aa6..434dfbf 100644 --- a/drb-frontend/components/Nav.tsx +++ b/drb-frontend/components/Nav.tsx @@ -18,6 +18,7 @@ const links = [ const adminLinks = [ { href: "/tokens", label: "Tokens" }, + { href: "/admin", label: "Admin" }, ]; export function Nav() { diff --git a/drb-frontend/lib/c2api.ts b/drb-frontend/lib/c2api.ts index 0456758..cddf9a0 100644 --- a/drb-frontend/lib/c2api.ts +++ b/drb-frontend/lib/c2api.ts @@ -115,4 +115,10 @@ export const c2api = { request(`/systems/${systemId}/vocabulary/pending/approve`, { method: "POST", body: JSON.stringify({ term }) }), dismissPendingTerm: (systemId: string, term: string) => request(`/systems/${systemId}/vocabulary/pending/dismiss`, { method: "POST", body: JSON.stringify({ term }) }), + + // Feature flags (admin) + getFeatureFlags: () => + request>("/admin/features"), + setFeatureFlags: (flags: Record) => + request>("/admin/features", { method: "PUT", body: JSON.stringify(flags) }), };