Implement Admin UI to disable AI components

This commit is contained in:
Logan
2026-04-27 00:37:51 -04:00
parent 92c8351864
commit c959437059
9 changed files with 289 additions and 47 deletions
+62
View File
@@ -0,0 +1,62 @@
"""
Global AI feature flags stored in Firestore at config/ai_features.
Defaults to all-on when the document does not exist yet. Uses a short
in-memory TTL cache so flag reads don't add a Firestore round-trip to every
call upload.
"""
import time
from typing import Any
from app.internal.logger import logger
from app.internal import firestore as fstore
_COLLECTION = "config"
_DOC_ID = "ai_features"
_TTL = 30.0 # seconds before re-reading from Firestore
_DEFAULTS: dict[str, bool] = {
"stt_enabled": True,
"correlation_enabled": True,
"summaries_enabled": True,
"vocabulary_learning_enabled": True,
}
_cache: dict[str, Any] = {}
_cache_ts: float = 0.0
async def get_flags() -> dict[str, bool]:
"""Return the current feature flags, using the TTL cache when fresh."""
global _cache, _cache_ts
now = time.monotonic()
if _cache and (now - _cache_ts) < _TTL:
return dict(_cache)
try:
doc = await fstore.doc_get(_COLLECTION, _DOC_ID)
if doc:
merged = {**_DEFAULTS, **{k: bool(v) for k, v in doc.items() if k in _DEFAULTS}}
else:
merged = dict(_DEFAULTS)
except Exception as e:
logger.warning(f"Feature flags: could not read from Firestore ({e}), using defaults")
merged = dict(_DEFAULTS)
_cache = merged
_cache_ts = now
return dict(_cache)
async def set_flags(updates: dict[str, bool]) -> dict[str, bool]:
"""Write flag updates to Firestore and invalidate the cache."""
global _cache, _cache_ts
clean = {k: bool(v) for k, v in updates.items() if k in _DEFAULTS}
if not clean:
raise ValueError(f"No recognised flag keys in update: {list(updates)}")
await fstore.doc_set(_COLLECTION, _DOC_ID, clean)
_cache_ts = 0.0 # force re-read on next get_flags()
logger.info(f"Feature flags updated: {clean}")
return await get_flags()
+7 -2
View File
@@ -16,13 +16,18 @@ from app.config import settings
async def summarizer_loop() -> None:
from app.internal.feature_flags import get_flags
interval = settings.summary_interval_minutes * 60
logger.info(f"Summarizer started — interval: {settings.summary_interval_minutes}m")
while True:
await asyncio.sleep(interval)
try:
await _run_summary_pass()
await _resolve_stale_incidents()
flags = await get_flags()
if flags["summaries_enabled"]:
await _run_summary_pass()
await _resolve_stale_incidents()
else:
logger.info("Summaries disabled — skipping summary pass and stale incident sweep")
except Exception as e:
logger.error(f"Summarizer pass failed: {e}")
@@ -243,6 +243,7 @@ def build_gpt_vocab_block(vocabulary: list[str]) -> str:
# ─────────────────────────────────────────────────────────────────────────────
async def vocabulary_induction_loop() -> None:
from app.internal.feature_flags import get_flags
interval = settings.vocabulary_induction_interval_hours * 3600
logger.info(
f"Vocabulary induction loop started — "
@@ -252,7 +253,11 @@ async def vocabulary_induction_loop() -> None:
while True:
await asyncio.sleep(interval)
try:
await _run_induction_pass()
flags = await get_flags()
if flags["vocabulary_learning_enabled"]:
await _run_induction_pass()
else:
logger.info("Vocabulary learning disabled — skipping induction pass")
except Exception as e:
logger.error(f"Vocabulary induction pass failed: {e}")
+2 -1
View File
@@ -10,7 +10,7 @@ from app.internal.vocabulary_learner import vocabulary_induction_loop
from app.internal.recorrelation_sweep import recorrelation_loop
from app.config import settings
from app.internal.auth import require_firebase_token, require_service_or_firebase_token
from app.routers import nodes, systems, calls, upload, tokens, incidents, alerts
from app.routers import nodes, systems, calls, upload, tokens, incidents, alerts, admin
from app.internal import firestore as fstore
@@ -69,6 +69,7 @@ app.include_router(tokens.router, dependencies=[Depends(require_service_or_fi
app.include_router(incidents.router, dependencies=[Depends(require_service_or_firebase_token)])
app.include_router(alerts.router, dependencies=[Depends(require_service_or_firebase_token)])
app.include_router(upload.router) # auth is per-node, handled inline
app.include_router(admin.router) # auth is per-endpoint (read: firebase, write: admin)
@app.get("/health")
+17
View File
@@ -0,0 +1,17 @@
from fastapi import APIRouter, Depends
from app.internal.auth import require_admin_token, require_firebase_token
from app.internal.feature_flags import get_flags, set_flags
router = APIRouter(prefix="/admin", tags=["admin"])
@router.get("/features")
async def get_feature_flags(_=Depends(require_firebase_token)):
"""Return the current AI feature flag state. Any authenticated user can read."""
return await get_flags()
@router.put("/features")
async def update_feature_flags(body: dict, _=Depends(require_admin_token)):
"""Update one or more AI feature flags. Admin only."""
return await set_flags(body)
+53 -43
View File
@@ -157,64 +157,74 @@ async def _run_intelligence_pipeline(
4. Check alert rules and dispatch notifications
"""
from app.internal import transcription, intelligence, incident_correlator, alerter
from app.internal.feature_flags import get_flags
flags = await get_flags()
transcript: Optional[str] = None
segments: list[dict] = []
# Step 1: Transcription
if gcs_uri:
transcript, segments = await transcription.transcribe_call(
call_id, gcs_uri, talkgroup_name, system_id=system_id
)
if flags["stt_enabled"]:
transcript, segments = await transcription.transcribe_call(
call_id, gcs_uri, talkgroup_name, system_id=system_id
)
else:
logger.info(f"STT disabled — skipping transcription for call {call_id}")
# Step 2: Scene detection + intelligence extraction
scenes: list[dict] = []
if transcript:
scenes = await intelligence.extract_scenes(
call_id, transcript, talkgroup_name,
talkgroup_id=talkgroup_id, system_id=system_id, segments=segments,
node_id=node_id,
)
if flags["correlation_enabled"]:
if transcript:
scenes = await intelligence.extract_scenes(
call_id, transcript, talkgroup_name,
talkgroup_id=talkgroup_id, system_id=system_id, segments=segments,
node_id=node_id,
)
else:
logger.info(f"Correlation disabled — skipping scene extraction and correlation for call {call_id}")
# Step 3: Correlate each scene independently.
# A single recording can produce multiple incidents on a busy channel.
incident_ids: list[str] = []
all_tags: list[str] = []
for scene in scenes:
all_tags.extend(scene["tags"])
incident_id = await incident_correlator.correlate_call(
call_id=call_id,
node_id=node_id,
system_id=system_id,
talkgroup_id=talkgroup_id,
talkgroup_name=talkgroup_name,
tags=scene["tags"],
incident_type=scene["incident_type"],
location=scene["location"],
location_coords=scene["location_coords"],
)
if incident_id and incident_id not in incident_ids:
incident_ids.append(incident_id)
if scene["resolved"] and incident_id:
await fstore.doc_set("incidents", incident_id, {"status": "resolved"})
logger.info(f"Auto-resolved incident {incident_id} (LLM closure detection)")
if flags["correlation_enabled"]:
for scene in scenes:
all_tags.extend(scene["tags"])
incident_id = await incident_correlator.correlate_call(
call_id=call_id,
node_id=node_id,
system_id=system_id,
talkgroup_id=talkgroup_id,
talkgroup_name=talkgroup_name,
tags=scene["tags"],
incident_type=scene["incident_type"],
location=scene["location"],
location_coords=scene["location_coords"],
)
if incident_id and incident_id not in incident_ids:
incident_ids.append(incident_id)
if scene["resolved"] and incident_id:
await fstore.doc_set("incidents", incident_id, {"status": "resolved"})
logger.info(f"Auto-resolved incident {incident_id} (LLM closure detection)")
# Correlator also runs for calls with no scenes (unclassified) to attempt
# talkgroup-based linking even when no transcript could be produced.
if not scenes:
incident_id = await incident_correlator.correlate_call(
call_id=call_id,
node_id=node_id,
system_id=system_id,
talkgroup_id=talkgroup_id,
talkgroup_name=talkgroup_name,
tags=[],
incident_type=None,
location=None,
location_coords=None,
)
if incident_id:
incident_ids.append(incident_id)
# Correlator also runs for calls with no scenes (unclassified) to attempt
# talkgroup-based linking even when no transcript could be produced.
if not scenes:
incident_id = await incident_correlator.correlate_call(
call_id=call_id,
node_id=node_id,
system_id=system_id,
talkgroup_id=talkgroup_id,
talkgroup_name=talkgroup_name,
tags=[],
incident_type=None,
location=None,
location_coords=None,
)
if incident_id:
incident_ids.append(incident_id)
if incident_ids:
await fstore.doc_set("calls", call_id, {"incident_ids": incident_ids})