From 338b946ba304376f583b2afa9f51f1c892669cc6 Mon Sep 17 00:00:00 2001 From: Logan Date: Tue, 21 Apr 2026 22:17:30 -0400 Subject: [PATCH] Start to learn vocab from talkgroups to improve accuracy of STT --- drb-c2-core/app/config.py | 4 + drb-c2-core/app/internal/intelligence.py | 14 +- drb-c2-core/app/internal/transcription.py | 23 +- .../app/internal/vocabulary_learner.py | 426 ++++++++++++++++++ drb-c2-core/app/main.py | 5 +- drb-c2-core/app/routers/calls.py | 7 + drb-c2-core/app/routers/systems.py | 73 +++ drb-c2-core/app/routers/upload.py | 4 +- drb-frontend/app/systems/page.tsx | 186 +++++++- drb-frontend/lib/c2api.ts | 16 + drb-frontend/lib/types.ts | 9 + 11 files changed, 759 insertions(+), 8 deletions(-) create mode 100644 drb-c2-core/app/internal/vocabulary_learner.py diff --git a/drb-c2-core/app/config.py b/drb-c2-core/app/config.py index a759d47..841dafd 100644 --- a/drb-c2-core/app/config.py +++ b/drb-c2-core/app/config.py @@ -28,6 +28,10 @@ class Settings(BaseSettings): location_proximity_km: float = 0.5 # radius for location-proximity matching incident_auto_resolve_minutes: int = 90 # auto-resolve after N minutes with no new calls + # Vocabulary learning + vocabulary_induction_interval_hours: int = 24 # how often the induction loop runs + vocabulary_induction_sample_tokens: int = 4000 # ~tokens of transcript text sampled per system + # Internal service key — allows server-side services (discord bot) to call C2 without Firebase service_key: Optional[str] = None diff --git a/drb-c2-core/app/internal/intelligence.py b/drb-c2-core/app/internal/intelligence.py index 89be22e..66bdeb9 100644 --- a/drb-c2-core/app/internal/intelligence.py +++ b/drb-c2-core/app/internal/intelligence.py @@ -36,7 +36,7 @@ Rules: System: {system_id} Talkgroup: {talkgroup_name} -{transcript_block}""" +{vocabulary_block}{transcript_block}""" # Nominatim viewbox half-width in degrees (~11 km at mid-latitudes) _GEO_DELTA = 0.1 @@ -76,8 +76,15 @@ async def extract_tags( Side-effect: updates calls/{call_id} in Firestore with tags, location, location_coords, vehicles, units, severity, transcript_corrected; also stores embedding. 
""" + # Load per-system vocabulary for prompt injection + vocabulary: list[str] = [] + if system_id: + from app.internal.vocabulary_learner import get_vocabulary + vocab_data = await get_vocabulary(system_id) + vocabulary = vocab_data.get("vocabulary") or [] + result = await asyncio.to_thread( - _sync_extract, transcript, talkgroup_name, talkgroup_id, system_id, segments + _sync_extract, transcript, talkgroup_name, talkgroup_id, system_id, segments, vocabulary ) tags: list[str] = result.get("tags") or [] @@ -233,6 +240,7 @@ def _sync_extract( talkgroup_id: Optional[int], system_id: Optional[str], segments: Optional[list[dict]], + vocabulary: Optional[list[str]] = None, ) -> dict: """Call GPT-4o mini and parse the JSON response.""" from app.config import settings @@ -242,11 +250,13 @@ def _sync_extract( logger.warning("OPENAI_API_KEY not set — intelligence extraction disabled.") return {} + from app.internal.vocabulary_learner import build_gpt_vocab_block tg = f"{talkgroup_name} (TGID {talkgroup_id})" if talkgroup_id else (talkgroup_name or "unknown") prompt = _PROMPT_TEMPLATE.format( transcript_block=_build_transcript_block(transcript, segments), talkgroup_name=tg, system_id=system_id or "unknown", + vocabulary_block=build_gpt_vocab_block(vocabulary or []), ) try: diff --git a/drb-c2-core/app/internal/transcription.py b/drb-c2-core/app/internal/transcription.py index 65854ae..e975345 100644 --- a/drb-c2-core/app/internal/transcription.py +++ b/drb-c2-core/app/internal/transcription.py @@ -28,6 +28,7 @@ async def transcribe_call( call_id: str, gcs_uri: str, talkgroup_name: Optional[str] = None, + system_id: Optional[str] = None, ) -> tuple[Optional[str], list[dict]]: """ Transcribe audio at the given GCS URI and store the result in Firestore. @@ -39,8 +40,17 @@ async def transcribe_call( if not gcs_uri or not gcs_uri.startswith("gs://"): return None, [] + # Load vocabulary for this system (empty list if none yet) + vocabulary: list[str] = [] + if system_id: + from app.internal.vocabulary_learner import get_vocabulary + vocab_data = await get_vocabulary(system_id) + vocabulary = vocab_data.get("vocabulary") or [] + try: - transcript, segments = await asyncio.to_thread(_sync_transcribe, gcs_uri, talkgroup_name) + transcript, segments = await asyncio.to_thread( + _sync_transcribe, gcs_uri, talkgroup_name, vocabulary + ) except Exception as e: logger.warning(f"Transcription failed for call {call_id}: {e}") return None, [] @@ -61,7 +71,11 @@ async def transcribe_call( return transcript, segments -def _sync_transcribe(gcs_uri: str, talkgroup_name: Optional[str] = None) -> tuple[Optional[str], list[dict]]: +def _sync_transcribe( + gcs_uri: str, + talkgroup_name: Optional[str] = None, + vocabulary: Optional[list[str]] = None, +) -> tuple[Optional[str], list[dict]]: """Download audio from GCS and transcribe with OpenAI Whisper.""" from google.cloud import storage as gcs from google.oauth2 import service_account @@ -94,7 +108,10 @@ def _sync_transcribe(gcs_uri: str, talkgroup_name: Optional[str] = None) -> tupl try: blob.download_to_filename(tmp_path) - prompt = (f"Talkgroup: {talkgroup_name}. " + _WHISPER_PROMPT) if talkgroup_name else _WHISPER_PROMPT + from app.internal.vocabulary_learner import build_whisper_vocab_prompt + vocab_prefix = build_whisper_vocab_prompt(vocabulary or []) + tg_prefix = f"Talkgroup: {talkgroup_name}. 
" if talkgroup_name else "" + prompt = tg_prefix + vocab_prefix + _WHISPER_PROMPT openai_client = OpenAI(api_key=settings.openai_api_key) with open(tmp_path, "rb") as f: diff --git a/drb-c2-core/app/internal/vocabulary_learner.py b/drb-c2-core/app/internal/vocabulary_learner.py new file mode 100644 index 0000000..2a71ba3 --- /dev/null +++ b/drb-c2-core/app/internal/vocabulary_learner.py @@ -0,0 +1,426 @@ +""" +Per-system vocabulary learning for STT accuracy improvement. + +Three mechanisms: +1. Bootstrap — one-shot GPT-4o call generates local knowledge at system setup: + agencies + abbreviations, unit naming, streets, acronyms. +2. Correction — diffs admin transcript edits, extracts corrected tokens → vocabulary. +3. Induction — background loop samples N tokens of transcripts per system, + asks GPT-4o-mini to propose new terms → queued as pending for review. + +Firestore schema additions on system documents: + vocabulary: list[str] — approved terms; injected into Whisper + GPT prompts + vocabulary_pending: list[dict] — induction suggestions awaiting admin review + each: {term, source, added_at} + vocabulary_bootstrapped: bool — bootstrap has been run at least once +""" +import asyncio +import difflib +import json +import random +from datetime import datetime, timezone +from typing import Optional +from app.internal.logger import logger +from app.internal import firestore as fstore +from app.config import settings + + +# ───────────────────────────────────────────────────────────────────────────── +# Prompt templates +# ───────────────────────────────────────────────────────────────────────────── + +_BOOTSTRAP_PROMPT = """\ +You are building a radio vocabulary dictionary to improve speech-to-text accuracy for a P25 \ +public-safety radio monitoring system in a specific area. The STT model has no local knowledge, \ +so common terms like "YVAC" get transcribed as "why vac", "5-baker" as "5 acre", etc. + +System name: {system_name} +System type: {system_type} +Area context: {area_hint} + +Return ONLY a JSON object: +{{"vocabulary": [list of strings]}} + +Include terms you are confident about for this area: +- Agency names and their radio abbreviations (e.g. "YVAC" = Yorktown Volunteer Ambulance Corps) +- Unit ID examples using the local naming convention (e.g. "5-baker", "5-charlie", "1-david"; + many departments use APCO phonetics: adam, baker, charles, david, edward, frank, george, + henry, ida, john, king, lincoln, mary, nora, ocean, paul, queen, robert, sam, tom, union, + victor, william, x-ray, young, zebra) +- Major routes, roads, and key intersections +- Local landmarks and geographic references dispatchers use +- Agency-specific codes that differ from standard APCO + +Return a flat list of strings — abbreviations, proper names, unit IDs, street names. +Do NOT include common English words. Max 80 terms. Only include what you are confident is \ +accurate for this specific area; return fewer terms rather than guessing.""" + +_INDUCTION_PROMPT = """\ +You are analyzing P25 emergency radio transcripts to find vocabulary terms that should be \ +added to improve future speech-to-text accuracy for this system. + +System: {system_name} +Existing approved vocabulary (do not re-propose these): {existing_vocab} + +Sampled transcripts: +{transcript_block} + +Find terms that are LIKELY STT errors or local terms missing from the vocabulary: +- Unit IDs that appear garbled (e.g. "5 acre" → "5-baker") +- Agency acronyms spelled out phonetically (e.g. 
"why vac" → "YVAC") +- Street names or locations that look misspelled or oddly transcribed +- Callsigns or local codes not yet in the vocabulary + +Return ONLY a JSON object: +{{"new_terms": ["term1", "term2", ...]}} + +Only include high-confidence additions not already in existing vocabulary. +Return {{"new_terms": []}} if nothing new is found.""" + + +# ───────────────────────────────────────────────────────────────────────────── +# Public API +# ───────────────────────────────────────────────────────────────────────────── + +async def bootstrap_system_vocabulary(system_id: str) -> list[str]: + """ + One-shot GPT-4o bootstrap: generate local-knowledge vocabulary for a system. + Merges generated terms into system.vocabulary and sets vocabulary_bootstrapped=True. + Returns the list of newly generated terms. + """ + system_doc = await fstore.doc_get("systems", system_id) + if not system_doc: + logger.warning(f"Vocabulary bootstrap: system {system_id} not found") + return [] + + system_name = system_doc.get("name", "Unknown") + system_type = system_doc.get("type", "P25") + + # Build area hint from configured talkgroup names + talkgroups = system_doc.get("config", {}).get("talkgroups", []) + tg_names = [tg.get("name", "") for tg in talkgroups if tg.get("name")][:8] + area_hint = f"Talkgroups include: {', '.join(tg_names)}" if tg_names else "Unknown area" + + terms = await asyncio.to_thread(_sync_bootstrap, system_name, system_type, area_hint) + if not terms: + return [] + + existing = system_doc.get("vocabulary") or [] + existing_lower = {t.lower() for t in existing} + to_add = [t for t in terms if t.lower() not in existing_lower] + merged = list(dict.fromkeys(existing + to_add)) + + await fstore.doc_set("systems", system_id, { + "vocabulary": merged, + "vocabulary_bootstrapped": True, + }) + logger.info( + f"Vocabulary bootstrap: {len(to_add)} term(s) generated for system {system_id} " + f"({system_name})" + ) + return to_add + + +async def learn_from_correction(system_id: str, original: str, corrected: str) -> None: + """ + Diff original and corrected transcripts; append new tokens to the approved vocabulary. + Called automatically when an admin saves a transcript correction. 
+ """ + if not system_id or not original or not corrected: + return + + new_terms = _diff_new_terms(original, corrected) + if not new_terms: + return + + system_doc = await fstore.doc_get("systems", system_id) + if not system_doc: + return + + existing = system_doc.get("vocabulary") or [] + existing_lower = {t.lower() for t in existing} + to_add = [t for t in new_terms if t.lower() not in existing_lower] + if not to_add: + return + + merged = list(dict.fromkeys(existing + to_add)) + await fstore.doc_set("systems", system_id, {"vocabulary": merged}) + logger.info( + f"Vocabulary: learned {len(to_add)} term(s) from correction on system {system_id}: " + f"{to_add}" + ) + + +async def approve_pending_term(system_id: str, term: str) -> None: + """Move a pending term into the approved vocabulary.""" + system_doc = await fstore.doc_get("systems", system_id) + if not system_doc: + return + pending = [p for p in (system_doc.get("vocabulary_pending") or []) if p["term"] != term] + vocab = system_doc.get("vocabulary") or [] + if term.lower() not in {t.lower() for t in vocab}: + vocab = list(dict.fromkeys(vocab + [term])) + await fstore.doc_set("systems", system_id, { + "vocabulary": vocab, + "vocabulary_pending": pending, + }) + + +async def dismiss_pending_term(system_id: str, term: str) -> None: + """Remove a pending term without adding it to vocabulary.""" + system_doc = await fstore.doc_get("systems", system_id) + if not system_doc: + return + pending = [p for p in (system_doc.get("vocabulary_pending") or []) if p["term"] != term] + await fstore.doc_set("systems", system_id, {"vocabulary_pending": pending}) + + +async def add_term(system_id: str, term: str) -> None: + """Manually add a term to the approved vocabulary.""" + system_doc = await fstore.doc_get("systems", system_id) + if not system_doc: + return + vocab = system_doc.get("vocabulary") or [] + if term.lower() not in {t.lower() for t in vocab}: + vocab = list(dict.fromkeys(vocab + [term.strip()])) + await fstore.doc_set("systems", system_id, {"vocabulary": vocab}) + + +async def remove_term(system_id: str, term: str) -> None: + """Remove a term from the approved vocabulary.""" + system_doc = await fstore.doc_get("systems", system_id) + if not system_doc: + return + vocab = [t for t in (system_doc.get("vocabulary") or []) if t.lower() != term.lower()] + await fstore.doc_set("systems", system_id, {"vocabulary": vocab}) + + +async def get_vocabulary(system_id: str) -> dict: + """Return vocabulary and pending terms for a system.""" + doc = await fstore.doc_get("systems", system_id) + if not doc: + return {"vocabulary": [], "vocabulary_pending": [], "vocabulary_bootstrapped": False} + return { + "vocabulary": doc.get("vocabulary") or [], + "vocabulary_pending": doc.get("vocabulary_pending") or [], + "vocabulary_bootstrapped": doc.get("vocabulary_bootstrapped", False), + } + + +# ───────────────────────────────────────────────────────────────────────────── +# Prompt-injection helpers (called by transcription.py and intelligence.py) +# ───────────────────────────────────────────────────────────────────────────── + +def build_whisper_vocab_prompt(vocabulary: list[str]) -> str: + """ + Format vocabulary for Whisper prompt injection. + Whisper's prompt field acts as a context prior with a ~224-token limit. + The base _WHISPER_PROMPT uses ~70 tokens; we budget ~150 tokens (≈550 chars) here. 
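+    At roughly 12 characters per term that leaves room for about 40-45 terms; terms
+    beyond the budget are omitted from the prompt.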
+ """ + if not vocabulary: + return "" + char_budget = 550 + terms: list[str] = [] + used = 0 + for term in vocabulary: + cost = len(term) + 2 # ", " + if used + cost > char_budget: + break + terms.append(term) + used += cost + return ", ".join(terms) + ". " if terms else "" + + +def build_gpt_vocab_block(vocabulary: list[str]) -> str: + """Format vocabulary for injection into GPT extraction prompts.""" + if not vocabulary: + return "" + return f"Known local terms: {', '.join(vocabulary)}\n" + + +# ───────────────────────────────────────────────────────────────────────────── +# Background induction loop +# ───────────────────────────────────────────────────────────────────────────── + +async def vocabulary_induction_loop() -> None: + interval = settings.vocabulary_induction_interval_hours * 3600 + logger.info( + f"Vocabulary induction loop started — " + f"interval: {settings.vocabulary_induction_interval_hours}h, " + f"sample budget: {settings.vocabulary_induction_sample_tokens} tokens" + ) + while True: + await asyncio.sleep(interval) + try: + await _run_induction_pass() + except Exception as e: + logger.error(f"Vocabulary induction pass failed: {e}") + + +async def _run_induction_pass() -> None: + systems = await fstore.collection_list("systems") + if not systems: + return + logger.info(f"Vocabulary induction: processing {len(systems)} system(s)") + for system in systems: + system_id = system.get("system_id") + if system_id: + try: + await _induct_system(system_id, system) + except Exception as e: + logger.warning(f"Induction failed for system {system_id}: {e}") + + +async def _induct_system(system_id: str, system_doc: dict) -> None: + """Sample random transcripts for a system and propose new vocabulary.""" + system_name = system_doc.get("name", "Unknown") + existing_vocab: list[str] = system_doc.get("vocabulary") or [] + + # Fetch recent ended calls for this system + all_calls = await fstore.collection_list("calls", system_id=system_id, status="ended") + if not all_calls: + return + + # Random sample up to the token budget (4 chars ≈ 1 token) + random.shuffle(all_calls) + char_budget = settings.vocabulary_induction_sample_tokens * 4 + transcript_block = "" + sampled = 0 + for call in all_calls: + text = call.get("transcript_corrected") or call.get("transcript") or "" + if not text: + continue + if len(transcript_block) + len(text) > char_budget: + break + tg = call.get("talkgroup_name") or f"TGID {call.get('talkgroup_id', '?')}" + transcript_block += f"[{tg}] {text}\n" + sampled += 1 + + if sampled < 3: + return # not enough data to learn from yet + + new_terms = await asyncio.to_thread( + _sync_induct, system_name, existing_vocab, transcript_block + ) + if not new_terms: + return + + now = datetime.now(timezone.utc).isoformat() + existing_pending: list[dict] = system_doc.get("vocabulary_pending") or [] + pending_lower = {p["term"].lower() for p in existing_pending} + vocab_lower = {t.lower() for t in existing_vocab} + + to_queue = [ + {"term": t, "source": "induction", "added_at": now} + for t in new_terms + if t.lower() not in vocab_lower and t.lower() not in pending_lower + ] + if not to_queue: + return + + await fstore.doc_set("systems", system_id, { + "vocabulary_pending": existing_pending + to_queue, + }) + logger.info( + f"Vocabulary induction: {len(to_queue)} new term(s) proposed for " + f"system {system_id} ({system_name}): {[p['term'] for p in to_queue]}" + ) + + +# ───────────────────────────────────────────────────────────────────────────── +# Internal sync helpers +# 
───────────────────────────────────────────────────────────────────────────── + +_STOP_WORDS = { + "the", "and", "for", "are", "was", "were", "this", "that", "with", + "have", "has", "had", "but", "not", "from", "they", "will", "what", + "can", "all", "been", "one", "two", "three", "four", "five", "six", + "you", "out", "who", "get", "her", "him", "his", "its", "our", "my", + "via", "per", "any", "now", "got", "she", "let", "did", "may", "yes", + "sir", "say", "see", "too", "off", "how", "put", "set", "try", "back", + "just", "like", "into", "than", "them", "then", "some", "also", "onto", + "went", "over", "copy", "okay", "unit", "post", "road", "lane", "going", + "being", "doing", "there", "their", "about", "would", "could", "should", + "route", "north", "south", "east", "west", "avenue", "street", "drive", +} + + +def _diff_new_terms(original: str, corrected: str) -> list[str]: + """ + Token-level diff: find tokens in `corrected` that replaced or were inserted + relative to `original`. These are the admin's intended spellings — good + candidates for vocabulary. + """ + orig_tokens = original.split() + corr_tokens = corrected.split() + + matcher = difflib.SequenceMatcher(None, + [t.lower() for t in orig_tokens], + [t.lower() for t in corr_tokens], + ) + new_terms: list[str] = [] + for tag, _i1, _i2, j1, j2 in matcher.get_opcodes(): + if tag in ("insert", "replace"): + for tok in corr_tokens[j1:j2]: + clean = tok.strip(".,!?;:()'\"").strip("-") + if len(clean) >= 3 and clean.lower() not in _STOP_WORDS: + new_terms.append(clean) + + return list(dict.fromkeys(new_terms)) + + +def _sync_bootstrap(system_name: str, system_type: str, area_hint: str) -> list[str]: + from app.config import settings as cfg + from openai import OpenAI + + if not cfg.openai_api_key: + return [] + + prompt = _BOOTSTRAP_PROMPT.format( + system_name=system_name, + system_type=system_type, + area_hint=area_hint, + ) + try: + client = OpenAI(api_key=cfg.openai_api_key) + response = client.chat.completions.create( + model="gpt-4o", + messages=[{"role": "user", "content": prompt}], + response_format={"type": "json_object"}, + ) + data = json.loads(response.choices[0].message.content) + terms = data.get("vocabulary") or [] + return [str(t).strip() for t in terms if str(t).strip()] + except Exception as e: + logger.warning(f"Vocabulary bootstrap GPT call failed: {e}") + return [] + + +def _sync_induct( + system_name: str, existing_vocab: list[str], transcript_block: str +) -> list[str]: + from app.config import settings as cfg + from openai import OpenAI + + if not cfg.openai_api_key: + return [] + + vocab_str = ", ".join(existing_vocab[:80]) if existing_vocab else "(none yet)" + prompt = _INDUCTION_PROMPT.format( + system_name=system_name, + existing_vocab=vocab_str, + transcript_block=transcript_block[:8000], + ) + try: + client = OpenAI(api_key=cfg.openai_api_key) + response = client.chat.completions.create( + model="gpt-4o-mini", + messages=[{"role": "user", "content": prompt}], + response_format={"type": "json_object"}, + ) + data = json.loads(response.choices[0].message.content) + terms = data.get("new_terms") or [] + return [str(t).strip() for t in terms if str(t).strip()] + except Exception as e: + logger.warning(f"Vocabulary induction GPT call failed: {e}") + return [] diff --git a/drb-c2-core/app/main.py b/drb-c2-core/app/main.py index e0e122f..acd164a 100644 --- a/drb-c2-core/app/main.py +++ b/drb-c2-core/app/main.py @@ -6,6 +6,7 @@ from app.internal.logger import logger from app.internal.mqtt_handler import 
mqtt_handler from app.internal.node_sweeper import sweeper_loop from app.internal.summarizer import summarizer_loop +from app.internal.vocabulary_learner import vocabulary_induction_loop from app.config import settings from app.internal.auth import require_firebase_token, require_service_or_firebase_token from app.routers import nodes, systems, calls, upload, tokens, incidents, alerts @@ -35,14 +36,16 @@ async def lifespan(app: FastAPI): await _release_orphaned_tokens() await mqtt_handler.connect() - sweeper_task = asyncio.create_task(sweeper_loop()) + sweeper_task = asyncio.create_task(sweeper_loop()) summarizer_task = asyncio.create_task(summarizer_loop()) + induction_task = asyncio.create_task(vocabulary_induction_loop()) yield # --- app running --- logger.info("DRB C2 Core shutting down.") sweeper_task.cancel() summarizer_task.cancel() + induction_task.cancel() await mqtt_handler.disconnect() diff --git a/drb-c2-core/app/routers/calls.py b/drb-c2-core/app/routers/calls.py index d2ff664..4c90e09 100644 --- a/drb-c2-core/app/routers/calls.py +++ b/drb-c2-core/app/routers/calls.py @@ -83,6 +83,13 @@ async def patch_transcript( "embedding": None, }) + # Learn from the correction: diff original → corrected and add new tokens to vocabulary + system_id = call.get("system_id") + original_text = call.get("transcript_corrected") or call.get("transcript") or "" + if system_id and original_text: + from app.internal.vocabulary_learner import learn_from_correction + await learn_from_correction(system_id, original_text, body.transcript) + from app.routers.upload import _run_extraction_pipeline background_tasks.add_task( _run_extraction_pipeline, diff --git a/drb-c2-core/app/routers/systems.py b/drb-c2-core/app/routers/systems.py index fd37f88..d99182b 100644 --- a/drb-c2-core/app/routers/systems.py +++ b/drb-c2-core/app/routers/systems.py @@ -1,11 +1,17 @@ import uuid from fastapi import APIRouter, HTTPException +from pydantic import BaseModel +from typing import Optional from app.models import SystemCreate, SystemRecord from app.internal import firestore as fstore router = APIRouter(prefix="/systems", tags=["systems"]) +class VocabularyTermBody(BaseModel): + term: str + + @router.get("") async def list_systems(): return await fstore.collection_list("systems") @@ -42,3 +48,70 @@ async def delete_system(system_id: str): if not existing: raise HTTPException(404, f"System '{system_id}' not found.") await fstore.doc_delete("systems", system_id) + + +# ── Vocabulary endpoints ─────────────────────────────────────────────────────── + +@router.get("/{system_id}/vocabulary") +async def get_vocabulary(system_id: str): + """Return approved vocabulary and pending induction suggestions.""" + existing = await fstore.doc_get("systems", system_id) + if not existing: + raise HTTPException(404, f"System '{system_id}' not found.") + from app.internal.vocabulary_learner import get_vocabulary as _get + return await _get(system_id) + + +@router.post("/{system_id}/vocabulary/bootstrap", status_code=202) +async def bootstrap_vocabulary(system_id: str): + """Trigger a one-shot GPT-4o bootstrap to seed the vocabulary from local knowledge.""" + existing = await fstore.doc_get("systems", system_id) + if not existing: + raise HTTPException(404, f"System '{system_id}' not found.") + from app.internal.vocabulary_learner import bootstrap_system_vocabulary + terms = await bootstrap_system_vocabulary(system_id) + return {"added": len(terms), "terms": terms} + + +@router.post("/{system_id}/vocabulary/terms") +async def 
add_vocabulary_term(system_id: str, body: VocabularyTermBody): + """Manually add a term to the approved vocabulary.""" + existing = await fstore.doc_get("systems", system_id) + if not existing: + raise HTTPException(404, f"System '{system_id}' not found.") + from app.internal.vocabulary_learner import add_term + await add_term(system_id, body.term.strip()) + return {"ok": True} + + +@router.delete("/{system_id}/vocabulary/terms") +async def remove_vocabulary_term(system_id: str, body: VocabularyTermBody): + """Remove a term from the approved vocabulary.""" + existing = await fstore.doc_get("systems", system_id) + if not existing: + raise HTTPException(404, f"System '{system_id}' not found.") + from app.internal.vocabulary_learner import remove_term + await remove_term(system_id, body.term) + return {"ok": True} + + +@router.post("/{system_id}/vocabulary/pending/approve") +async def approve_pending(system_id: str, body: VocabularyTermBody): + """Move a pending induction suggestion into the approved vocabulary.""" + existing = await fstore.doc_get("systems", system_id) + if not existing: + raise HTTPException(404, f"System '{system_id}' not found.") + from app.internal.vocabulary_learner import approve_pending_term + await approve_pending_term(system_id, body.term) + return {"ok": True} + + +@router.post("/{system_id}/vocabulary/pending/dismiss") +async def dismiss_pending(system_id: str, body: VocabularyTermBody): + """Dismiss a pending induction suggestion without adding it.""" + existing = await fstore.doc_get("systems", system_id) + if not existing: + raise HTTPException(404, f"System '{system_id}' not found.") + from app.internal.vocabulary_learner import dismiss_pending_term + await dismiss_pending_term(system_id, body.term) + return {"ok": True} diff --git a/drb-c2-core/app/routers/upload.py b/drb-c2-core/app/routers/upload.py index 6463d9e..ee10f27 100644 --- a/drb-c2-core/app/routers/upload.py +++ b/drb-c2-core/app/routers/upload.py @@ -151,7 +151,9 @@ async def _run_intelligence_pipeline( # Step 1: Transcription if gcs_uri: - transcript, segments = await transcription.transcribe_call(call_id, gcs_uri, talkgroup_name) + transcript, segments = await transcription.transcribe_call( + call_id, gcs_uri, talkgroup_name, system_id=system_id + ) # Step 2: Intelligence extraction tags: list[str] = [] diff --git a/drb-frontend/app/systems/page.tsx b/drb-frontend/app/systems/page.tsx index e6babd5..5a985a4 100644 --- a/drb-frontend/app/systems/page.tsx +++ b/drb-frontend/app/systems/page.tsx @@ -3,7 +3,7 @@ import { useState } from "react"; import { useSystems } from "@/lib/useSystems"; import { c2api } from "@/lib/c2api"; -import type { SystemRecord } from "@/lib/types"; +import type { SystemRecord, VocabularyPendingTerm } from "@/lib/types"; // ── P25 structured config types ────────────────────────────────────────────── @@ -433,6 +433,189 @@ function SystemForm({ ); } +// ── Vocabulary panel ────────────────────────────────────────────────────────── + +function VocabularyPanel({ systemId }: { systemId: string }) { + const [vocab, setVocab] = useState(null); + const [pending, setPending] = useState([]); + const [bootstrapped, setBootstrapped] = useState(false); + const [loading, setLoading] = useState(false); + const [bootstrapping, setBootstrapping] = useState(false); + const [newTerm, setNewTerm] = useState(""); + const [adding, setAdding] = useState(false); + const [open, setOpen] = useState(false); + + async function load() { + if (vocab !== null) return; // already loaded + 
setLoading(true); + try { + const data = await c2api.getVocabulary(systemId); + setVocab(data.vocabulary); + setPending(data.vocabulary_pending); + setBootstrapped(data.vocabulary_bootstrapped); + } finally { + setLoading(false); + } + } + + function toggle() { + if (!open) load(); + setOpen((v) => !v); + } + + async function handleBootstrap() { + setBootstrapping(true); + try { + const result = await c2api.bootstrapVocabulary(systemId); + const data = await c2api.getVocabulary(systemId); + setVocab(data.vocabulary); + setPending(data.vocabulary_pending); + setBootstrapped(true); + alert(`Bootstrap added ${result.added} term(s).`); + } finally { + setBootstrapping(false); + } + } + + async function handleAdd(e: React.FormEvent) { + e.preventDefault(); + const term = newTerm.trim(); + if (!term) return; + setAdding(true); + try { + await c2api.addVocabularyTerm(systemId, term); + setVocab((v) => (v ? [...v, term] : [term])); + setNewTerm(""); + } finally { + setAdding(false); + } + } + + async function handleRemove(term: string) { + await c2api.removeVocabularyTerm(systemId, term); + setVocab((v) => (v ?? []).filter((t) => t !== term)); + } + + async function handleApprove(term: string) { + await c2api.approvePendingTerm(systemId, term); + setVocab((v) => (v ? [...v, term] : [term])); + setPending((p) => p.filter((t) => t.term !== term)); + } + + async function handleDismiss(term: string) { + await c2api.dismissPendingTerm(systemId, term); + setPending((p) => p.filter((t) => t.term !== term)); + } + + return ( +
+    <div className="mt-2">
+      <button
+        onClick={toggle}
+        className="text-xs text-indigo-400 hover:text-indigo-300"
+      >
+        {open ? "▾" : "▸"} Vocabulary (STT)
+      </button>
+
+      {open && (
+        <div className="mt-2 space-y-3 rounded-lg border border-gray-800 bg-gray-900 p-3">
+          {loading && <div className="text-xs text-gray-500">Loading…</div>}
+
+          {!loading && vocab !== null && (
+            <>
+              {/* Bootstrap button */}
+              <div className="flex items-center gap-2">
+                <button
+                  onClick={handleBootstrap}
+                  disabled={bootstrapping}
+                  className="bg-indigo-600 hover:bg-indigo-500 disabled:opacity-50 text-white text-xs rounded-lg px-3 py-1.5"
+                >
+                  {bootstrapping ? "Bootstrapping…" : bootstrapped ? "Re-run bootstrap" : "Bootstrap vocabulary"}
+                </button>
+                <span className="text-xs text-gray-500">
+                  GPT-4o generates local knowledge (agencies, units, streets)
+                </span>
+              </div>
+
+              {/* Approved vocabulary chips */}
+              <div>
+                <div className="text-xs text-gray-400 mb-1">Approved ({vocab.length})</div>
+                {vocab.length > 0 ? (
+                  <div className="flex flex-wrap gap-1">
+                    {vocab.map((term) => (
+                      <span
+                        key={term}
+                        className="inline-flex items-center gap-1 bg-gray-800 border border-gray-700 rounded px-2 py-0.5 text-xs font-mono text-gray-200"
+                      >
+                        {term}
+                        <button onClick={() => handleRemove(term)} className="text-gray-500 hover:text-red-400">
+                          ×
+                        </button>
+                      </span>
+                    ))}
+                  </div>
+                ) : (
+                  <div className="text-xs text-gray-500">
+                    No terms yet — bootstrap or add manually.
+                  </div>
+                )}
+              </div>
+
+              {/* Add term */}
+              <form onSubmit={handleAdd} className="flex gap-2">
+                <input
+                  value={newTerm}
+                  onChange={(e) => setNewTerm(e.target.value)}
+                  placeholder="Add term (e.g. 5-baker, YVAC)"
+                  className="flex-1 bg-gray-800 border border-gray-700 rounded-lg px-3 py-1.5 text-white text-xs font-mono focus:outline-none focus:border-indigo-500"
+                />
+                <button
+                  type="submit"
+                  disabled={adding}
+                  className="bg-gray-800 hover:bg-gray-700 disabled:opacity-50 text-white text-xs rounded-lg px-3 py-1.5"
+                >
+                  Add
+                </button>
+              </form>
+
+              {/* Pending induction suggestions */}
+              {pending.length > 0 && (
+                <div>
+                  <div className="text-xs text-gray-400 mb-1">
+                    Induction suggestions ({pending.length})
+                  </div>
+                  <div className="space-y-1">
+                    {pending.map((p) => (
+                      <div key={p.term} className="flex items-center gap-2 text-xs">
+                        <span className="font-mono text-gray-200">{p.term}</span>
+                        <span className="text-gray-500">{p.source}</span>
+                        <button onClick={() => handleApprove(p.term)} className="text-green-400 hover:text-green-300">
+                          Approve
+                        </button>
+                        <button onClick={() => handleDismiss(p.term)} className="text-gray-500 hover:text-red-400">
+                          Dismiss
+                        </button>
+                      </div>
+                    ))}
+                  </div>
+                </div>
+              )}
+            </>
+          )}
+        </div>
+      )}
+    </div>
+ ); +} + // ── Systems list page ───────────────────────────────────────────────────────── export default function SystemsPage() { @@ -509,6 +692,7 @@ export default function SystemsPage() { Delete + ); })} diff --git a/drb-frontend/lib/c2api.ts b/drb-frontend/lib/c2api.ts index e308c0e..cdbb884 100644 --- a/drb-frontend/lib/c2api.ts +++ b/drb-frontend/lib/c2api.ts @@ -93,4 +93,20 @@ export const c2api = { // Node key management reissueNodeKey: (nodeId: string) => request(`/nodes/${nodeId}/reissue-key`, { method: "POST" }), + + // Vocabulary + getVocabulary: (systemId: string) => + request<{ vocabulary: string[]; vocabulary_pending: { term: string; source: string; added_at: string }[]; vocabulary_bootstrapped: boolean }>( + `/systems/${systemId}/vocabulary` + ), + bootstrapVocabulary: (systemId: string) => + request<{ added: number; terms: string[] }>(`/systems/${systemId}/vocabulary/bootstrap`, { method: "POST" }), + addVocabularyTerm: (systemId: string, term: string) => + request(`/systems/${systemId}/vocabulary/terms`, { method: "POST", body: JSON.stringify({ term }) }), + removeVocabularyTerm: (systemId: string, term: string) => + request(`/systems/${systemId}/vocabulary/terms`, { method: "DELETE", body: JSON.stringify({ term }) }), + approvePendingTerm: (systemId: string, term: string) => + request(`/systems/${systemId}/vocabulary/pending/approve`, { method: "POST", body: JSON.stringify({ term }) }), + dismissPendingTerm: (systemId: string, term: string) => + request(`/systems/${systemId}/vocabulary/pending/dismiss`, { method: "POST", body: JSON.stringify({ term }) }), }; diff --git a/drb-frontend/lib/types.ts b/drb-frontend/lib/types.ts index f4ce559..388f511 100644 --- a/drb-frontend/lib/types.ts +++ b/drb-frontend/lib/types.ts @@ -13,11 +13,20 @@ export interface NodeRecord { approval_status: ApprovalStatus | null; } +export interface VocabularyPendingTerm { + term: string; + source: "induction" | "correction"; + added_at: string; +} + export interface SystemRecord { system_id: string; name: string; type: string; // P25 | DMR | NBFM config: Record; + vocabulary?: string[]; + vocabulary_pending?: VocabularyPendingTerm[]; + vocabulary_bootstrapped?: boolean; } export interface TranscriptSegment {