From 7b6fd640d9d91718add0c2b7d70ece92ba3aeca6 Mon Sep 17 00:00:00 2001 From: Logan Date: Sun, 12 Apr 2026 23:33:44 -0400 Subject: [PATCH] Update intelligence --- drb-c2-core/.env.example | 6 + drb-c2-core/app/config.py | 8 +- .../app/internal/incident_correlator.py | 271 ++++++++++++++---- drb-c2-core/app/internal/intelligence.py | 188 +++++++----- drb-c2-core/app/internal/summarizer.py | 114 ++++++++ drb-c2-core/app/main.py | 3 + drb-c2-core/app/routers/upload.py | 5 +- drb-c2-core/requirements.txt | 2 + 8 files changed, 456 insertions(+), 141 deletions(-) create mode 100644 drb-c2-core/app/internal/summarizer.py diff --git a/drb-c2-core/.env.example b/drb-c2-core/.env.example index edab2f2..bcd713d 100644 --- a/drb-c2-core/.env.example +++ b/drb-c2-core/.env.example @@ -21,6 +21,12 @@ NODE_OFFLINE_THRESHOLD=90 # OpenAI Whisper — for audio transcription OPENAI_API_KEY= +# Gemini — for intelligence extraction, embeddings, and incident summaries +GEMINI_API_KEY= +SUMMARY_INTERVAL_MINUTES=15 +CORRELATION_WINDOW_HOURS=4 +EMBEDDING_SIMILARITY_THRESHOLD=0.82 + # Auth — static key that edge nodes send as Bearer token on /upload # Generate with: openssl rand -hex 32 NODE_API_KEY= diff --git a/drb-c2-core/app/config.py b/drb-c2-core/app/config.py index 61ee209..018632d 100644 --- a/drb-c2-core/app/config.py +++ b/drb-c2-core/app/config.py @@ -17,9 +17,15 @@ class Settings(BaseSettings): # Node health node_offline_threshold: int = 90 # seconds without checkin before marking offline - # OpenAI + # OpenAI (Whisper STT) openai_api_key: Optional[str] = None + # Gemini (intelligence extraction, embeddings, incident summaries) + gemini_api_key: Optional[str] = None + summary_interval_minutes: int = 15 # how often the summary loop runs + correlation_window_hours: int = 4 # how far back to look for matching incidents + embedding_similarity_threshold: float = 0.82 # cosine similarity cutoff for slow-path match + # Internal service key — allows server-side services (discord bot) to call C2 without Firebase service_key: Optional[str] = None diff --git a/drb-c2-core/app/internal/incident_correlator.py b/drb-c2-core/app/internal/incident_correlator.py index 4748fa6..8df1670 100644 --- a/drb-c2-core/app/internal/incident_correlator.py +++ b/drb-c2-core/app/internal/incident_correlator.py @@ -1,109 +1,256 @@ """ -Incident correlation engine. +Hybrid incident correlation engine. -After a call is transcribed and tagged, this module attempts to link it to an -existing open incident (same type, same node/system, within a 30-minute -window). If no match is found, a new incident is auto-created. +Fast path — deterministic: + Same system_id + talkgroup_id + incident_type within the correlation window. + This is the primary signal for P25 — dispatch assigns one talkgroup per incident. -The result is written back to Firestore on both the call document -(call.incident_id) and the incident document (incident.call_ids). +Slow path — embedding similarity: + If fast path finds nothing, compare the new call's embedding against the + centroid embedding of each open incident (running average of its call embeddings). + Match if cosine similarity >= threshold. + +Background re-evaluation (driven by summarizer loop): + Unmatched calls are re-checked periodically, catching mutual-aid cases where + a second talkgroup gets pulled into an existing incident. + +Incident document schema additions: + talkgroup_ids: list[str] — all talkgroups that have contributed calls + location_mentions: list[str] — all location strings from calls (newest last) + location: str|None — best known location (newest non-null for now; + TODO: replace with Maps geocoding bbox comparison) + vehicles: list[str] — deduplicated vehicle list across all calls + units: list[str] — deduplicated unit list across all calls + severity: str — highest severity seen + summary_stale: bool — True when a new call is added + summary_last_run: str|None — ISO timestamp of last Gemini summary run + embedding: list[float] — running-average centroid of call embeddings + embedding_count: int — number of calls factored into the centroid """ import uuid from datetime import datetime, timezone, timedelta from typing import Optional from app.internal.logger import logger from app.internal import firestore as fstore - - -_CORRELATION_WINDOW = timedelta(minutes=30) +from app.config import settings async def correlate_call( call_id: str, node_id: str, system_id: Optional[str], + talkgroup_id: Optional[int], talkgroup_name: Optional[str], tags: list[str], incident_type: Optional[str], + location: Optional[str] = None, ) -> Optional[str]: """ Link call_id to an existing incident or create a new one. - Args: - call_id: ID of the call being processed. - node_id: Edge node that recorded the call. - system_id: Radio system ID (may be None). - talkgroup_name: Human-readable talkgroup name for auto-title generation. - tags: Tags extracted by intelligence.py. - incident_type: Primary incident category (fire/police/ems/accident) or None. - - Returns: - The incident_id that was linked, or None if skipped. + Returns the incident_id that was linked, or None if skipped. """ if not incident_type: return None now = datetime.now(timezone.utc) - cutoff = (now - _CORRELATION_WINDOW).isoformat() + window = timedelta(hours=settings.correlation_window_hours) - # Fetch active incidents of the same type + # Fetch all active incidents of the same type candidates = await fstore.collection_list("incidents", status="active", type=incident_type) - # Filter to incidents updated within the correlation window and on this node - matched_incident: Optional[dict] = None + # Filter to those within the correlation window + active = [] for inc in candidates: updated_raw = inc.get("updated_at", "") try: updated_dt = datetime.fromisoformat(str(updated_raw).replace("Z", "+00:00")) if updated_dt.tzinfo is None: updated_dt = updated_dt.replace(tzinfo=timezone.utc) + if (now - updated_dt) <= window: + active.append(inc) except Exception: continue - if updated_dt < (now - _CORRELATION_WINDOW): - continue + # ---------------------------------------------------------------- + # Fast path — talkgroup match + # ---------------------------------------------------------------- + matched_incident: Optional[dict] = None + if talkgroup_id is not None and system_id: + tg_str = str(talkgroup_id) + for inc in active: + if system_id in (inc.get("system_ids") or []) and tg_str in (inc.get("talkgroup_ids") or []): + matched_incident = inc + logger.info(f"Correlator fast-path: call {call_id} → incident {inc['incident_id']} (tg match)") + break - # Check whether any call in this incident came from the same node - linked_call_ids = inc.get("call_ids", []) - if linked_call_ids: - for linked_id in linked_call_ids[:5]: # check last 5 calls to avoid slow queries - linked_call = await fstore.doc_get("calls", linked_id) - if linked_call and linked_call.get("node_id") == node_id: - matched_incident = inc - break - if matched_incident: - break + # ---------------------------------------------------------------- + # Slow path — embedding similarity + # ---------------------------------------------------------------- + if not matched_incident and active: + call_doc = await fstore.doc_get("calls", call_id) + call_embedding = call_doc.get("embedding") if call_doc else None + if call_embedding: + best_score = 0.0 + best_inc = None + for inc in active: + inc_embedding = inc.get("embedding") + if not inc_embedding: + continue + score = _cosine_similarity(call_embedding, inc_embedding) + if score > best_score: + best_score = score + best_inc = inc + if best_inc and best_score >= settings.embedding_similarity_threshold: + matched_incident = best_inc + logger.info( + f"Correlator slow-path: call {call_id} → incident {best_inc['incident_id']} " + f"(similarity={best_score:.3f})" + ) + # ---------------------------------------------------------------- + # Update existing or create new + # ---------------------------------------------------------------- if matched_incident: incident_id = matched_incident["incident_id"] - existing_ids = matched_incident.get("call_ids", []) - if call_id not in existing_ids: - existing_ids.append(call_id) - await fstore.doc_update("incidents", incident_id, { - "call_ids": existing_ids, - "updated_at": now.isoformat(), - }) - logger.info(f"Correlator: linked call {call_id} to existing incident {incident_id}") + await _update_incident(matched_incident, call_id, talkgroup_id, system_id, tags, location, now) else: - # Create a new incident - incident_id = str(uuid.uuid4()) - tg_label = talkgroup_name or "Unknown Talkgroup" - title = f"Auto: {incident_type.title()} — {tg_label}" - doc = { - "incident_id": incident_id, - "title": title, - "type": incident_type, - "status": "active", - "location": None, - "call_ids": [call_id], - "summary": None, - "tags": tags, - "started_at": now.isoformat(), - "updated_at": now.isoformat(), - } - await fstore.doc_set("incidents", incident_id, doc, merge=False) - logger.info(f"Correlator: created new incident {incident_id} for call {call_id} ({incident_type})") + incident_id = await _create_incident( + call_id, incident_type, talkgroup_id, talkgroup_name, system_id, tags, location, now + ) # Back-link the call - await fstore.doc_update("calls", call_id, {"incident_id": incident_id}) + await fstore.doc_set("calls", call_id, {"incident_id": incident_id}) return incident_id + + +async def _update_incident( + inc: dict, + call_id: str, + talkgroup_id: Optional[int], + system_id: Optional[str], + tags: list[str], + location: Optional[str], + now: datetime, +) -> None: + incident_id = inc["incident_id"] + + call_ids = inc.get("call_ids", []) + if call_id not in call_ids: + call_ids.append(call_id) + + talkgroup_ids = inc.get("talkgroup_ids", []) + if talkgroup_id is not None and str(talkgroup_id) not in talkgroup_ids: + talkgroup_ids.append(str(talkgroup_id)) + + system_ids = inc.get("system_ids", []) + if system_id and system_id not in system_ids: + system_ids.append(system_id) + + # Merge tags (deduplicated) + merged_tags = list(dict.fromkeys(inc.get("tags", []) + tags)) + + # Location — append to mentions; update display location if new one is non-null + # TODO: replace "newest wins" with Maps geocoding bbox comparison for true specificity + location_mentions = inc.get("location_mentions", []) + if location and location not in location_mentions: + location_mentions.append(location) + best_location = location if location else inc.get("location") + + # Update centroid embedding + embedding_updates = await _merge_embedding(inc, call_id) + + updates = { + "call_ids": call_ids, + "talkgroup_ids": talkgroup_ids, + "system_ids": system_ids, + "tags": merged_tags, + "location_mentions": location_mentions, + "updated_at": now.isoformat(), + "summary_stale": True, + **embedding_updates, + } + if best_location: + updates["location"] = best_location + + await fstore.doc_set("incidents", incident_id, updates) + logger.info(f"Correlator: linked call {call_id} to existing incident {incident_id}") + + +async def _create_incident( + call_id: str, + incident_type: str, + talkgroup_id: Optional[int], + talkgroup_name: Optional[str], + system_id: Optional[str], + tags: list[str], + location: Optional[str], + now: datetime, +) -> str: + incident_id = str(uuid.uuid4()) + tg_label = talkgroup_name or "Unknown Talkgroup" + + call_doc = await fstore.doc_get("calls", call_id) + call_embedding = call_doc.get("embedding") if call_doc else None + call_vehicles = call_doc.get("vehicles", []) if call_doc else [] + call_units = call_doc.get("units", []) if call_doc else [] + call_severity = call_doc.get("severity", "unknown") if call_doc else "unknown" + + doc = { + "incident_id": incident_id, + "title": f"Auto: {incident_type.title()} — {tg_label}", + "type": incident_type, + "status": "active", + "location": location, + "location_mentions": [location] if location else [], + "call_ids": [call_id], + "talkgroup_ids": [str(talkgroup_id)] if talkgroup_id is not None else [], + "system_ids": [system_id] if system_id else [], + "tags": tags, + "vehicles": call_vehicles, + "units": call_units, + "severity": call_severity, + "summary": None, + "summary_stale": True, + "summary_last_run": None, + "embedding": call_embedding, + "embedding_count": 1 if call_embedding else 0, + "started_at": now.isoformat(), + "updated_at": now.isoformat(), + } + await fstore.doc_set("incidents", incident_id, doc, merge=False) + logger.info(f"Correlator: created incident {incident_id} for call {call_id} ({incident_type})") + return incident_id + + +async def _merge_embedding(inc: dict, call_id: str) -> dict: + """ + Update the incident's centroid embedding with the new call's embedding. + Uses an online running-average: new_avg = (old_avg * n + new_vec) / (n + 1) + """ + import numpy as np + + call_doc = await fstore.doc_get("calls", call_id) + call_embedding = call_doc.get("embedding") if call_doc else None + if not call_embedding: + return {} + + n = inc.get("embedding_count", 0) + old_embedding = inc.get("embedding") + + if old_embedding and n > 0: + old_vec = np.array(old_embedding, dtype=float) + new_vec = np.array(call_embedding, dtype=float) + updated = ((old_vec * n) + new_vec) / (n + 1) + return {"embedding": updated.tolist(), "embedding_count": n + 1} + else: + return {"embedding": call_embedding, "embedding_count": 1} + + +def _cosine_similarity(a: list[float], b: list[float]) -> float: + import numpy as np + va, vb = np.array(a, dtype=float), np.array(b, dtype=float) + norm_a, norm_b = np.linalg.norm(va), np.linalg.norm(vb) + if norm_a == 0 or norm_b == 0: + return 0.0 + return float(np.dot(va, vb) / (norm_a * norm_b)) diff --git a/drb-c2-core/app/internal/intelligence.py b/drb-c2-core/app/internal/intelligence.py index 1510e3b..e3966e9 100644 --- a/drb-c2-core/app/internal/intelligence.py +++ b/drb-c2-core/app/internal/intelligence.py @@ -1,106 +1,140 @@ """ -Rules-based intelligence extraction from call transcripts. +Gemini-powered intelligence extraction from call transcripts. -Scans a transcript for known incident keywords, categorises the call, and -extracts rough location hints (street/intersection mentions). +Sends the transcript to Gemini Flash with a tight JSON schema prompt. +Returns structured data: incident type, tags, location, vehicles, units, severity. -No external ML dependencies — fast and always available even when STT is -disabled. Designed to run as part of the post-upload background pipeline. +Falls back gracefully if Gemini is unavailable or returns malformed output. """ -import re +import asyncio +import json from typing import Optional from app.internal.logger import logger from app.internal import firestore as fstore +_PROMPT_TEMPLATE = """You are analyzing a P25 public safety radio transcript. Extract structured information and respond ONLY with a single valid JSON object — no markdown, no explanation. -# --------------------------------------------------------------------------- -# Keyword taxonomy -# --------------------------------------------------------------------------- +Schema: +{{ + "incident_type": one of "fire" | "ems" | "police" | "accident" | "other" | "unknown", + "tags": [list of specific descriptive tags, max 6, e.g. "two-car mva", "property-damage-only", "working fire", "shots-fired"], + "location": "most specific location string found, or empty string", + "vehicles": [vehicle descriptions mentioned, e.g. "Hyundai Tucson", "black sedan"], + "units": [unit IDs or officer numbers mentioned, e.g. "Unit 511", "Car 4"], + "severity": one of "minor" | "moderate" | "major" | "unknown" +}} -INCIDENT_KEYWORDS: dict[str, list[str]] = { - "fire": [ - "fire", "smoke", "flames", "burning", "structure fire", "brush fire", - "wildfire", "arson", "working fire", "fully involved", - ], - "ems": [ - "cardiac", "unconscious", "breathing", "overdose", "trauma", - "injury", "ambulance", "ems", "medic", "chest pain", "stroke", - "unresponsive", "fall", "laceration", - ], - "police": [ - "pursuit", "chase", "shots fired", "weapon", "suspect", "robbery", - "assault", "burglary", "stolen", "fleeing", "armed", "shooting", - "stabbing", "domestic", - ], - "accident": [ - "accident", "collision", "crash", "mvr", "vehicle", "rollover", - "hit and run", "ped", "pedestrian", "pi", "property damage", - ], -} +Rules: +- location: prefer intersections > addresses > mile markers > route+town > route alone > town alone. Empty string if none. +- tags: be specific and lowercase, hyphenated. Do not repeat incident_type as a tag. +- units: only identifiers explicitly mentioned, not inferred. +- Do not invent details not present in the transcript. -# Street suffix patterns for location extraction -_STREET_RE = re.compile( - r'\b(?:\d+\s+)?[A-Z][a-zA-Z]+(?: [A-Z][a-zA-Z]+)*' - r'\s+(?:Street|St|Avenue|Ave|Boulevard|Blvd|Drive|Dr|Road|Rd|Lane|Ln' - r'|Court|Ct|Place|Pl|Way|Circle|Cir|Highway|Hwy|Route|Rt)\b', - re.IGNORECASE, -) +Transcript: +{transcript}""" -# --------------------------------------------------------------------------- -# Public API -# --------------------------------------------------------------------------- - async def extract_tags( call_id: str, transcript: str, -) -> tuple[list[str], Optional[str]]: +) -> tuple[list[str], Optional[str], Optional[str]]: """ - Extract incident tags from a transcript. + Extract incident tags, type, and location from a transcript via Gemini. Returns: - (tags, primary_type) — e.g. (["fire", "structure fire"], "fire") - primary_type is the category with the most keyword hits, or None. + (tags, primary_type, location) - Side-effect: updates calls/{call_id}.tags in Firestore. + Side-effect: updates calls/{call_id} in Firestore with tags, location, + vehicles, units, severity; also stores the call embedding. """ - lower = transcript.lower() - matched: dict[str, list[str]] = {} + result = await asyncio.to_thread(_sync_extract, transcript) - for category, keywords in INCIDENT_KEYWORDS.items(): - hits = [kw for kw in keywords if kw in lower] - if hits: - matched[category] = hits + tags: list[str] = result.get("tags") or [] + incident_type: Optional[str] = result.get("incident_type") or None + location: Optional[str] = result.get("location") or None + vehicles: list[str] = result.get("vehicles") or [] + units: list[str] = result.get("units") or [] + severity: str = result.get("severity") or "unknown" - tags: list[str] = [] - for category, hits in matched.items(): - tags.append(category) - tags.extend(h for h in hits if h != category) + if incident_type in ("unknown", "other", ""): + incident_type = None - # Deduplicate while preserving order - seen: set[str] = set() - unique_tags: list[str] = [] - for t in tags: - if t not in seen: - seen.add(t) - unique_tags.append(t) + # Store embedding alongside structured data + embedding = await asyncio.to_thread(_sync_embed, _embed_text(transcript, incident_type)) - # Primary type = category with most keyword hits - primary_type: Optional[str] = None - if matched: - primary_type = max(matched, key=lambda c: len(matched[c])) + updates: dict = { + "tags": tags, + "severity": severity, + } + if location: + updates["location"] = location + if vehicles: + updates["vehicles"] = vehicles + if units: + updates["units"] = units + if embedding: + updates["embedding"] = embedding - if unique_tags: - try: - await fstore.doc_update("calls", call_id, {"tags": unique_tags}) - except Exception as e: - logger.warning(f"Could not save tags for call {call_id}: {e}") + try: + await fstore.doc_set("calls", call_id, updates) + except Exception as e: + logger.warning(f"Could not save intelligence for call {call_id}: {e}") - logger.info(f"Intelligence: call {call_id} → tags={unique_tags}, type={primary_type}") - return unique_tags, primary_type + logger.info( + f"Intelligence: call {call_id} → type={incident_type}, " + f"tags={tags}, location={location!r}, severity={severity}" + ) + return tags, incident_type, location -def extract_location_hint(transcript: str) -> Optional[str]: - """Return the first street-level location mention found in the transcript, or None.""" - match = _STREET_RE.search(transcript) - return match.group(0) if match else None +def _sync_extract(transcript: str) -> dict: + """Call Gemini Flash and parse the JSON response.""" + from app.config import settings + import google.generativeai as genai + + if not settings.gemini_api_key: + logger.warning("GEMINI_API_KEY not set — intelligence extraction disabled.") + return {} + + genai.configure(api_key=settings.gemini_api_key) + model = genai.GenerativeModel( + "gemini-1.5-flash", + generation_config={"response_mime_type": "application/json"}, + ) + + try: + response = model.generate_content(_PROMPT_TEMPLATE.format(transcript=transcript)) + return json.loads(response.text) + except json.JSONDecodeError as e: + logger.warning(f"Gemini returned non-JSON: {e}") + return {} + except Exception as e: + logger.warning(f"Gemini extraction failed: {e}") + return {} + + +def _sync_embed(text: str) -> Optional[list[float]]: + """Generate a text-embedding-004 vector for semantic similarity.""" + from app.config import settings + import google.generativeai as genai + + if not settings.gemini_api_key: + return None + + genai.configure(api_key=settings.gemini_api_key) + try: + result = genai.embed_content( + model="models/text-embedding-004", + content=text, + task_type="SEMANTIC_SIMILARITY", + ) + return result["embedding"] + except Exception as e: + logger.warning(f"Embedding generation failed: {e}") + return None + + +def _embed_text(transcript: str, incident_type: Optional[str]) -> str: + """Build the text string to embed — transcript + type context.""" + prefix = f"[{incident_type}] " if incident_type else "" + return f"{prefix}{transcript}" diff --git a/drb-c2-core/app/internal/summarizer.py b/drb-c2-core/app/internal/summarizer.py new file mode 100644 index 0000000..21e92e1 --- /dev/null +++ b/drb-c2-core/app/internal/summarizer.py @@ -0,0 +1,114 @@ +""" +Background incident summary loop. + +Runs every SUMMARY_INTERVAL_MINUTES. Finds all active incidents with +summary_stale=True, fetches all their call transcripts, and calls Gemini +once per incident to produce a concise factual summary. + +By batching this way: Gemini is never called per-call — only periodically +and only for incidents that have actually changed since the last run. +""" +import asyncio +from datetime import datetime, timezone +from typing import Optional +from app.internal.logger import logger +from app.internal import firestore as fstore +from app.config import settings + + +async def summarizer_loop() -> None: + interval = settings.summary_interval_minutes * 60 + logger.info(f"Summarizer started — interval: {settings.summary_interval_minutes}m") + while True: + await asyncio.sleep(interval) + try: + await _run_summary_pass() + except Exception as e: + logger.error(f"Summarizer pass failed: {e}") + + +async def _run_summary_pass() -> None: + stale = await fstore.collection_list("incidents", status="active", summary_stale=True) + if not stale: + return + + logger.info(f"Summarizer: processing {len(stale)} stale incident(s)") + for inc in stale: + await _summarize_incident(inc) + + +async def _summarize_incident(inc: dict) -> None: + incident_id = inc.get("incident_id") + if not incident_id: + return + + call_ids: list[str] = inc.get("call_ids", []) + if not call_ids: + return + + # Fetch transcripts for all calls in this incident + transcripts: list[str] = [] + for cid in call_ids: + doc = await fstore.doc_get("calls", cid) + if doc and doc.get("transcript"): + transcripts.append(doc["transcript"]) + + if not transcripts: + # No transcripts yet — clear stale flag and wait for next pass + await fstore.doc_set("incidents", incident_id, {"summary_stale": False}) + return + + summary = await asyncio.to_thread(_sync_summarize, inc, transcripts) + + now = datetime.now(timezone.utc).isoformat() + updates: dict = { + "summary_stale": False, + "summary_last_run": now, + } + if summary: + updates["summary"] = summary + logger.info(f"Summarizer: updated summary for incident {incident_id}") + else: + logger.warning(f"Summarizer: Gemini returned nothing for incident {incident_id}") + + await fstore.doc_set("incidents", incident_id, updates) + + +def _sync_summarize(inc: dict, transcripts: list[str]) -> Optional[str]: + from app.config import settings + import google.generativeai as genai + + if not settings.gemini_api_key: + return None + + genai.configure(api_key=settings.gemini_api_key) + model = genai.GenerativeModel("gemini-1.5-flash") + + inc_type = inc.get("type", "unknown") + location = inc.get("location") or "unknown location" + tg_ids = ", ".join(inc.get("talkgroup_ids", [])) or "unknown" + numbered = "\n".join(f"{i+1}. {t}" for i, t in enumerate(transcripts)) + + prompt = f"""You are analyzing P25 public safety radio communications for a single active incident. + +Incident type: {inc_type} +Location: {location} +Talkgroup(s): {tg_ids} + +Transcripts ({len(transcripts)} calls, chronological): +{numbered} + +Write a concise factual summary of this incident in 2-4 sentences. Include: +- What happened +- Location (most specific mentioned) +- Units or resources involved if mentioned +- Current status if determinable + +Be factual. Do not speculate beyond what the transcripts say. Do not use bullet points.""" + + try: + response = model.generate_content(prompt) + return response.text.strip() or None + except Exception as e: + logger.warning(f"Gemini summary failed: {e}") + return None diff --git a/drb-c2-core/app/main.py b/drb-c2-core/app/main.py index 55d0489..e0e122f 100644 --- a/drb-c2-core/app/main.py +++ b/drb-c2-core/app/main.py @@ -5,6 +5,7 @@ from fastapi.middleware.cors import CORSMiddleware from app.internal.logger import logger from app.internal.mqtt_handler import mqtt_handler from app.internal.node_sweeper import sweeper_loop +from app.internal.summarizer import summarizer_loop from app.config import settings from app.internal.auth import require_firebase_token, require_service_or_firebase_token from app.routers import nodes, systems, calls, upload, tokens, incidents, alerts @@ -35,11 +36,13 @@ async def lifespan(app: FastAPI): await mqtt_handler.connect() sweeper_task = asyncio.create_task(sweeper_loop()) + summarizer_task = asyncio.create_task(summarizer_loop()) yield # --- app running --- logger.info("DRB C2 Core shutting down.") sweeper_task.cancel() + summarizer_task.cancel() await mqtt_handler.disconnect() diff --git a/drb-c2-core/app/routers/upload.py b/drb-c2-core/app/routers/upload.py index a6436f2..7934f31 100644 --- a/drb-c2-core/app/routers/upload.py +++ b/drb-c2-core/app/routers/upload.py @@ -109,8 +109,9 @@ async def _run_intelligence_pipeline( # Step 2: Intelligence extraction tags: list[str] = [] incident_type: Optional[str] = None + location: Optional[str] = None if transcript: - tags, incident_type = await intelligence.extract_tags(call_id, transcript) + tags, incident_type, location = await intelligence.extract_tags(call_id, transcript) # Step 3: Incident correlation if incident_type: @@ -118,9 +119,11 @@ async def _run_intelligence_pipeline( call_id=call_id, node_id=node_id, system_id=system_id, + talkgroup_id=talkgroup_id, talkgroup_name=talkgroup_name, tags=tags, incident_type=incident_type, + location=location, ) # Step 4: Alert dispatch (always runs — talkgroup ID rules don't need a transcript) diff --git a/drb-c2-core/requirements.txt b/drb-c2-core/requirements.txt index 40c7d5b..2e50f31 100644 --- a/drb-c2-core/requirements.txt +++ b/drb-c2-core/requirements.txt @@ -5,6 +5,8 @@ paho-mqtt>=2.0.0 firebase-admin google-cloud-storage openai +google-generativeai +numpy httpx python-multipart pytest