Compare commits

..

2 Commits

Author SHA1 Message Date
Logan 65839a3191 Implement recorrelation logic 2026-04-21 22:19:57 -04:00
Logan 338b946ba3 Start to learn vocab from talkgroups to improve accuracy of STT 2026-04-21 22:17:30 -04:00
14 changed files with 915 additions and 19 deletions
+5
View File
@@ -27,6 +27,11 @@ class Settings(BaseSettings):
embedding_similarity_threshold: float = 0.93 # slow-path cosine threshold (tiebreaker only)
location_proximity_km: float = 0.5 # radius for location-proximity matching
incident_auto_resolve_minutes: int = 90 # auto-resolve after N minutes with no new calls
recorrelation_scan_minutes: int = 15 # re-examine orphaned calls ended within this window
# Vocabulary learning
vocabulary_induction_interval_hours: int = 24 # how often the induction loop runs
vocabulary_induction_sample_tokens: int = 4000 # ~tokens of transcript text sampled per system
# Internal service key — allows server-side services (discord bot) to call C2 without Firebase
service_key: Optional[str] = None
+18
View File
@@ -57,6 +57,24 @@ async def collection_list(collection: str, **filters) -> list[dict]:
return await asyncio.to_thread(_query)
async def collection_where(
    collection: str,
    conditions: list[tuple[str, str, Any]],
) -> list[dict]:
    """
    Run a Firestore query built from arbitrary (field, op, value) clauses.

    conditions — e.g. [("ended_at", ">=", cutoff_dt)]. Any Firestore operator
    is accepted: "==", "!=", "<", "<=", ">", ">=". The blocking query is
    executed in a worker thread so the event loop is never stalled.
    """
    def _run() -> list[dict]:
        query = db.collection(collection)
        for field, op, value in conditions:
            query = query.where(field, op, value)
        return [snap.to_dict() for snap in query.stream()]

    return await asyncio.to_thread(_run)
async def doc_delete(collection: str, doc_id: str) -> None:
    """Delete document `doc_id` from `collection`, running the blocking delete in a worker thread."""
    ref = db.collection(collection).document(doc_id)
    await asyncio.to_thread(ref.delete)
+25 -10
View File
@@ -46,17 +46,27 @@ async def correlate_call(
incident_type: Optional[str],
location: Optional[str] = None,
location_coords: Optional[dict] = None,
reference_time: Optional[datetime] = None,
create_if_new: bool = True,
) -> Optional[str]:
"""
Link call_id to an existing incident or create a new one.
Returns the incident_id, or None if skipped (no type and no talkgroup match).
reference_time — time anchor for the time-limited paths (location + slow).
Defaults to now. Pass call.started_at when re-correlating
orphaned calls so the window is anchored to when the call
actually happened, not when the sweep runs.
create_if_new — when False, skip new-incident creation (re-correlation only
links to existing incidents; it never creates new ones).
Returns the incident_id, or None if skipped.
"""
now = datetime.now(timezone.utc)
now = reference_time or datetime.now(timezone.utc)
window = timedelta(hours=settings.correlation_window_hours)
# Fetch all active incidents cross-type (mutual aid needs this)
all_active = await fstore.collection_list("incidents", status="active")
recent = [inc for inc in all_active if _within_window(inc, now, window)]
recent = [inc for inc in all_active if _within_window_of(inc, now, window)]
# Fetch call doc once — reused for disambiguation, embedding merge, unit accumulation
call_doc = await fstore.doc_get("calls", call_id) or {}
@@ -147,14 +157,14 @@ async def correlate_call(
location, location_coords, call_units, call_vehicles, call_embedding, now,
talkgroup_name=talkgroup_name, incident_type=incident_type,
)
elif incident_type:
elif incident_type and create_if_new:
incident_id = await _create_incident(
call_id, incident_type, talkgroup_id, talkgroup_name, system_id,
tags, location, location_coords,
call_units, call_vehicles, call_embedding, call_severity, now,
)
else:
# Unclassified call, no talkgroup match found — nothing to do
# No match and either no type or creation suppressed — nothing to do
return None
await fstore.doc_set("calls", call_id, {"incident_id": incident_id})
@@ -165,14 +175,19 @@ async def correlate_call(
# Internal helpers
# ─────────────────────────────────────────────────────────────────────────────
def _within_window(inc: dict, now: datetime, window: timedelta) -> bool:
def _within_window_of(inc: dict, anchor: datetime, window: timedelta) -> bool:
"""
True if the incident's started_at is within `window` of `anchor` in either
direction. Using started_at (not updated_at) means re-correlation anchored
to a call's started_at correctly matches incidents created shortly *after*
that call (e.g. a welfare check at T+0 vs. an incident created at T+15m).
"""
try:
dt = datetime.fromisoformat(
str(inc.get("updated_at", "")).replace("Z", "+00:00")
)
raw = inc.get("started_at") or inc.get("updated_at") or ""
dt = datetime.fromisoformat(str(raw).replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return (now - dt) <= window
return abs((anchor - dt).total_seconds()) <= window.total_seconds()
except Exception:
return False
+12 -2
View File
@@ -36,7 +36,7 @@ Rules:
System: {system_id}
Talkgroup: {talkgroup_name}
{transcript_block}"""
{vocabulary_block}{transcript_block}"""
# Nominatim viewbox half-width in degrees (~11 km at mid-latitudes)
_GEO_DELTA = 0.1
@@ -76,8 +76,15 @@ async def extract_tags(
Side-effect: updates calls/{call_id} in Firestore with tags, location,
location_coords, vehicles, units, severity, transcript_corrected; also stores embedding.
"""
# Load per-system vocabulary for prompt injection
vocabulary: list[str] = []
if system_id:
from app.internal.vocabulary_learner import get_vocabulary
vocab_data = await get_vocabulary(system_id)
vocabulary = vocab_data.get("vocabulary") or []
result = await asyncio.to_thread(
_sync_extract, transcript, talkgroup_name, talkgroup_id, system_id, segments
_sync_extract, transcript, talkgroup_name, talkgroup_id, system_id, segments, vocabulary
)
tags: list[str] = result.get("tags") or []
@@ -233,6 +240,7 @@ def _sync_extract(
talkgroup_id: Optional[int],
system_id: Optional[str],
segments: Optional[list[dict]],
vocabulary: Optional[list[str]] = None,
) -> dict:
"""Call GPT-4o mini and parse the JSON response."""
from app.config import settings
@@ -242,11 +250,13 @@ def _sync_extract(
logger.warning("OPENAI_API_KEY not set — intelligence extraction disabled.")
return {}
from app.internal.vocabulary_learner import build_gpt_vocab_block
tg = f"{talkgroup_name} (TGID {talkgroup_id})" if talkgroup_id else (talkgroup_name or "unknown")
prompt = _PROMPT_TEMPLATE.format(
transcript_block=_build_transcript_block(transcript, segments),
talkgroup_name=tg,
system_id=system_id or "unknown",
vocabulary_block=build_gpt_vocab_block(vocabulary or []),
)
try:
@@ -0,0 +1,108 @@
"""
Re-correlation sweep.
Runs every summary_interval_minutes (same tick as the summarizer). Each pass
finds calls that are:
- recently ended (ended_at within the last recorrelation_scan_minutes)
- still orphaned (incident_id is null)
and re-runs the incident correlator against currently-active incidents, using
the call's own started_at as the time anchor so the window is correct regardless
of when the sweep fires.
Never creates new incidents — link-only. Zero LLM tokens (uses pre-computed
talkgroup strings, haversine math, and stored embeddings).
"""
import asyncio
from datetime import datetime, timezone, timedelta
from typing import Optional
from app.internal.logger import logger
from app.internal import firestore as fstore
from app.config import settings
async def recorrelation_loop() -> None:
    """Background task: run the orphan re-correlation sweep on a fixed cadence.

    Shares the summarizer's tick (summary_interval_minutes) and sleeps first,
    so the initial pass happens one full interval after startup. A failed
    pass is logged and the loop keeps running.
    """
    sleep_seconds = settings.summary_interval_minutes * 60
    logger.info(
        f"Re-correlation sweep started — "
        f"interval: {settings.summary_interval_minutes}m, "
        f"scan window: {settings.recorrelation_scan_minutes}m"
    )
    while True:
        await asyncio.sleep(sleep_seconds)
        try:
            await _run_sweep_pass()
        except Exception as exc:
            logger.error(f"Re-correlation sweep failed: {exc}")
async def _run_sweep_pass() -> None:
    """One sweep: find recently-ended, still-orphaned calls and retry linking them.

    The ended_at range query runs server-side; the incident_id-is-missing
    filter is applied client-side because Firestore cannot query for absent
    fields. This keeps the fetched set small regardless of collection size.
    """
    window_start = datetime.now(timezone.utc) - timedelta(
        minutes=settings.recorrelation_scan_minutes
    )
    candidates = await fstore.collection_where("calls", [
        ("status", "==", "ended"),
        ("ended_at", ">=", window_start),
    ])
    orphans = [call for call in candidates if not call.get("incident_id")]
    if not orphans:
        return

    logger.info(f"Re-correlation sweep: {len(orphans)} orphaned call(s) to check")
    linked = 0
    for orphan in orphans:
        if await _recorrelate_orphan(orphan):
            linked += 1
    if linked:
        logger.info(f"Re-correlation sweep: linked {linked}/{len(orphans)} orphaned call(s)")
async def _recorrelate_orphan(call: dict) -> bool:
    """Try to link one orphaned call to an already-existing incident.

    Uses only data persisted by the first-pass extraction (talkgroup fields,
    tags, location, stored embedding) — no LLM calls. The correlation window
    is anchored to the call's own started_at, and new-incident creation is
    suppressed. Returns True iff the call was linked.
    """
    from app.internal import incident_correlator

    call_id = call.get("call_id")
    started_at = _parse_dt(call.get("started_at"))
    if not (call_id and started_at):
        return False

    incident_id = await incident_correlator.correlate_call(
        call_id=call_id,
        node_id=call.get("node_id", ""),
        system_id=call.get("system_id"),
        talkgroup_id=call.get("talkgroup_id"),
        talkgroup_name=call.get("talkgroup_name"),
        tags=call.get("tags") or [],
        incident_type=call.get("incident_type"),
        location=call.get("location"),
        location_coords=call.get("location_coords"),
        reference_time=started_at,  # anchor the window to when the call happened
        create_if_new=False,        # link-only: never create a new incident
    )
    if incident_id:
        logger.info(
            f"Re-correlation: linked orphaned call {call_id} → incident {incident_id}"
        )
        return True
    return False
def _parse_dt(value) -> Optional[datetime]:
if not value:
return None
try:
dt = datetime.fromisoformat(str(value).replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except Exception:
return None
+20 -3
View File
@@ -28,6 +28,7 @@ async def transcribe_call(
call_id: str,
gcs_uri: str,
talkgroup_name: Optional[str] = None,
system_id: Optional[str] = None,
) -> tuple[Optional[str], list[dict]]:
"""
Transcribe audio at the given GCS URI and store the result in Firestore.
@@ -39,8 +40,17 @@ async def transcribe_call(
if not gcs_uri or not gcs_uri.startswith("gs://"):
return None, []
# Load vocabulary for this system (empty list if none yet)
vocabulary: list[str] = []
if system_id:
from app.internal.vocabulary_learner import get_vocabulary
vocab_data = await get_vocabulary(system_id)
vocabulary = vocab_data.get("vocabulary") or []
try:
transcript, segments = await asyncio.to_thread(_sync_transcribe, gcs_uri, talkgroup_name)
transcript, segments = await asyncio.to_thread(
_sync_transcribe, gcs_uri, talkgroup_name, vocabulary
)
except Exception as e:
logger.warning(f"Transcription failed for call {call_id}: {e}")
return None, []
@@ -61,7 +71,11 @@ async def transcribe_call(
return transcript, segments
def _sync_transcribe(gcs_uri: str, talkgroup_name: Optional[str] = None) -> tuple[Optional[str], list[dict]]:
def _sync_transcribe(
gcs_uri: str,
talkgroup_name: Optional[str] = None,
vocabulary: Optional[list[str]] = None,
) -> tuple[Optional[str], list[dict]]:
"""Download audio from GCS and transcribe with OpenAI Whisper."""
from google.cloud import storage as gcs
from google.oauth2 import service_account
@@ -94,7 +108,10 @@ def _sync_transcribe(gcs_uri: str, talkgroup_name: Optional[str] = None) -> tupl
try:
blob.download_to_filename(tmp_path)
prompt = (f"Talkgroup: {talkgroup_name}. " + _WHISPER_PROMPT) if talkgroup_name else _WHISPER_PROMPT
from app.internal.vocabulary_learner import build_whisper_vocab_prompt
vocab_prefix = build_whisper_vocab_prompt(vocabulary or [])
tg_prefix = f"Talkgroup: {talkgroup_name}. " if talkgroup_name else ""
prompt = tg_prefix + vocab_prefix + _WHISPER_PROMPT
openai_client = OpenAI(api_key=settings.openai_api_key)
with open(tmp_path, "rb") as f:
@@ -0,0 +1,426 @@
"""
Per-system vocabulary learning for STT accuracy improvement.
Three mechanisms:
1. Bootstrap — one-shot GPT-4o call generates local knowledge at system setup:
agencies + abbreviations, unit naming, streets, acronyms.
2. Correction — diffs admin transcript edits, extracts corrected tokens → vocabulary.
3. Induction — background loop samples N tokens of transcripts per system,
asks GPT-4o-mini to propose new terms → queued as pending for review.
Firestore schema additions on system documents:
vocabulary: list[str] — approved terms; injected into Whisper + GPT prompts
vocabulary_pending: list[dict] — induction suggestions awaiting admin review
each: {term, source, added_at}
vocabulary_bootstrapped: bool — bootstrap has been run at least once
"""
import asyncio
import difflib
import json
import random
from datetime import datetime, timezone
from typing import Optional
from app.internal.logger import logger
from app.internal import firestore as fstore
from app.config import settings
# ─────────────────────────────────────────────────────────────────────────────
# Prompt templates
# ─────────────────────────────────────────────────────────────────────────────
# One-shot GPT-4o prompt used by _sync_bootstrap. Placeholders: {system_name},
# {system_type}, {area_hint}. The model must answer with a JSON object of the
# form {"vocabulary": [str, ...]} (enforced via response_format=json_object).
_BOOTSTRAP_PROMPT = """\
You are building a radio vocabulary dictionary to improve speech-to-text accuracy for a P25 \
public-safety radio monitoring system in a specific area. The STT model has no local knowledge, \
so common terms like "YVAC" get transcribed as "why vac", "5-baker" as "5 acre", etc.
System name: {system_name}
System type: {system_type}
Area context: {area_hint}
Return ONLY a JSON object:
{{"vocabulary": [list of strings]}}
Include terms you are confident about for this area:
- Agency names and their radio abbreviations (e.g. "YVAC" = Yorktown Volunteer Ambulance Corps)
- Unit ID examples using the local naming convention (e.g. "5-baker", "5-charlie", "1-david";
many departments use APCO phonetics: adam, baker, charles, david, edward, frank, george,
henry, ida, john, king, lincoln, mary, nora, ocean, paul, queen, robert, sam, tom, union,
victor, william, x-ray, young, zebra)
- Major routes, roads, and key intersections
- Local landmarks and geographic references dispatchers use
- Agency-specific codes that differ from standard APCO
Return a flat list of strings — abbreviations, proper names, unit IDs, street names.
Do NOT include common English words. Max 80 terms. Only include what you are confident is \
accurate for this specific area; return fewer terms rather than guessing."""
_INDUCTION_PROMPT = """\
You are analyzing P25 emergency radio transcripts to find vocabulary terms that should be \
added to improve future speech-to-text accuracy for this system.
System: {system_name}
Existing approved vocabulary (do not re-propose these): {existing_vocab}
Sampled transcripts:
{transcript_block}
Find terms that are LIKELY STT errors or local terms missing from the vocabulary:
- Unit IDs that appear garbled (e.g. "5 acre""5-baker")
- Agency acronyms spelled out phonetically (e.g. "why vac""YVAC")
- Street names or locations that look misspelled or oddly transcribed
- Callsigns or local codes not yet in the vocabulary
Return ONLY a JSON object:
{{"new_terms": ["term1", "term2", ...]}}
Only include high-confidence additions not already in existing vocabulary.
Return {{"new_terms": []}} if nothing new is found."""
# ─────────────────────────────────────────────────────────────────────────────
# Public API
# ─────────────────────────────────────────────────────────────────────────────
async def bootstrap_system_vocabulary(system_id: str) -> list[str]:
    """Seed a system's vocabulary with GPT-4o-generated local knowledge.

    Builds an area hint from up to eight configured talkgroup names, runs the
    blocking GPT call off the event loop, merges case-insensitively-new terms
    into systems/{system_id}.vocabulary, and marks the system bootstrapped.
    Returns only the newly added terms ([] when the system is missing or the
    model produced nothing).
    """
    system_doc = await fstore.doc_get("systems", system_id)
    if not system_doc:
        logger.warning(f"Vocabulary bootstrap: system {system_id} not found")
        return []

    name = system_doc.get("name", "Unknown")
    kind = system_doc.get("type", "P25")
    configured = system_doc.get("config", {}).get("talkgroups", [])
    tg_names = [tg.get("name", "") for tg in configured if tg.get("name")][:8]
    hint = f"Talkgroups include: {', '.join(tg_names)}" if tg_names else "Unknown area"

    generated = await asyncio.to_thread(_sync_bootstrap, name, kind, hint)
    if not generated:
        return []

    current = system_doc.get("vocabulary") or []
    known = {t.lower() for t in current}
    added = [t for t in generated if t.lower() not in known]
    await fstore.doc_set("systems", system_id, {
        "vocabulary": list(dict.fromkeys(current + added)),
        "vocabulary_bootstrapped": True,
    })
    logger.info(
        f"Vocabulary bootstrap: {len(added)} term(s) generated for system {system_id} "
        f"({name})"
    )
    return added
async def learn_from_correction(system_id: str, original: str, corrected: str) -> None:
    """Harvest vocabulary from an admin transcript correction.

    Tokens the admin inserted or substituted (per _diff_new_terms) are treated
    as intended spellings and merged into the system's approved vocabulary,
    skipping case-insensitive duplicates. No-op when any input is empty, the
    system document is missing, or the diff yields nothing new.
    """
    if not (system_id and original and corrected):
        return

    candidates = _diff_new_terms(original, corrected)
    if not candidates:
        return

    system_doc = await fstore.doc_get("systems", system_id)
    if not system_doc:
        return

    current = system_doc.get("vocabulary") or []
    seen = {t.lower() for t in current}
    fresh = [t for t in candidates if t.lower() not in seen]
    if not fresh:
        return

    await fstore.doc_set(
        "systems", system_id, {"vocabulary": list(dict.fromkeys(current + fresh))}
    )
    logger.info(
        f"Vocabulary: learned {len(fresh)} term(s) from correction on system {system_id}: "
        f"{fresh}"
    )
async def approve_pending_term(system_id: str, term: str) -> None:
    """Promote a pending induction suggestion into the approved vocabulary.

    Drops every pending entry whose term matches exactly, then appends the
    term to the vocabulary unless a case-insensitive duplicate already exists.
    Uses p.get("term") so a malformed pending entry (missing "term") cannot
    raise KeyError and abort the approval.
    """
    system_doc = await fstore.doc_get("systems", system_id)
    if not system_doc:
        return
    pending = [
        p for p in (system_doc.get("vocabulary_pending") or [])
        if p.get("term") != term
    ]
    vocab = system_doc.get("vocabulary") or []
    if term.lower() not in {t.lower() for t in vocab}:
        vocab = list(dict.fromkeys(vocab + [term]))
    await fstore.doc_set("systems", system_id, {
        "vocabulary": vocab,
        "vocabulary_pending": pending,
    })
async def dismiss_pending_term(system_id: str, term: str) -> None:
    """Remove a pending induction suggestion without approving it.

    Uses p.get("term") so a malformed pending entry (missing "term") cannot
    raise KeyError and abort the dismissal.
    """
    system_doc = await fstore.doc_get("systems", system_id)
    if not system_doc:
        return
    pending = [
        p for p in (system_doc.get("vocabulary_pending") or [])
        if p.get("term") != term
    ]
    await fstore.doc_set("systems", system_id, {"vocabulary_pending": pending})
async def add_term(system_id: str, term: str) -> None:
    """Manually add a term to the approved vocabulary.

    Fix: the original checked membership on the raw term but appended
    term.strip(), so "  yvac " would slip past the duplicate check against an
    existing "YVAC" and create a case-variant duplicate. Strip once up front
    so the check and the write see the same string; ignore empty input.
    """
    term = term.strip()
    if not term:
        return
    system_doc = await fstore.doc_get("systems", system_id)
    if not system_doc:
        return
    vocab = system_doc.get("vocabulary") or []
    if term.lower() not in {t.lower() for t in vocab}:
        vocab = list(dict.fromkeys(vocab + [term]))
    await fstore.doc_set("systems", system_id, {"vocabulary": vocab})
async def remove_term(system_id: str, term: str) -> None:
    """Delete a term (matched case-insensitively) from the approved vocabulary."""
    system_doc = await fstore.doc_get("systems", system_id)
    if not system_doc:
        return
    target = term.lower()
    kept = [t for t in (system_doc.get("vocabulary") or []) if t.lower() != target]
    await fstore.doc_set("systems", system_id, {"vocabulary": kept})
async def get_vocabulary(system_id: str) -> dict:
    """Return a system's approved vocabulary, pending suggestions, and bootstrap flag.

    A missing system document yields the same shape with empty/False defaults.
    """
    doc = await fstore.doc_get("systems", system_id) or {}
    return {
        "vocabulary": doc.get("vocabulary") or [],
        "vocabulary_pending": doc.get("vocabulary_pending") or [],
        "vocabulary_bootstrapped": doc.get("vocabulary_bootstrapped", False),
    }
# ─────────────────────────────────────────────────────────────────────────────
# Prompt-injection helpers (called by transcription.py and intelligence.py)
# ─────────────────────────────────────────────────────────────────────────────
def build_whisper_vocab_prompt(vocabulary: list[str]) -> str:
    """
    Render vocabulary for injection into the Whisper prompt field.

    Whisper's prompt acts as a context prior with a ~224-token cap; the base
    _WHISPER_PROMPT spends ~70 of those, so this keeps to ~150 tokens
    (≈550 characters). Terms are taken in order until the budget is hit and
    joined as "a, b, c. "; an empty result is the empty string.
    """
    if not vocabulary:
        return ""

    char_budget = 550
    kept: list[str] = []
    consumed = 0
    for word in vocabulary:
        needed = len(word) + 2  # accounts for the ", " separator / trailing ". "
        if consumed + needed > char_budget:
            break
        kept.append(word)
        consumed += needed

    if not kept:
        return ""
    return ", ".join(kept) + ". "
def build_gpt_vocab_block(vocabulary: list[str]) -> str:
    """Render the approved vocabulary as a single prompt line for GPT extraction."""
    if not vocabulary:
        return ""
    joined = ", ".join(vocabulary)
    return f"Known local terms: {joined}\n"
# ─────────────────────────────────────────────────────────────────────────────
# Background induction loop
# ─────────────────────────────────────────────────────────────────────────────
async def vocabulary_induction_loop() -> None:
    """Background task: periodically propose new vocabulary for every system.

    Sleeps one full interval before the first pass; a failing pass is logged
    and the loop continues.
    """
    pause = settings.vocabulary_induction_interval_hours * 3600
    logger.info(
        f"Vocabulary induction loop started — "
        f"interval: {settings.vocabulary_induction_interval_hours}h, "
        f"sample budget: {settings.vocabulary_induction_sample_tokens} tokens"
    )
    while True:
        await asyncio.sleep(pause)
        try:
            await _run_induction_pass()
        except Exception as exc:
            logger.error(f"Vocabulary induction pass failed: {exc}")
async def _run_induction_pass() -> None:
    """One induction pass: attempt vocabulary induction for every known system.

    Per-system failures are logged and do not stop the pass; systems without
    a system_id are skipped.
    """
    systems = await fstore.collection_list("systems")
    if not systems:
        return
    logger.info(f"Vocabulary induction: processing {len(systems)} system(s)")
    for system in systems:
        system_id = system.get("system_id")
        if not system_id:
            continue
        try:
            await _induct_system(system_id, system)
        except Exception as e:
            logger.warning(f"Induction failed for system {system_id}: {e}")
async def _induct_system(system_id: str, system_doc: dict) -> None:
    """Sample this system's transcripts and queue GPT-proposed terms for review.

    Randomly samples ended calls up to the configured token budget (~4 chars
    per token), asks GPT-4o-mini for new terms, and appends anything not
    already approved or pending to vocabulary_pending for admin review.
    Skips systems with fewer than three usable transcripts.
    """
    system_name = system_doc.get("name", "Unknown")
    approved: list[str] = system_doc.get("vocabulary") or []

    calls = await fstore.collection_list("calls", system_id=system_id, status="ended")
    if not calls:
        return

    # Build a random sample of transcript lines within the character budget.
    random.shuffle(calls)
    budget = settings.vocabulary_induction_sample_tokens * 4
    lines: list[str] = []
    block_len = 0
    for call in calls:
        text = call.get("transcript_corrected") or call.get("transcript") or ""
        if not text:
            continue
        if block_len + len(text) > budget:
            break
        tg = call.get("talkgroup_name") or f"TGID {call.get('talkgroup_id', '?')}"
        entry = f"[{tg}] {text}\n"
        lines.append(entry)
        block_len += len(entry)
    if len(lines) < 3:
        return  # not enough data to learn from yet
    transcript_block = "".join(lines)

    proposed = await asyncio.to_thread(
        _sync_induct, system_name, approved, transcript_block
    )
    if not proposed:
        return

    now = datetime.now(timezone.utc).isoformat()
    pending: list[dict] = system_doc.get("vocabulary_pending") or []
    already = {p["term"].lower() for p in pending} | {t.lower() for t in approved}
    to_queue = [
        {"term": t, "source": "induction", "added_at": now}
        for t in proposed
        if t.lower() not in already
    ]
    if not to_queue:
        return

    await fstore.doc_set("systems", system_id, {
        "vocabulary_pending": pending + to_queue,
    })
    logger.info(
        f"Vocabulary induction: {len(to_queue)} new term(s) proposed for "
        f"system {system_id} ({system_name}): {[p['term'] for p in to_queue]}"
    )
# ─────────────────────────────────────────────────────────────────────────────
# Internal sync helpers
# ─────────────────────────────────────────────────────────────────────────────
# Common English and radio-filler tokens excluded by _diff_new_terms: these
# show up constantly in corrections but are never useful vocabulary entries.
_STOP_WORDS = {
    "the", "and", "for", "are", "was", "were", "this", "that", "with",
    "have", "has", "had", "but", "not", "from", "they", "will", "what",
    "can", "all", "been", "one", "two", "three", "four", "five", "six",
    "you", "out", "who", "get", "her", "him", "his", "its", "our", "my",
    "via", "per", "any", "now", "got", "she", "let", "did", "may", "yes",
    "sir", "say", "see", "too", "off", "how", "put", "set", "try", "back",
    "just", "like", "into", "than", "them", "then", "some", "also", "onto",
    "went", "over", "copy", "okay", "unit", "post", "road", "lane", "going",
    "being", "doing", "there", "their", "about", "would", "could", "should",
    "route", "north", "south", "east", "west", "avenue", "street", "drive",
}
def _diff_new_terms(original: str, corrected: str) -> list[str]:
    """
    Token-level diff between original and corrected transcripts.

    Tokens that appear in `corrected` as insertions or replacements (matched
    case-insensitively) are the admin's intended spellings — good vocabulary
    candidates. Surrounding punctuation and hyphens are stripped; tokens
    shorter than three characters or in _STOP_WORDS are dropped. The result
    is de-duplicated, preserving first-seen order.
    """
    before = original.split()
    after = corrected.split()
    opcodes = difflib.SequenceMatcher(
        None,
        [w.lower() for w in before],
        [w.lower() for w in after],
    ).get_opcodes()

    found: list[str] = []
    for op, _a1, _a2, b1, b2 in opcodes:
        if op not in ("insert", "replace"):
            continue
        for raw in after[b1:b2]:
            token = raw.strip(".,!?;:()'\"").strip("-")
            if len(token) >= 3 and token.lower() not in _STOP_WORDS:
                found.append(token)
    return list(dict.fromkeys(found))
def _sync_bootstrap(system_name: str, system_type: str, area_hint: str) -> list[str]:
    """Blocking GPT-4o call producing bootstrap vocabulary; [] on any failure."""
    from app.config import settings as cfg
    from openai import OpenAI

    if not cfg.openai_api_key:
        return []

    prompt = _BOOTSTRAP_PROMPT.format(
        system_name=system_name,
        system_type=system_type,
        area_hint=area_hint,
    )
    try:
        completion = OpenAI(api_key=cfg.openai_api_key).chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
        )
        payload = json.loads(completion.choices[0].message.content)
        raw_terms = payload.get("vocabulary") or []
        return [str(t).strip() for t in raw_terms if str(t).strip()]
    except Exception as e:
        logger.warning(f"Vocabulary bootstrap GPT call failed: {e}")
        return []
def _sync_induct(
    system_name: str, existing_vocab: list[str], transcript_block: str
) -> list[str]:
    """Blocking GPT-4o-mini call proposing new vocabulary; [] on any failure.

    The existing vocabulary is capped at 80 terms and the sampled transcript
    text at 8000 characters to bound the prompt size.
    """
    from app.config import settings as cfg
    from openai import OpenAI

    if not cfg.openai_api_key:
        return []

    vocab_str = ", ".join(existing_vocab[:80]) if existing_vocab else "(none yet)"
    prompt = _INDUCTION_PROMPT.format(
        system_name=system_name,
        existing_vocab=vocab_str,
        transcript_block=transcript_block[:8000],
    )
    try:
        completion = OpenAI(api_key=cfg.openai_api_key).chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
        )
        payload = json.loads(completion.choices[0].message.content)
        raw_terms = payload.get("new_terms") or []
        return [str(t).strip() for t in raw_terms if str(t).strip()]
    except Exception as e:
        logger.warning(f"Vocabulary induction GPT call failed: {e}")
        return []
+8 -2
View File
@@ -6,6 +6,8 @@ from app.internal.logger import logger
from app.internal.mqtt_handler import mqtt_handler
from app.internal.node_sweeper import sweeper_loop
from app.internal.summarizer import summarizer_loop
from app.internal.vocabulary_learner import vocabulary_induction_loop
from app.internal.recorrelation_sweep import recorrelation_loop
from app.config import settings
from app.internal.auth import require_firebase_token, require_service_or_firebase_token
from app.routers import nodes, systems, calls, upload, tokens, incidents, alerts
@@ -35,14 +37,18 @@ async def lifespan(app: FastAPI):
await _release_orphaned_tokens()
await mqtt_handler.connect()
sweeper_task = asyncio.create_task(sweeper_loop())
summarizer_task = asyncio.create_task(summarizer_loop())
sweeper_task = asyncio.create_task(sweeper_loop())
summarizer_task = asyncio.create_task(summarizer_loop())
induction_task = asyncio.create_task(vocabulary_induction_loop())
recorrelation_task = asyncio.create_task(recorrelation_loop())
yield # --- app running ---
logger.info("DRB C2 Core shutting down.")
sweeper_task.cancel()
summarizer_task.cancel()
induction_task.cancel()
recorrelation_task.cancel()
await mqtt_handler.disconnect()
+7
View File
@@ -83,6 +83,13 @@ async def patch_transcript(
"embedding": None,
})
# Learn from the correction: diff original → corrected and add new tokens to vocabulary
system_id = call.get("system_id")
original_text = call.get("transcript_corrected") or call.get("transcript") or ""
if system_id and original_text:
from app.internal.vocabulary_learner import learn_from_correction
await learn_from_correction(system_id, original_text, body.transcript)
from app.routers.upload import _run_extraction_pipeline
background_tasks.add_task(
_run_extraction_pipeline,
+73
View File
@@ -1,11 +1,17 @@
import uuid
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import Optional
from app.models import SystemCreate, SystemRecord
from app.internal import firestore as fstore
router = APIRouter(prefix="/systems", tags=["systems"])
class VocabularyTermBody(BaseModel):
    """Request body shared by all single-term vocabulary endpoints (add/remove/approve/dismiss)."""
    # The vocabulary term being added, removed, approved, or dismissed.
    term: str
@router.get("")
async def list_systems():
    """Return every document in the `systems` collection."""
    return await fstore.collection_list("systems")
@@ -42,3 +48,70 @@ async def delete_system(system_id: str):
if not existing:
raise HTTPException(404, f"System '{system_id}' not found.")
await fstore.doc_delete("systems", system_id)
# ── Vocabulary endpoints ───────────────────────────────────────────────────────
@router.get("/{system_id}/vocabulary")
async def get_vocabulary(system_id: str):
    """Return the approved vocabulary plus pending induction suggestions."""
    from app.internal.vocabulary_learner import get_vocabulary as _get

    if not await fstore.doc_get("systems", system_id):
        raise HTTPException(404, f"System '{system_id}' not found.")
    return await _get(system_id)
@router.post("/{system_id}/vocabulary/bootstrap", status_code=202)
async def bootstrap_vocabulary(system_id: str):
    """Run the one-shot GPT-4o bootstrap that seeds vocabulary from local knowledge."""
    from app.internal.vocabulary_learner import bootstrap_system_vocabulary

    if not await fstore.doc_get("systems", system_id):
        raise HTTPException(404, f"System '{system_id}' not found.")
    added = await bootstrap_system_vocabulary(system_id)
    return {"added": len(added), "terms": added}
@router.post("/{system_id}/vocabulary/terms")
async def add_vocabulary_term(system_id: str, body: VocabularyTermBody):
    """Manually append a term to the approved vocabulary."""
    from app.internal.vocabulary_learner import add_term

    if not await fstore.doc_get("systems", system_id):
        raise HTTPException(404, f"System '{system_id}' not found.")
    await add_term(system_id, body.term.strip())
    return {"ok": True}
@router.delete("/{system_id}/vocabulary/terms")
async def remove_vocabulary_term(system_id: str, body: VocabularyTermBody):
    """Delete a term from the approved vocabulary."""
    from app.internal.vocabulary_learner import remove_term

    if not await fstore.doc_get("systems", system_id):
        raise HTTPException(404, f"System '{system_id}' not found.")
    await remove_term(system_id, body.term)
    return {"ok": True}
@router.post("/{system_id}/vocabulary/pending/approve")
async def approve_pending(system_id: str, body: VocabularyTermBody):
    """Promote a pending induction suggestion into the approved vocabulary."""
    from app.internal.vocabulary_learner import approve_pending_term

    if not await fstore.doc_get("systems", system_id):
        raise HTTPException(404, f"System '{system_id}' not found.")
    await approve_pending_term(system_id, body.term)
    return {"ok": True}
@router.post("/{system_id}/vocabulary/pending/dismiss")
async def dismiss_pending(system_id: str, body: VocabularyTermBody):
    """Drop a pending induction suggestion without approving it."""
    from app.internal.vocabulary_learner import dismiss_pending_term

    if not await fstore.doc_get("systems", system_id):
        raise HTTPException(404, f"System '{system_id}' not found.")
    await dismiss_pending_term(system_id, body.term)
    return {"ok": True}
+3 -1
View File
@@ -151,7 +151,9 @@ async def _run_intelligence_pipeline(
# Step 1: Transcription
if gcs_uri:
transcript, segments = await transcription.transcribe_call(call_id, gcs_uri, talkgroup_name)
transcript, segments = await transcription.transcribe_call(
call_id, gcs_uri, talkgroup_name, system_id=system_id
)
# Step 2: Intelligence extraction
tags: list[str] = []
+185 -1
View File
@@ -3,7 +3,7 @@
import { useState } from "react";
import { useSystems } from "@/lib/useSystems";
import { c2api } from "@/lib/c2api";
import type { SystemRecord } from "@/lib/types";
import type { SystemRecord, VocabularyPendingTerm } from "@/lib/types";
// ── P25 structured config types ──────────────────────────────────────────────
@@ -433,6 +433,189 @@ function SystemForm({
);
}
// ── Vocabulary panel ──────────────────────────────────────────────────────────
// Collapsible per-system panel for managing the STT vocabulary: approved
// terms (add/remove), AI bootstrap, and pending induction suggestions
// (approve/dismiss). Data is lazily fetched on first expand.
function VocabularyPanel({ systemId }: { systemId: string }) {
  // null = not yet fetched; distinguishes "loading never attempted" from "empty".
  const [vocab, setVocab] = useState<string[] | null>(null);
  const [pending, setPending] = useState<VocabularyPendingTerm[]>([]);
  const [bootstrapped, setBootstrapped] = useState(false);
  const [loading, setLoading] = useState(false);
  const [bootstrapping, setBootstrapping] = useState(false);
  const [newTerm, setNewTerm] = useState("");
  const [adding, setAdding] = useState(false);
  const [open, setOpen] = useState(false);

  // Fetch vocabulary state once; subsequent toggles reuse the cached data.
  async function load() {
    if (vocab !== null) return; // already loaded
    setLoading(true);
    try {
      const data = await c2api.getVocabulary(systemId);
      setVocab(data.vocabulary);
      setPending(data.vocabulary_pending);
      setBootstrapped(data.vocabulary_bootstrapped);
    } finally {
      setLoading(false);
    }
  }

  function toggle() {
    if (!open) load();
    setOpen((v) => !v);
  }

  // Run the AI bootstrap, then re-fetch so the chip list reflects server state.
  async function handleBootstrap() {
    setBootstrapping(true);
    try {
      const result = await c2api.bootstrapVocabulary(systemId);
      const data = await c2api.getVocabulary(systemId);
      setVocab(data.vocabulary);
      setPending(data.vocabulary_pending);
      setBootstrapped(true);
      alert(`Bootstrap added ${result.added} term(s).`);
    } finally {
      setBootstrapping(false);
    }
  }

  async function handleAdd(e: React.FormEvent) {
    e.preventDefault();
    const term = newTerm.trim();
    if (!term) return;
    setAdding(true);
    try {
      await c2api.addVocabularyTerm(systemId, term);
      setVocab((v) => (v ? [...v, term] : [term]));
      setNewTerm("");
    } finally {
      setAdding(false);
    }
  }

  async function handleRemove(term: string) {
    await c2api.removeVocabularyTerm(systemId, term);
    setVocab((v) => (v ?? []).filter((t) => t !== term));
  }

  // Approving moves the term from the pending list into the approved list.
  async function handleApprove(term: string) {
    await c2api.approvePendingTerm(systemId, term);
    setVocab((v) => (v ? [...v, term] : [term]));
    setPending((p) => p.filter((t) => t.term !== term));
  }

  async function handleDismiss(term: string) {
    await c2api.dismissPendingTerm(systemId, term);
    setPending((p) => p.filter((t) => t.term !== term));
  }

  return (
    <div className="mt-3 border-t border-gray-800 pt-3">
      <button
        onClick={toggle}
        className="text-xs text-gray-500 hover:text-gray-300 font-mono transition-colors flex items-center gap-1"
      >
        <span>{open ? "▲" : "▼"}</span>
        <span>
          Vocabulary
          {vocab !== null && <span className="text-gray-600 ml-1">({vocab.length} terms{pending.length > 0 ? `, ${pending.length} pending` : ""})</span>}
        </span>
      </button>
      {open && (
        <div className="mt-3 space-y-3 font-mono text-xs">
          {loading && <p className="text-gray-600 italic">Loading</p>}
          {!loading && vocab !== null && (
            <>
              {/* Bootstrap button */}
              <div className="flex items-center gap-3">
                <button
                  onClick={handleBootstrap}
                  disabled={bootstrapping}
                  className="bg-indigo-800 hover:bg-indigo-700 disabled:opacity-50 text-indigo-200 px-3 py-1.5 rounded-lg text-xs transition-colors"
                >
                  {bootstrapping ? "Bootstrapping…" : bootstrapped ? "Re-bootstrap with AI" : "Bootstrap with AI"}
                </button>
                <span className="text-gray-600">GPT-4o generates local knowledge (agencies, units, streets)</span>
              </div>
              {/* Approved vocabulary chips */}
              <div>
                <p className="text-gray-500 uppercase tracking-wider mb-1.5">Approved ({vocab.length})</p>
                {vocab.length > 0 ? (
                  <div className="flex flex-wrap gap-1.5">
                    {vocab.map((term) => (
                      <span
                        key={term}
                        className="inline-flex items-center gap-1 bg-gray-800 text-gray-300 px-2 py-0.5 rounded-full"
                      >
                        {term}
                        <button
                          onClick={() => handleRemove(term)}
                          className="text-gray-600 hover:text-red-400 transition-colors leading-none"
                        >
                          ×
                        </button>
                      </span>
                    ))}
                  </div>
                ) : (
                  <p className="text-gray-600 italic">No terms yet — bootstrap or add manually.</p>
                )}
              </div>
              {/* Add term */}
              <form onSubmit={handleAdd} className="flex gap-2">
                <input
                  value={newTerm}
                  onChange={(e) => setNewTerm(e.target.value)}
                  placeholder="Add term (e.g. 5-baker, YVAC)"
                  className="flex-1 bg-gray-800 border border-gray-700 rounded-lg px-3 py-1.5 text-white text-xs font-mono focus:outline-none focus:border-indigo-500"
                />
                <button
                  type="submit"
                  disabled={adding || !newTerm.trim()}
                  className="bg-gray-700 hover:bg-gray-600 disabled:opacity-50 text-gray-200 px-3 py-1.5 rounded-lg transition-colors"
                >
                  Add
                </button>
              </form>
              {/* Pending induction suggestions */}
              {pending.length > 0 && (
                <div>
                  <p className="text-gray-500 uppercase tracking-wider mb-1.5">
                    Induction suggestions ({pending.length})
                  </p>
                  <div className="space-y-1">
                    {pending.map((p) => (
                      <div key={p.term} className="flex items-center gap-2">
                        <span className="text-gray-300 flex-1">{p.term}</span>
                        <span className="text-gray-600">{p.source}</span>
                        {/* Visible glyphs + aria-labels: these buttons previously
                            rendered with no content, leaving invisible targets. */}
                        <button
                          onClick={() => handleApprove(p.term)}
                          aria-label={`Approve ${p.term}`}
                          className="text-green-500 hover:text-green-400 transition-colors px-1"
                        >
                          ✓
                        </button>
                        <button
                          onClick={() => handleDismiss(p.term)}
                          aria-label={`Dismiss ${p.term}`}
                          className="text-gray-600 hover:text-red-400 transition-colors px-1"
                        >
                          ✕
                        </button>
                      </div>
                    ))}
                  </div>
                </div>
              )}
            </>
          )}
        </div>
      )}
    </div>
  );
}
// ── Systems list page ─────────────────────────────────────────────────────────
export default function SystemsPage() {
@@ -509,6 +692,7 @@ export default function SystemsPage() {
Delete
</button>
</div>
<VocabularyPanel systemId={s.system_id} />
</div>
);
})}
+16
View File
@@ -93,4 +93,20 @@ export const c2api = {
// Node key management
reissueNodeKey: (nodeId: string) =>
request(`/nodes/${nodeId}/reissue-key`, { method: "POST" }),
// Vocabulary
getVocabulary: (systemId: string) =>
request<{ vocabulary: string[]; vocabulary_pending: { term: string; source: string; added_at: string }[]; vocabulary_bootstrapped: boolean }>(
`/systems/${systemId}/vocabulary`
),
bootstrapVocabulary: (systemId: string) =>
request<{ added: number; terms: string[] }>(`/systems/${systemId}/vocabulary/bootstrap`, { method: "POST" }),
addVocabularyTerm: (systemId: string, term: string) =>
request(`/systems/${systemId}/vocabulary/terms`, { method: "POST", body: JSON.stringify({ term }) }),
removeVocabularyTerm: (systemId: string, term: string) =>
request(`/systems/${systemId}/vocabulary/terms`, { method: "DELETE", body: JSON.stringify({ term }) }),
approvePendingTerm: (systemId: string, term: string) =>
request(`/systems/${systemId}/vocabulary/pending/approve`, { method: "POST", body: JSON.stringify({ term }) }),
dismissPendingTerm: (systemId: string, term: string) =>
request(`/systems/${systemId}/vocabulary/pending/dismiss`, { method: "POST", body: JSON.stringify({ term }) }),
};
+9
View File
@@ -13,11 +13,20 @@ export interface NodeRecord {
approval_status: ApprovalStatus | null;
}
// A suggested vocabulary term awaiting operator review (approve/dismiss).
export interface VocabularyPendingTerm {
term: string; // the suggested term text
source: "induction" | "correction"; // which pipeline proposed it
added_at: string; // timestamp string — presumably ISO 8601; confirm with API
}
// A radio system record as returned by the C2 API.
export interface SystemRecord {
system_id: string;
name: string;
type: string; // P25 | DMR | NBFM
config: Record<string, unknown>; // type-specific structured config
vocabulary?: string[]; // approved STT vocabulary terms
vocabulary_pending?: VocabularyPendingTerm[]; // suggestions awaiting review
vocabulary_bootstrapped?: boolean; // true once AI bootstrap has run
}
export interface TranscriptSegment {