diff --git a/drb-c2-core/app/config.py b/drb-c2-core/app/config.py index 1bd5ada..8d0d5d8 100644 --- a/drb-c2-core/app/config.py +++ b/drb-c2-core/app/config.py @@ -24,7 +24,8 @@ class Settings(BaseSettings): gemini_api_key: Optional[str] = None summary_interval_minutes: int = 2 # how often the summary loop runs correlation_window_hours: int = 2 # slow/location path: max hours since last call - embedding_similarity_threshold: float = 0.93 # slow-path cosine threshold (tiebreaker only) + embedding_similarity_threshold: float = 0.93 # slow-path: requires location corroboration + embedding_no_location_threshold: float = 0.97 # slow-path: match without location (very high bar) location_proximity_km: float = 0.5 # radius for location-proximity matching incident_auto_resolve_minutes: int = 90 # auto-resolve after N minutes with no new calls recorrelation_scan_minutes: int = 15 # re-examine orphaned calls ended within this window diff --git a/drb-c2-core/app/internal/incident_correlator.py b/drb-c2-core/app/internal/incident_correlator.py index 6164ed7..bbc8899 100644 --- a/drb-c2-core/app/internal/incident_correlator.py +++ b/drb-c2-core/app/internal/incident_correlator.py @@ -79,7 +79,25 @@ async def correlate_call( matched_incident: Optional[dict] = None + # A "thin" call carries no scene-identifying information — it is a pure + # status transmission (10-4, en route, acknowledgement). Detected by the + # absence of extracted units, vehicles, AND geocoded coordinates. Thin + # calls should link to wherever the last active conversation on this TGID + # was happening rather than running the full scene-verification logic. + is_thin_call = not call_units and not call_vehicles and not coords + # ── 1. Fast path: talkgroup match (any type, no time limit) ────────────── + # + # Two distinct behaviours depending on call substance: + # + # • Thin call → link to the most-recently-updated active incident on this + # TGID (i.e. the last conversation in progress). + # + # • Substantive call (has location / units / vehicles) → verify the call + # actually belongs to the matched incident before linking. When a busy + # dispatch channel runs multiple concurrent scenes (different addresses, + # different units) we split them into separate incidents rather than + # merging everything because they share a talkgroup. if talkgroup_id is not None and system_id: tg_str = str(talkgroup_id) tg_matches = [ @@ -87,11 +105,30 @@ async def correlate_call( if system_id in (inc.get("system_ids") or []) and tg_str in (inc.get("talkgroup_ids") or []) ] - if len(tg_matches) == 1: - matched_incident = tg_matches[0] + + if tg_matches and is_thin_call: + # Status/ack call — no scene data to reason about. + # Attach to whichever incident was most recently active on this TGID. + matched_incident = max(tg_matches, key=lambda inc: inc.get("updated_at", "")) logger.info( - f"Correlator fast-path: call {call_id} → {tg_matches[0]['incident_id']}" + f"Correlator fast-path (thin→last TGID incident): " + f"call {call_id} → {matched_incident['incident_id']}" ) + elif len(tg_matches) == 1: + candidate = tg_matches[0] + if _call_fits_incident( + candidate, call_units, call_vehicles, coords, settings.location_proximity_km + ): + matched_incident = candidate + logger.info( + f"Correlator fast-path: call {call_id} → {candidate['incident_id']}" + ) + else: + logger.info( + f"Correlator fast-path skipped: call {call_id} — different scene " + f"from {candidate['incident_id']} (no unit overlap + distant location); " + f"will attempt new incident" + ) elif len(tg_matches) > 1: matched_incident = _disambiguate( tg_matches, call_units, call_vehicles, coords, call_embedding @@ -119,7 +156,13 @@ async def correlate_call( ) break - # ── 3. Slow path: embedding + location corroboration (time-limited, same type) ── + # ── 3. Slow path: embedding similarity (time-limited, same type) ────────── + # + # Two tiers: + # ① embedding_similarity_threshold + location corroboration (standard) + # ② embedding_no_location_threshold alone — when geocoding failed on + # either side but the transcript content is semantically very close. + # A strong embedding match beats a missing geocode. if not matched_incident and call_embedding and incident_type: best_score = 0.0 best_inc: Optional[dict] = None @@ -147,7 +190,14 @@ async def correlate_call( f"Correlator slow-path: call {call_id} → {best_inc['incident_id']} " f"(sim={best_score:.3f}, dist={dist_km:.2f}km)" ) - # No coords available → slow path alone is not enough; skip + elif best_score >= settings.embedding_no_location_threshold: + # High-confidence semantic match; geocode unavailable on one or + # both sides — content similarity alone is sufficient evidence. + matched_incident = best_inc + logger.info( + f"Correlator slow-path (high-confidence, no location): " + f"call {call_id} → {best_inc['incident_id']} (sim={best_score:.3f})" + ) # ── Update existing or create new ──────────────────────────────────────── if matched_incident: @@ -167,7 +217,6 @@ async def correlate_call( # No match and either no type or creation suppressed — nothing to do return None - await fstore.doc_set("calls", call_id, {"incident_id": incident_id}) return incident_id @@ -254,6 +303,62 @@ def _disambiguate( return best +def _call_fits_incident( + inc: dict, + call_units: list[str], + call_vehicles: list[str], + call_coords: Optional[dict], + proximity_km: float, +) -> bool: + """ + Return True if this call plausibly belongs to the given incident. + + This guards the single-talkgroup-match fast path on busy dispatch channels + where multiple concurrent scenes share one talkgroup. We only return False + (→ create a new incident) when there is *positive evidence* of a different + scene: a geocoded location that is too far away AND no unit or vehicle + overlap. In all ambiguous cases we default to True (link) to avoid + fragmenting short status calls that carry no location or unit information. + + Examples that correctly split: + - Police dispatch sends units to two separate MVAs miles apart + - EMS handles overlapping aided cases at different addresses + + Examples that correctly stay together (domestic with split parties): + - Units at 10 Main St and 12 Main St — within proximity radius → True + - Same unit mentioned in both the call and the incident → True + """ + # Unit overlap is the strongest positive signal: same officers = same call. + inc_units = set(inc.get("units") or []) + if inc_units and call_units and any(u in inc_units for u in call_units): + return True + + # Vehicle overlap: same vehicle description across calls → same scene. + inc_vehicles = set(inc.get("vehicles") or []) + if inc_vehicles and call_vehicles and any(v in inc_vehicles for v in call_vehicles): + return True + + # When both sides have geocoded coordinates, distance is the tiebreaker. + inc_coords = inc.get("location_coords") + if call_coords and inc_coords: + dist_km = _haversine_km( + call_coords["lat"], call_coords["lng"], + inc_coords["lat"], inc_coords["lng"], + ) + # Within proximity radius → same scene (handles domestics with nearby split parties). + if dist_km <= proximity_km: + return True + # Different location AND no unit/vehicle overlap → different incident. + return False + + # No geocoded location on one or both sides but the call IS substantive + # (has units or vehicles — otherwise is_thin_call would have caught it). + # Unit overlap already returned True above if present. If we reach here + # there is no overlap and no coords to compare — we cannot prove it is a + # different scene, so default to linking rather than fragmenting. + return True + + async def _update_incident( inc: dict, call_id: str, @@ -313,6 +418,11 @@ async def _update_incident( if best_coords: updates["location_coords"] = best_coords + # Update incident type when a re-classified call provides a concrete type. + # This handles the case where admin correction changes fire→police, etc. + if incident_type and incident_type != inc.get("type"): + updates["type"] = incident_type + # Re-evaluate title when a substantive call (classified incident_type) brings new tags. # Routine status calls (type=None) do not clobber the title. if incident_type: diff --git a/drb-c2-core/app/internal/intelligence.py b/drb-c2-core/app/internal/intelligence.py index 66bdeb9..f674fe6 100644 --- a/drb-c2-core/app/internal/intelligence.py +++ b/drb-c2-core/app/internal/intelligence.py @@ -1,8 +1,10 @@ """ GPT-4o-mini intelligence extraction from call transcripts. -Sends the transcript to GPT-4o mini with a tight JSON schema prompt. -Returns structured data: incident type, tags, location, vehicles, units, severity. +Sends the transcript to GPT-4o-mini with a structured prompt that detects +whether the recording contains one or multiple distinct scenes (back-to-back +dispatch conversations on a busy channel). Returns a list of scene dicts — +one per detected incident. Most calls produce a single scene. Falls back gracefully if the API is unavailable or returns malformed output. """ @@ -13,30 +15,37 @@ from typing import Optional from app.internal.logger import logger from app.internal import firestore as fstore -_PROMPT_TEMPLATE = """You are analyzing a P25 public safety radio recording. The audio was transcribed by Whisper through a digital radio vocoder, which introduces errors. Each numbered transmission is a separate PTT press from a different radio. Extract structured information and respond ONLY with a single valid JSON object — no markdown, no explanation. +_PROMPT_TEMPLATE = """You are analyzing a P25 public safety radio recording. The audio was transcribed by Whisper through a digital radio vocoder, which introduces errors. Each numbered transmission is a separate PTT press from a different radio. -Schema: -{{ - "incident_type": one of "fire" | "ems" | "police" | "accident" | "other" | "unknown", - "tags": [list of specific descriptive tags, max 6, e.g. "two-car mva", "property-damage-only", "working fire", "shots-fired"], - "location": "most specific location string found, or empty string", - "vehicles": [vehicle descriptions mentioned, e.g. "Hyundai Tucson", "black sedan"], - "units": [unit IDs or officer numbers mentioned, e.g. "Unit 511", "Car 4"], - "severity": one of "minor" | "moderate" | "major" | "unknown", - "resolved": true if this call explicitly signals the incident is over ("Code 4", "in custody", "all clear", "fire out", "patient transported", "GOA", "scene clear", "10-42", "negative contact", "clear the scene"), false otherwise, - "transcript_corrected": "corrected full transcript string, or null if no corrections needed" -}} +SCENE DETECTION: +A busy dispatch channel sometimes captures back-to-back conversations about multiple concurrent incidents in a single recording. Detect whether this recording contains ONE scene (all transmissions relate to a single event) or MULTIPLE scenes (clearly distinct dispatch conversations with different units being assigned, different locations, different event types). Assign short status transmissions (10-4, en route, acknowledgements) with no clear scene context to the most recent scene before them in the list. + +Always respond with the scenes array, even for a single scene. + +Response format — a JSON object with a "scenes" array. Each scene: + segment_indices: list of 0-based indices into the numbered transmissions (or null if no segments) + incident_type: one of "fire" | "ems" | "police" | "accident" | "other" | "unknown" + tags: list of specific descriptive tags, max 6, e.g. "two-car mva", "working fire", "shots-fired" + location: most specific location string found, or empty string + vehicles: list of vehicle descriptions mentioned + units: list of unit IDs or officer numbers explicitly mentioned + severity: one of "minor" | "moderate" | "major" | "unknown" + resolved: true if this scene explicitly signals incident closure, false otherwise + transcript_corrected: corrected text for this scene's transmissions only, or null Rules: - location: prefer intersections > addresses > mile markers > route+town > route alone > town alone. Empty string if none. -- tags: be specific and lowercase, hyphenated. Do not repeat incident_type as a tag. +- tags: specific, lowercase, hyphenated. Do not repeat incident_type as a tag. - units: only identifiers explicitly mentioned, not inferred. - Do not invent details not present in the transcript. -- transcript_corrected: fix only clear STT errors caused by vocoder distortion (e.g. "Several" → "10-4", misheard street names, garbled unit IDs). Use the back-and-forth context between transmissions to resolve ambiguities. Keep all radio language as-is — do NOT decode codes into plain English. Return null if the transcript looks accurate. +- incident_type: let the talkgroup channel be your primary signal. Use "fire" ONLY if the talkgroup is clearly a fire/rescue channel OR the transcript explicitly describes active fire, smoke, flames, or structure fire activation. Police or EMS referencing a fire scene → use "police" or "ems". When uncertain, prefer "other" over "fire". +- ten_codes: interpret radio codes using the department reference provided below. Do not guess codes not listed. +- resolved: true only when the scene explicitly signals "Code 4", "all clear", "10-42", "in custody", "patient transported", "fire out", "GOA", "negative contact", "scene clear". +- transcript_corrected: fix only clear STT/vocoder errors (e.g. "Several" → "10-4", misheard street names, garbled unit IDs). Keep all radio language as-is — do NOT decode codes into plain English. Return null if accurate. System: {system_id} Talkgroup: {talkgroup_name} -{vocabulary_block}{transcript_block}""" +{ten_codes_block}{vocabulary_block}{transcript_block}""" # Nominatim viewbox half-width in degrees (~11 km at mid-latitudes) _GEO_DELTA = 0.1 @@ -54,7 +63,14 @@ _TG_SUFFIX_RE = re.compile( ) -async def extract_tags( +def _build_ten_codes_block(ten_codes: dict[str, str]) -> str: + if not ten_codes: + return "" + lines = "\n".join(f" {code}: {meaning}" for code, meaning in sorted(ten_codes.items())) + return f"Department ten-codes:\n{lines}\n\n" + + +async def extract_scenes( call_id: str, transcript: str, talkgroup_name: Optional[str] = None, @@ -63,84 +79,128 @@ async def extract_tags( segments: Optional[list[dict]] = None, node_id: Optional[str] = None, preserve_transcript_correction: bool = False, -) -> tuple[list[str], Optional[str], Optional[str], Optional[dict], bool]: +) -> list[dict]: """ - Extract incident tags, type, location, corrected transcript, and closure signal via GPT-4o mini. - Geocodes the extracted location string via Nominatim using the node's position as bias. + Split the transcript into one or more scenes and extract structured + intelligence for each. Most calls return a single scene; a busy dispatch + channel capturing back-to-back conversations returns multiple. - Returns: - (tags, primary_type, location_str, location_coords, resolved) - where location_coords is {"lat": float, "lng": float} or None, - and resolved is True when the transcript signals incident closure. + Each scene dict contains: + tags, incident_type, location, location_coords, resolved, + severity, vehicles, units, transcript_corrected, + segment_indices, embedding - Side-effect: updates calls/{call_id} in Firestore with tags, location, - location_coords, vehicles, units, severity, transcript_corrected; also stores embedding. + Side-effect: updates calls/{call_id} in Firestore with merged tags, + location (primary scene), units/vehicles, severity, embedding, and + optionally transcript_corrected. """ - # Load per-system vocabulary for prompt injection vocabulary: list[str] = [] + ten_codes: dict[str, str] = {} if system_id: from app.internal.vocabulary_learner import get_vocabulary vocab_data = await get_vocabulary(system_id) vocabulary = vocab_data.get("vocabulary") or [] + system_doc = await fstore.doc_get("systems", system_id) + if system_doc: + ten_codes = system_doc.get("ten_codes") or {} - result = await asyncio.to_thread( - _sync_extract, transcript, talkgroup_name, talkgroup_id, system_id, segments, vocabulary + raw_scenes: list[dict] = await asyncio.to_thread( + _sync_extract, + transcript, talkgroup_name, talkgroup_id, system_id, segments, vocabulary, ten_codes, ) - tags: list[str] = result.get("tags") or [] - incident_type: Optional[str] = result.get("incident_type") or None - location: Optional[str] = result.get("location") or None - vehicles: list[str] = result.get("vehicles") or [] - units: list[str] = result.get("units") or [] - severity: str = result.get("severity") or "unknown" - resolved: bool = bool(result.get("resolved", False)) - transcript_corrected: Optional[str] = result.get("transcript_corrected") or None + if not raw_scenes: + return [] - if incident_type in ("unknown", "other", ""): - incident_type = None - - # Geocode the location string if we have one and a node to bias toward - location_coords: Optional[dict] = None - if location and node_id: + # Resolve node position once for geocoding all scenes + node_lat: Optional[float] = None + node_lon: Optional[float] = None + if node_id: node_doc = await fstore.doc_get("nodes", node_id) if node_doc: node_lat = node_doc.get("lat") node_lon = node_doc.get("lon") - if node_lat is not None and node_lon is not None: - state = await _get_node_state(node_id, node_lat, node_lon) - muni = _municipality_from_tg(talkgroup_name) - hint_parts = [p for p in [muni, state] if p] - query = f"{location}, {', '.join(hint_parts)}" if hint_parts else location - location_coords = await _geocode_location(query, node_lat, node_lon) - # Store embedding alongside structured data - embedding = await asyncio.to_thread(_sync_embed, _embed_text(transcript, incident_type)) + processed: list[dict] = [] + for scene in raw_scenes: + tags: list[str] = scene.get("tags") or [] + incident_type: Optional[str] = scene.get("incident_type") or None + location: Optional[str] = scene.get("location") or None + vehicles: list[str] = scene.get("vehicles") or [] + units: list[str] = scene.get("units") or [] + severity: str = scene.get("severity") or "unknown" + resolved: bool = bool(scene.get("resolved", False)) + transcript_corrected: Optional[str]= scene.get("transcript_corrected") or None + segment_indices: Optional[list] = scene.get("segment_indices") - updates: dict = {"tags": tags, "severity": severity} - if location: - updates["location"] = location - if location_coords: - updates["location_coords"] = location_coords - if vehicles: - updates["vehicles"] = vehicles - if units: - updates["units"] = units - if embedding: - updates["embedding"] = embedding - if transcript_corrected and not preserve_transcript_correction: - updates["transcript_corrected"] = transcript_corrected + if incident_type in ("unknown", "other", ""): + incident_type = None + + # Geocode this scene's location + location_coords: Optional[dict] = None + if location and node_lat is not None and node_lon is not None: + state = await _get_node_state(node_id, node_lat, node_lon) + muni = _municipality_from_tg(talkgroup_name) + hint_parts = [p for p in [muni, state] if p] + query = f"{location}, {', '.join(hint_parts)}" if hint_parts else location + location_coords = await _geocode_location(query, node_lat, node_lon) + + # Embed this scene's content + scene_text = _build_scene_embed_text( + transcript, segments, segment_indices, incident_type, transcript_corrected + ) + embedding = await asyncio.to_thread(_sync_embed, scene_text) + + processed.append({ + "tags": tags, + "incident_type": incident_type, + "location": location, + "location_coords": location_coords, + "vehicles": vehicles, + "units": units, + "severity": severity, + "resolved": resolved, + "transcript_corrected": transcript_corrected, + "segment_indices": segment_indices, + "embedding": embedding, + }) + + # Merge across scenes for the call-level Firestore document. + # Primary scene (first) owns location, severity, transcript_corrected. + # Tags/units/vehicles are union-merged from all scenes. + primary = processed[0] + all_tags = list(dict.fromkeys(t for s in processed for t in s["tags"])) + all_units = list(dict.fromkeys(u for s in processed for u in s["units"])) + all_vehicles = list(dict.fromkeys(v for s in processed for v in s["vehicles"])) + + updates: dict = {"tags": all_tags, "severity": primary["severity"]} + if primary["location"]: + updates["location"] = primary["location"] + if primary["location_coords"]: + updates["location_coords"] = primary["location_coords"] + if all_units: + updates["units"] = all_units + if all_vehicles: + updates["vehicles"] = all_vehicles + if primary["embedding"]: + updates["embedding"] = primary["embedding"] + if primary["transcript_corrected"] and not preserve_transcript_correction: + updates["transcript_corrected"] = primary["transcript_corrected"] try: await fstore.doc_set("calls", call_id, updates) except Exception as e: logger.warning(f"Could not save intelligence for call {call_id}: {e}") - logger.info( - f"Intelligence: call {call_id} → type={incident_type}, " - f"tags={tags}, location={location!r}, coords={location_coords}, severity={severity}, " - f"corrected={transcript_corrected is not None}" + scene_summary = ( + f"{len(processed)} scene(s): " + + ", ".join( + f"[{s['incident_type'] or 'unclassified'} tags={s['tags'][:2]}]" + for s in processed + ) ) - return tags, incident_type, location, location_coords, resolved + logger.info(f"Intelligence: call {call_id} → {scene_summary}") + return processed async def _geocode_location( @@ -220,7 +280,6 @@ def _municipality_from_tg(tg_name: Optional[str]) -> Optional[str]: if not tg_name: return None cleaned = _TG_SUFFIX_RE.sub("", tg_name).strip() - # Discard if nothing left, purely numeric, or a short all-caps abbreviation (e.g. "WC", "TAC") if not cleaned or cleaned.isdigit() or (len(cleaned) <= 3 and cleaned.isupper()): return None return cleaned @@ -234,6 +293,23 @@ def _build_transcript_block(transcript: str, segments: Optional[list[dict]]) -> return f"Transcript:\n{transcript}" +def _build_scene_embed_text( + transcript: str, + segments: Optional[list[dict]], + segment_indices: Optional[list[int]], + incident_type: Optional[str], + transcript_corrected: Optional[str], +) -> str: + """Build the text string to embed for a specific scene.""" + prefix = f"[{incident_type}] " if incident_type else "" + if transcript_corrected: + return f"{prefix}{transcript_corrected}" + if segments and segment_indices: + texts = [segments[i]["text"] for i in segment_indices if i < len(segments)] + return f"{prefix}{' '.join(texts)}" + return f"{prefix}{transcript}" + + def _sync_extract( transcript: str, talkgroup_name: Optional[str], @@ -241,14 +317,15 @@ def _sync_extract( system_id: Optional[str], segments: Optional[list[dict]], vocabulary: Optional[list[str]] = None, -) -> dict: - """Call GPT-4o mini and parse the JSON response.""" + ten_codes: Optional[dict[str, str]] = None, +) -> list[dict]: + """Call GPT-4o-mini and return a list of scene dicts.""" from app.config import settings from openai import OpenAI if not settings.openai_api_key: logger.warning("OPENAI_API_KEY not set — intelligence extraction disabled.") - return {} + return [] from app.internal.vocabulary_learner import build_gpt_vocab_block tg = f"{talkgroup_name} (TGID {talkgroup_id})" if talkgroup_id else (talkgroup_name or "unknown") @@ -256,6 +333,7 @@ def _sync_extract( transcript_block=_build_transcript_block(transcript, segments), talkgroup_name=tg, system_id=system_id or "unknown", + ten_codes_block=_build_ten_codes_block(ten_codes or {}), vocabulary_block=build_gpt_vocab_block(vocabulary or []), ) @@ -266,13 +344,22 @@ def _sync_extract( messages=[{"role": "user", "content": prompt}], response_format={"type": "json_object"}, ) - return json.loads(response.choices[0].message.content) + raw = json.loads(response.choices[0].message.content) + + # New format: {"scenes": [...]} + if "scenes" in raw and isinstance(raw["scenes"], list): + return raw["scenes"] + + # Fallback: GPT returned the old flat single-scene format + logger.warning("GPT returned flat format instead of scenes array — wrapping") + return [raw] + except json.JSONDecodeError as e: - logger.warning(f"GPT-4o mini returned non-JSON: {e}") - return {} + logger.warning(f"GPT-4o-mini returned non-JSON: {e}") + return [] except Exception as e: - logger.warning(f"GPT-4o mini extraction failed: {e}") - return {} + logger.warning(f"GPT-4o-mini extraction failed: {e}") + return [] def _sync_embed(text: str) -> Optional[list[float]]: @@ -290,8 +377,3 @@ def _sync_embed(text: str) -> Optional[list[float]]: except Exception as e: logger.warning(f"Embedding generation failed: {e}") return None - - -def _embed_text(transcript: str, incident_type: Optional[str]) -> str: - prefix = f"[{incident_type}] " if incident_type else "" - return f"{prefix}{transcript}" diff --git a/drb-c2-core/app/internal/recorrelation_sweep.py b/drb-c2-core/app/internal/recorrelation_sweep.py index 25ad8e9..6d345d7 100644 --- a/drb-c2-core/app/internal/recorrelation_sweep.py +++ b/drb-c2-core/app/internal/recorrelation_sweep.py @@ -46,7 +46,10 @@ async def _run_sweep_pass() -> None: ("status", "==", "ended"), ("ended_at", ">=", cutoff), ]) - orphans = [c for c in recent_ended if not c.get("incident_id")] + orphans = [ + c for c in recent_ended + if not c.get("incident_ids") and not c.get("incident_id") + ] if not orphans: return @@ -89,6 +92,7 @@ async def _recorrelate_orphan(call: dict) -> bool: ) if incident_id: + await fstore.doc_set("calls", call_id, {"incident_ids": [incident_id]}) logger.info( f"Re-correlation: linked orphaned call {call_id} → incident {incident_id}" ) diff --git a/drb-c2-core/app/models.py b/drb-c2-core/app/models.py index b2b72e8..49d8079 100644 --- a/drb-c2-core/app/models.py +++ b/drb-c2-core/app/models.py @@ -33,12 +33,14 @@ class SystemRecord(BaseModel): name: str type: str # P25 / DMR / NBFM config: Dict[str, Any] = {} # OP25-compatible config blob + ten_codes: Dict[str, str] = {} # {"10-10": "Commercial Alarm", ...} class SystemCreate(BaseModel): name: str type: str config: Dict[str, Any] = {} + ten_codes: Dict[str, str] = {} # --------------------------------------------------------------------------- @@ -56,11 +58,11 @@ class CallRecord(BaseModel): started_at: datetime ended_at: Optional[datetime] = None audio_url: Optional[str] = None - transcript: Optional[str] = None # populated later by STT - incident_id: Optional[str] = None # populated later by intelligence layer + transcript: Optional[str] = None # populated later by STT + incident_ids: List[str] = [] # one per scene detected in the recording location: Optional[Dict[str, float]] = None # {lat, lng} tags: List[str] = [] - status: str = "active" # active / ended + status: str = "active" # active / ended # --------------------------------------------------------------------------- diff --git a/drb-c2-core/app/routers/calls.py b/drb-c2-core/app/routers/calls.py index 4c90e09..1fb3c74 100644 --- a/drb-c2-core/app/routers/calls.py +++ b/drb-c2-core/app/routers/calls.py @@ -83,6 +83,28 @@ async def patch_transcript( "embedding": None, }) + # Unlink from ALL current incidents so re-correlation starts clean. + # Handles both old single incident_id and new incident_ids list. + old_ids: list[str] = call.get("incident_ids") or ( + [call["incident_id"]] if call.get("incident_id") else [] + ) + for old_incident_id in old_ids: + old_incident = await fstore.doc_get("incidents", old_incident_id) + if old_incident: + remaining = [c for c in (old_incident.get("call_ids") or []) if c != call_id] + if remaining: + await fstore.doc_set("incidents", old_incident_id, { + "call_ids": remaining, + "summary_stale": True, + }) + else: + await fstore.doc_set("incidents", old_incident_id, { + "call_ids": [], + "status": "resolved", + "summary_stale": True, + }) + await fstore.doc_set("calls", call_id, {"incident_ids": [], "incident_id": None}) + # Learn from the correction: diff original → corrected and add new tokens to vocabulary system_id = call.get("system_id") original_text = call.get("transcript_corrected") or call.get("transcript") or "" diff --git a/drb-c2-core/app/routers/systems.py b/drb-c2-core/app/routers/systems.py index d99182b..23673f1 100644 --- a/drb-c2-core/app/routers/systems.py +++ b/drb-c2-core/app/routers/systems.py @@ -1,7 +1,7 @@ import uuid from fastapi import APIRouter, HTTPException from pydantic import BaseModel -from typing import Optional +from typing import Dict, Optional from app.models import SystemCreate, SystemRecord from app.internal import firestore as fstore @@ -12,6 +12,10 @@ class VocabularyTermBody(BaseModel): term: str +class TenCodesBody(BaseModel): + ten_codes: Dict[str, str] + + @router.get("") async def list_systems(): return await fstore.collection_list("systems") @@ -50,6 +54,27 @@ async def delete_system(system_id: str): await fstore.doc_delete("systems", system_id) +# ── Ten-codes endpoints ──────────────────────────────────────────────────────── + +@router.get("/{system_id}/ten-codes") +async def get_ten_codes(system_id: str): + """Return the ten-code dictionary for a system.""" + system = await fstore.doc_get("systems", system_id) + if not system: + raise HTTPException(404, f"System '{system_id}' not found.") + return {"ten_codes": system.get("ten_codes") or {}} + + +@router.put("/{system_id}/ten-codes") +async def update_ten_codes(system_id: str, body: TenCodesBody): + """Replace the ten-code dictionary for a system.""" + existing = await fstore.doc_get("systems", system_id) + if not existing: + raise HTTPException(404, f"System '{system_id}' not found.") + await fstore.doc_update("systems", system_id, {"ten_codes": body.ten_codes}) + return {"ok": True, "ten_codes": body.ten_codes} + + # ── Vocabulary endpoints ─────────────────────────────────────────────────────── @router.get("/{system_id}/vocabulary") diff --git a/drb-c2-core/app/routers/upload.py b/drb-c2-core/app/routers/upload.py index ee10f27..790e727 100644 --- a/drb-c2-core/app/routers/upload.py +++ b/drb-c2-core/app/routers/upload.py @@ -96,35 +96,47 @@ async def _run_extraction_pipeline( """Run steps 2-4 of the intelligence pipeline using an existing transcript.""" from app.internal import intelligence, incident_correlator, alerter - tags, incident_type, location, location_coords, resolved = await intelligence.extract_tags( + # Step 2: Scene detection + intelligence extraction. + # Returns one scene per distinct incident detected in the recording. + scenes = await intelligence.extract_scenes( call_id, transcript, talkgroup_name, talkgroup_id=talkgroup_id, system_id=system_id, segments=segments, node_id=node_id, preserve_transcript_correction=preserve_transcript_correction, ) - incident_id = await incident_correlator.correlate_call( - call_id=call_id, - node_id=node_id, - system_id=system_id, - talkgroup_id=talkgroup_id, - talkgroup_name=talkgroup_name, - tags=tags, - incident_type=incident_type, - location=location, - location_coords=location_coords, - ) + # Step 3: Correlate each scene to an incident independently. + incident_ids: list[str] = [] + all_tags: list[str] = [] + for scene in scenes: + all_tags.extend(scene["tags"]) + incident_id = await incident_correlator.correlate_call( + call_id=call_id, + node_id=node_id, + system_id=system_id, + talkgroup_id=talkgroup_id, + talkgroup_name=talkgroup_name, + tags=scene["tags"], + incident_type=scene["incident_type"], + location=scene["location"], + location_coords=scene["location_coords"], + ) + if incident_id and incident_id not in incident_ids: + incident_ids.append(incident_id) + if scene["resolved"] and incident_id: + await fstore.doc_set("incidents", incident_id, {"status": "resolved"}) + logger.info(f"Auto-resolved incident {incident_id} (LLM closure detection)") - if resolved and incident_id: - await fstore.doc_set("incidents", incident_id, {"status": "resolved"}) - logger.info(f"Auto-resolved incident {incident_id} (LLM closure detection)") + if incident_ids: + await fstore.doc_set("calls", call_id, {"incident_ids": incident_ids}) + # Step 4: Alert dispatch — run once with merged tags from all scenes. await alerter.check_and_dispatch( call_id=call_id, node_id=node_id, talkgroup_id=talkgroup_id, talkgroup_name=talkgroup_name, - tags=tags, + tags=list(dict.fromkeys(all_tags)), transcript=transcript, ) @@ -140,8 +152,8 @@ async def _run_intelligence_pipeline( """ Post-upload intelligence pipeline (runs as a background task): 1. Transcribe audio via Google STT - 2. Extract tags/incident type from transcript - 3. Correlate with existing incidents (or create new one) + 2. Detect scenes + extract intelligence (one result per incident in recording) + 3. Correlate each scene with existing incidents (or create new ones) 4. Check alert rules and dispatch notifications """ from app.internal import transcription, intelligence, incident_correlator, alerter @@ -155,35 +167,57 @@ async def _run_intelligence_pipeline( call_id, gcs_uri, talkgroup_name, system_id=system_id ) - # Step 2: Intelligence extraction - tags: list[str] = [] - incident_type: Optional[str] = None - location: Optional[str] = None - location_coords: Optional[dict] = None - resolved: bool = False + # Step 2: Scene detection + intelligence extraction + scenes: list[dict] = [] if transcript: - tags, incident_type, location, location_coords, resolved = await intelligence.extract_tags( + scenes = await intelligence.extract_scenes( call_id, transcript, talkgroup_name, talkgroup_id=talkgroup_id, system_id=system_id, segments=segments, node_id=node_id, ) - # Step 3: Incident correlation (always runs — unclassified calls can still link via talkgroup) - incident_id = await incident_correlator.correlate_call( - call_id=call_id, - node_id=node_id, - system_id=system_id, - talkgroup_id=talkgroup_id, - talkgroup_name=talkgroup_name, - tags=tags, - incident_type=incident_type, - location=location, - location_coords=location_coords, - ) + # Step 3: Correlate each scene independently. + # A single recording can produce multiple incidents on a busy channel. + incident_ids: list[str] = [] + all_tags: list[str] = [] + for scene in scenes: + all_tags.extend(scene["tags"]) + incident_id = await incident_correlator.correlate_call( + call_id=call_id, + node_id=node_id, + system_id=system_id, + talkgroup_id=talkgroup_id, + talkgroup_name=talkgroup_name, + tags=scene["tags"], + incident_type=scene["incident_type"], + location=scene["location"], + location_coords=scene["location_coords"], + ) + if incident_id and incident_id not in incident_ids: + incident_ids.append(incident_id) + if scene["resolved"] and incident_id: + await fstore.doc_set("incidents", incident_id, {"status": "resolved"}) + logger.info(f"Auto-resolved incident {incident_id} (LLM closure detection)") - if resolved and incident_id: - await fstore.doc_set("incidents", incident_id, {"status": "resolved"}) - logger.info(f"Auto-resolved incident {incident_id} (LLM closure detection)") + # Correlator also runs for calls with no scenes (unclassified) to attempt + # talkgroup-based linking even when no transcript could be produced. + if not scenes: + incident_id = await incident_correlator.correlate_call( + call_id=call_id, + node_id=node_id, + system_id=system_id, + talkgroup_id=talkgroup_id, + talkgroup_name=talkgroup_name, + tags=[], + incident_type=None, + location=None, + location_coords=None, + ) + if incident_id: + incident_ids.append(incident_id) + + if incident_ids: + await fstore.doc_set("calls", call_id, {"incident_ids": incident_ids}) # Step 4: Alert dispatch (always runs — talkgroup ID rules don't need a transcript) await alerter.check_and_dispatch( @@ -191,6 +225,6 @@ async def _run_intelligence_pipeline( node_id=node_id, talkgroup_id=talkgroup_id, talkgroup_name=talkgroup_name, - tags=tags, + tags=list(dict.fromkeys(all_tags)), transcript=transcript, ) diff --git a/drb-frontend/app/incidents/page.tsx b/drb-frontend/app/incidents/page.tsx index 686d5d7..27972a3 100644 --- a/drb-frontend/app/incidents/page.tsx +++ b/drb-frontend/app/incidents/page.tsx @@ -15,6 +15,22 @@ const TYPE_COLORS: Record = { other: "bg-gray-800 text-gray-300", }; +const SEVERITY_COLORS: Record = { + major: "bg-red-950 text-red-400", + moderate: "bg-orange-950 text-orange-400", + minor: "bg-gray-800 text-gray-400", +}; + +function severityBadge(severity: string | null | undefined) { + if (!severity || severity === "unknown") return null; + const cls = SEVERITY_COLORS[severity] ?? "bg-gray-800 text-gray-400"; + return ( + + {severity} + + ); +} + function typeBadge(type: string | null) { const cls = TYPE_COLORS[type ?? "other"] ?? TYPE_COLORS.other; return ( @@ -51,6 +67,7 @@ function IncidentRow({ incident, isAdmin, onResolve }: { {incident.status} + {severityBadge(incident.severity)} {incident.call_ids.length} {fmtTime(incident.started_at)} {fmtTime(incident.updated_at)} @@ -167,9 +184,12 @@ function IncidentCards({ incidents, isAdmin, onResolve }: { )}

{inc.title ?? "—"}

-

- {fmtTime(inc.started_at)} · {inc.call_ids.length} call{inc.call_ids.length !== 1 ? "s" : ""} -

+
+ {severityBadge(inc.severity)} +

+ {fmtTime(inc.started_at)} · {inc.call_ids.length} call{inc.call_ids.length !== 1 ? "s" : ""} +

+
))} @@ -196,6 +216,7 @@ function IncidentTable({ incidents, isAdmin, onResolve }: { Type Title Status + Severity Calls Started Updated diff --git a/drb-frontend/components/CallRow.tsx b/drb-frontend/components/CallRow.tsx index c0bad11..de1f232 100644 --- a/drb-frontend/components/CallRow.tsx +++ b/drb-frontend/components/CallRow.tsx @@ -31,8 +31,13 @@ export function CallRow({ call, systemName, isAdmin }: Props) { const [editText, setEditText] = useState(""); const [saving, setSaving] = useState(false); const [saveError, setSaveError] = useState(null); + // Resolve incident links: prefer new list, fall back to legacy single field. + const incidentIds: string[] = (call.incident_ids?.length ?? 0) > 0 + ? call.incident_ids + : call.incident_id ? [call.incident_id] : []; + const isActive = call.status === "active"; - const hasDetails = call.transcript || call.transcript_corrected || (call.tags && call.tags.length > 0) || call.incident_id || call.audio_url; + const hasDetails = call.transcript || call.transcript_corrected || (call.tags && call.tags.length > 0) || incidentIds.length > 0 || call.audio_url; const displayTranscript = (!showOriginal && call.transcript_corrected) ? call.transcript_corrected : call.transcript; const hasBoth = !!(call.transcript && call.transcript_corrected); const hasSegments = call.segments && call.segments.length > 1; @@ -121,14 +126,16 @@ export function CallRow({ call, systemName, isAdmin }: Props) { )} - {/* Incident link */} - {call.incident_id && ( -

- Incident:{" "} - - {call.incident_id.slice(0, 8)}… - -

+ {/* Incident links — one per scene detected in the recording */} + {incidentIds.length > 0 && ( +
+ {incidentIds.length === 1 ? "Incident:" : "Incidents:"} + {incidentIds.map((id) => ( + + {id.slice(0, 8)}… + + ))} +
)} {/* Transcript */} diff --git a/drb-frontend/lib/c2api.ts b/drb-frontend/lib/c2api.ts index 3bcef8c..0456758 100644 --- a/drb-frontend/lib/c2api.ts +++ b/drb-frontend/lib/c2api.ts @@ -94,6 +94,12 @@ export const c2api = { reissueNodeKey: (nodeId: string) => request(`/nodes/${nodeId}/reissue-key`, { method: "POST" }), + // Ten-codes + getTenCodes: (systemId: string) => + request<{ ten_codes: Record }>(`/systems/${systemId}/ten-codes`), + updateTenCodes: (systemId: string, ten_codes: Record) => + request(`/systems/${systemId}/ten-codes`, { method: "PUT", body: JSON.stringify({ ten_codes }) }), + // Vocabulary getVocabulary: (systemId: string) => request<{ vocabulary: string[]; vocabulary_pending: { term: string; source: "induction" | "correction"; added_at: string }[]; vocabulary_bootstrapped: boolean }>( diff --git a/drb-frontend/lib/types.ts b/drb-frontend/lib/types.ts index 388f511..694e0a2 100644 --- a/drb-frontend/lib/types.ts +++ b/drb-frontend/lib/types.ts @@ -27,6 +27,7 @@ export interface SystemRecord { vocabulary?: string[]; vocabulary_pending?: VocabularyPendingTerm[]; vocabulary_bootstrapped?: boolean; + ten_codes?: Record; // {"10-10": "Commercial Alarm", ...} } export interface TranscriptSegment { @@ -48,7 +49,10 @@ export interface CallRecord { transcript: string | null; transcript_corrected: string | null; segments: TranscriptSegment[] | null; - incident_id: string | null; + /** New: one entry per scene detected in the recording. */ + incident_ids: string[]; + /** Legacy field — present on calls recorded before the multi-scene migration. */ + incident_id?: string | null; location: string | null; tags: string[]; status: "active" | "ended";