diff --git a/drb-c2-core/app/config.py b/drb-c2-core/app/config.py index aacc68e..922576d 100644 --- a/drb-c2-core/app/config.py +++ b/drb-c2-core/app/config.py @@ -30,7 +30,7 @@ class Settings(BaseSettings): location_proximity_km: float = 0.5 # radius for location-proximity matching incident_auto_resolve_minutes: int = 90 # auto-resolve after N minutes with no new calls recorrelation_scan_minutes: int = 60 # re-examine orphaned calls ended within this window - tg_fast_path_idle_minutes: int = 30 # fast path: max minutes since incident last updated + tg_fast_path_idle_minutes: int = 90 # fast path: max minutes since incident last updated # Vocabulary learning vocabulary_induction_interval_hours: int = 24 # how often the induction loop runs diff --git a/drb-c2-core/app/internal/incident_correlator.py b/drb-c2-core/app/internal/incident_correlator.py index 7cc61d9..249e60c 100644 --- a/drb-c2-core/app/internal/incident_correlator.py +++ b/drb-c2-core/app/internal/incident_correlator.py @@ -48,6 +48,84 @@ from app.config import settings _DISPATCH_TG_RE = re.compile(r"\bdispatch\b|\bdisp\b", re.IGNORECASE) +# Matches route/road identifiers in location strings for cross-system parent detection. +# Groups: numbered routes (Route 202, NY-9, US-6, I-87, CR-35) and named parkways/highways. +_ROAD_RE = re.compile( + r"\b(?:route|rt\.?|rte\.?|us[-\s]?|state\s*route\s*|ny[-\s]?|i[-\s]?|cr[-\s]?|county\s*road\s*)\s*\d+\b" + r"|\b(?:tsp|taconic|thruway|parkway|turnpike|interstate)\b" + r"|\b\w+(?:\s+\w+)?\s+(?:street|avenue|road|drive|boulevard|lane|court|place|highway|pkwy|blvd|ave|rd|st|dr)\b", + re.IGNORECASE, +) + + +def _extract_road_ids(text: str) -> set[str]: + """ + Extract normalised road/route identifiers from a location string. + e.g. "suspect east on Route 202" → {"route 202"} + "at Main Street and Oak Ave" → {"main street", "oak ave"} + """ + return { + re.sub(r"[\s.\-]+", " ", m.group().lower()).strip() + for m in _ROAD_RE.finditer(text) + } + + +def _location_mentions_road_overlap(new_location: str, inc_mentions: list[str]) -> bool: + """True if the new call's location shares any road identifier with the incident's history.""" + if not new_location or not inc_mentions: + return False + new_roads = _extract_road_ids(new_location) + if not new_roads: + return False + inc_roads: set[str] = set() + for mention in inc_mentions: + inc_roads |= _extract_road_ids(mention) + return bool(new_roads & inc_roads) + + +def _operational_types_compatible(type_a: Optional[str], type_b: Optional[str]) -> bool: + """Police+police, ems+ems, fire+fire all match. Police+ems for co-response. Fire+anything for mutual aid.""" + if not type_a or not type_b: + return False + if type_a == type_b: + return True + compatible_pairs = {frozenset({"police", "ems"}), frozenset({"fire", "ems"}), frozenset({"fire", "police"})} + return frozenset({type_a, type_b}) in compatible_pairs + +# Tags that unambiguously imply a specific incident type, used as a fallback +# when GPT returns incident_type=None (typically due to missing talkgroup context). +# Only high-confidence, type-specific tags are listed — generic tags like +# "welfare-check" or "suspicious-activity" are omitted to avoid false typing. +_TAG_TYPE_HINTS: dict[str, str] = { + "active-fire": "fire", + "working-fire": "fire", + "structure-fire": "fire", + "brush-fire": "fire", + "smoke-investigation": "fire", + "fire-alarm": "fire", + "cardiac-arrest": "ems", + "unresponsive": "ems", + "medical-assistance": "ems", + "transport": "ems", + "courtesy-transport": "ems", + "mvc": "accident", + "mva": "accident", + "two-car-mva": "accident", + "traffic-stop": "police", + "shots-fired": "police", + "vehicle-pursuit": "police", + "pursuit": "police", +} + + +def _infer_type_from_tags(tags: list[str]) -> Optional[str]: + """Return an incident type inferred from tags, or None if ambiguous.""" + for tag in tags: + t = _TAG_TYPE_HINTS.get(tag.lower()) + if t: + return t + return None + def _tag_to_title(tag: str) -> str: """ @@ -94,6 +172,7 @@ async def correlate_call( create_if_new: bool = True, units: Optional[list[str]] = None, vehicles: Optional[list[str]] = None, + cleared_units: Optional[list[str]] = None, ) -> Optional[str]: """ Link call_id to an existing incident or create a new one. @@ -123,6 +202,7 @@ async def correlate_call( # scene-level breakdown. call_units: list[str] = units if units is not None else (call_doc.get("units") or []) call_vehicles: list[str] = vehicles if vehicles is not None else (call_doc.get("vehicles") or []) + call_cleared: list[str] = cleared_units if cleared_units is not None else (call_doc.get("cleared_units") or []) call_severity: str = call_doc.get("severity") or "unknown" # Use passed coords first (freshly geocoded), fall back to what's on the call doc coords: Optional[dict] = location_coords or call_doc.get("location_coords") @@ -199,18 +279,67 @@ async def correlate_call( f"from {candidate['incident_id']}; will attempt new incident" ) elif len(tg_recent) > 1: - matched_incident = _disambiguate( + candidate = _disambiguate( tg_recent, call_units, call_vehicles, coords, call_embedding ) - corr_debug = { - "corr_path": "fast/disambig", - "corr_incident_idle_min": round(_incident_idle_minutes(matched_incident, now), 1), - "corr_candidates": len(tg_recent), - } - logger.info( - f"Correlator fast-path (disambig {len(tg_recent)} candidates): " - f"call {call_id} → {matched_incident['incident_id']}" + # Disambiguate picks the best candidate, but still verify the call + # actually fits before committing — a new unrelated call on a busy + # dispatch channel should create its own incident, not be force-merged. + if _call_fits_incident( + candidate, call_units, call_vehicles, coords, + settings.location_proximity_km, is_dispatch=is_dispatch, + ): + matched_incident = candidate + corr_debug = { + "corr_path": "fast/disambig", + "corr_incident_idle_min": round(_incident_idle_minutes(candidate, now), 1), + "corr_candidates": len(tg_recent), + } + logger.info( + f"Correlator fast-path (disambig {len(tg_recent)} candidates): " + f"call {call_id} → {candidate['incident_id']}" + ) + else: + logger.info( + f"Correlator fast-path disambig: no candidate fits call {call_id} " + f"across {len(tg_recent)} incidents — will attempt new incident" + ) + + # ── 1.5. Unit-continuity path: same officer, not reassigned ───────────────── + # + # Handles long calls (bookings, transports, late scene clearance) where the + # 90-min idle gate has fired but the officer is still on the same call. + # Searches ALL active incidents — no idle gate, no time limit. + # + # Reassignment guard: if the same unit appears in a MORE recently updated + # incident, the officer has moved on and we don't link back to the old one. + # This correctly handles officers dispatched to a second call mid-shift. + if not matched_incident and call_units and system_id: + call_unit_set = set(call_units) + unit_candidates = [ + inc for inc in all_active + if system_id in (inc.get("system_ids") or []) + and call_unit_set & set(inc.get("units") or []) + ] + if unit_candidates: + best_unit_inc = max(unit_candidates, key=lambda i: i.get("updated_at", "")) + reassigned_away = any( + inc["incident_id"] != best_unit_inc["incident_id"] + and call_unit_set & set(inc.get("units") or []) + and inc.get("updated_at", "") > best_unit_inc.get("updated_at", "") + for inc in all_active ) + if not reassigned_away: + matched_incident = best_unit_inc + corr_debug = { + "corr_path": "unit-continuity", + "corr_incident_idle_min": round(_incident_idle_minutes(best_unit_inc, now), 1), + } + logger.info( + f"Correlator unit-continuity: call {call_id} → " + f"{best_unit_inc['incident_id']} " + f"(idle {_incident_idle_minutes(best_unit_inc, now):.0f}min)" + ) # ── 2. Location path: proximity match (time-limited, cross-type) ───────── if not matched_incident and coords: @@ -331,16 +460,87 @@ async def correlate_call( matched_incident, call_id, talkgroup_id, system_id, tags, location, location_coords, call_units, call_vehicles, call_embedding, now, talkgroup_name=talkgroup_name, incident_type=incident_type, + cleared_units=call_cleared, ) - elif incident_type and create_if_new: - incident_id = await _create_incident( - call_id, incident_type, talkgroup_id, talkgroup_name, system_id, - tags, location, location_coords, - call_units, call_vehicles, call_embedding, call_severity, now, - ) - corr_debug["corr_path"] = "new" + elif create_if_new: + # If GPT returned no type (missing talkgroup context is common), attempt + # to recover a type from the extracted tags before giving up on creation. + if not incident_type and tags: + incident_type = _infer_type_from_tags(tags) + if incident_type: + logger.info( + f"Correlator: inferred incident_type={incident_type!r} from tags " + f"{tags} for call {call_id} (no GPT type)" + ) + if not incident_type: + # No type and none inferred — nothing to create + return None + + # ── Cross-system parent detection ───────────────────────────────────── + # Before creating a standalone incident, check whether this call belongs + # to an incident already opened by a different agency (multi-agency chase, + # mutual aid, etc.). If a parent candidate is found: + # • The existing candidate is demoted to a child (incident_type → "child") + # • A new master shell is created linking both children + # • The new call's incident is created as a second child of the master + cross_parent: Optional[dict] = None + if system_id: + cross_parent = await _find_cross_system_parent( + system_id=system_id, + incident_type=incident_type, + location=location, + location_coords=coords, + call_embedding=call_embedding, + recent=recent, + ) + + if cross_parent: + existing_child_id = cross_parent["incident_id"] + existing_master_id = cross_parent.get("parent_incident_id") + + # Create the new agency's child incident first + incident_id = await _create_incident( + call_id, incident_type, talkgroup_id, talkgroup_name, system_id, + tags, location, location_coords, + call_units, call_vehicles, call_embedding, call_severity, now, + ) + + if existing_master_id: + # Candidate is already a child — link new child to the existing master + await _demote_to_child(incident_id, existing_master_id) + await _add_child_to_master(existing_master_id, incident_id, now) + corr_debug["corr_path"] = "new/cross-system-child" + logger.info( + f"Correlator cross-system: call {call_id} → new child {incident_id} " + f"under existing master {existing_master_id}" + ) + else: + # Candidate is a standalone master — create master shell, demote both + master_id = await _create_master_incident( + first_child_id=existing_child_id, + second_child_id=incident_id, + operational_type=incident_type, + location=cross_parent.get("location") or location, + location_coords=cross_parent.get("location_coords") or coords, + now=now, + ) + await _demote_to_child(existing_child_id, master_id) + await _demote_to_child(incident_id, master_id) + corr_debug["corr_path"] = "new/cross-system-master" + logger.info( + f"Correlator cross-system: created master {master_id}, " + f"demoted {existing_child_id} + new {incident_id} as children" + ) + else: + # Normal single-agency incident creation + incident_id = await _create_incident( + call_id, incident_type, talkgroup_id, talkgroup_name, system_id, + tags, location, location_coords, + call_units, call_vehicles, call_embedding, call_severity, now, + ) + corr_debug["corr_path"] = "new" else: - # No match and either no type or creation suppressed — nothing to do + # Creation suppressed (re-correlation sweep) — nothing to do return None # Persist the correlation decision to the call document so it can be @@ -508,6 +708,7 @@ async def _update_incident( now: datetime, talkgroup_name: Optional[str] = None, incident_type: Optional[str] = None, + cleared_units: Optional[list[str]] = None, ) -> None: incident_id = inc["incident_id"] @@ -527,6 +728,19 @@ async def _update_incident( merged_units = list(dict.fromkeys((inc.get("units") or []) + call_units)) merged_vehicles = list(dict.fromkeys((inc.get("vehicles") or []) + call_vehicles)) + # Unit activity tracking: units_active / units_cleared + # units_active = units currently on scene; units_cleared = units back in service + units_active = list(inc.get("units_active") or []) + units_cleared = list(inc.get("units_cleared") or []) + for u in call_units: + if u not in units_cleared and u not in units_active: + units_active.append(u) + for u in (cleared_units or []): + if u in units_active: + units_active.remove(u) + if u not in units_cleared: + units_cleared.append(u) + location_mentions = list(inc.get("location_mentions") or []) if location and location not in location_mentions: location_mentions.append(location) @@ -543,6 +757,8 @@ async def _update_incident( "tags": merged_tags, "units": merged_units, "vehicles": merged_vehicles, + "units_active": units_active, + "units_cleared": units_cleared, "location_mentions": location_mentions, "updated_at": now.isoformat(), "summary_stale": True, @@ -574,6 +790,19 @@ async def _update_incident( elif primary_tag: updates["title"] = primary_tag + # Signal-based auto-resolve: every tracked unit has cleared, none still active. + # Requires at least one unit to have explicitly signalled back-in-service so we + # don't fire on incidents where units were never tracked (no unit mentions at all). + if units_cleared and not units_active: + updates["status"] = "resolved" + await fstore.doc_set("incidents", incident_id, updates) + logger.info( + f"Correlator: signal-resolved incident {incident_id} " + f"(call {call_id} — all {len(units_cleared)} unit(s) clear)" + ) + await maybe_resolve_parent(incident_id) + return + await fstore.doc_set("incidents", incident_id, updates) logger.info(f"Correlator: linked call {call_id} to incident {incident_id}") @@ -612,6 +841,7 @@ async def _create_incident( doc = { "incident_id": incident_id, "title": title, + "incident_type": "master", # structural role; "child" set on demotion "type": incident_type, "status": "active", "location": location, @@ -622,6 +852,8 @@ async def _create_incident( "system_ids": [system_id] if system_id else [], "tags": tags + ["auto-generated"], "units": call_units, + "units_active": list(call_units), + "units_cleared": [], "vehicles": call_vehicles, "severity": call_severity, "summary": None, @@ -652,6 +884,190 @@ def _merge_embedding_vecs(inc: dict, call_embedding: list[float]) -> dict: return {"embedding": call_embedding, "embedding_count": 1} +async def _create_master_incident( + first_child_id: str, + second_child_id: str, + operational_type: str, + location: Optional[str], + location_coords: Optional[dict], + now: datetime, +) -> str: + """ + Create a master shell incident linking two child incidents. + The master owns no calls directly — it is a grouping record. + Returns the new master incident_id. + """ + master_id = str(uuid.uuid4()) + doc = { + "incident_id": master_id, + "title": f"Multi-agency {operational_type} incident", + "incident_type": "master", + "type": operational_type, + "status": "active", + "location": location, + "location_coords": location_coords, + "child_incident_ids": [first_child_id, second_child_id], + "parent_incident_id": None, + "call_ids": [], + "talkgroup_ids": [], + "system_ids": [], + "tags": [], + "units": [], + "vehicles": [], + "severity": "unknown", + "summary": None, + "summary_stale": True, + "summary_last_run": None, + "embedding": None, + "embedding_count": 0, + "has_updates": False, + "started_at": now.isoformat(), + "updated_at": now.isoformat(), + } + await fstore.doc_set("incidents", master_id, doc, merge=False) + logger.info(f"Correlator: created master incident {master_id} linking {first_child_id} + {second_child_id}") + return master_id + + +async def _demote_to_child(incident_id: str, parent_id: str) -> None: + """Demote a standalone master incident to a child by setting incident_type and parent reference.""" + await fstore.doc_set("incidents", incident_id, { + "incident_type": "child", + "parent_incident_id": parent_id, + }) + logger.info(f"Correlator: demoted incident {incident_id} → child of master {parent_id}") + + +async def _add_child_to_master(master_id: str, child_id: str, now: datetime) -> None: + """Append a new child to an existing master's child_incident_ids list.""" + master = await fstore.doc_get("incidents", master_id) + if not master: + return + children = list(master.get("child_incident_ids") or []) + if child_id not in children: + children.append(child_id) + updates: dict = {"child_incident_ids": children, "updated_at": now.isoformat()} + # Re-open a resolved master when a new child is added (retroactive link) + if master.get("status") == "resolved": + updates["has_updates"] = True + await fstore.doc_set("incidents", master_id, updates) + + +async def maybe_resolve_parent(incident_id: str) -> None: + """ + Called after resolving a child incident. + If all siblings under the same master are also resolved, auto-resolve the master. + Safe to call on non-child incidents — exits immediately when there's no parent. + """ + inc = await fstore.doc_get("incidents", incident_id) + if not inc: + return + parent_id = inc.get("parent_incident_id") + if not parent_id: + return # standalone or already a master — nothing to propagate + + parent = await fstore.doc_get("incidents", parent_id) + if not parent or parent.get("status") == "resolved": + return # master already closed + + child_ids: list[str] = parent.get("child_incident_ids") or [] + if not child_ids: + return + + for cid in child_ids: + if cid == incident_id: + continue # the one we just resolved + child = await fstore.doc_get("incidents", cid) + if not child or child.get("status") != "resolved": + return # at least one sibling still active + + # All children resolved — close the master + await fstore.doc_set("incidents", parent_id, {"status": "resolved"}) + logger.info( + f"Auto-resolved master incident {parent_id} " + f"(all {len(child_ids)} child(ren) resolved)" + ) + + +async def _find_cross_system_parent( + system_id: str, + incident_type: Optional[str], + location: Optional[str], + location_coords: Optional[dict], + call_embedding: Optional[list], + recent: list[dict], +) -> Optional[dict]: + """ + Scan active incidents from OTHER systems for a cross-agency parent candidate. + + Match criteria (need at least two signals firing together): + A. Road/route identifier overlap between the new call's location and the + incident's accumulated location_mentions. Any shared route number or + road name is a strong positive — two agencies don't randomly share the + same road name in the same window. + B. Content embedding similarity ≥ 0.78 (lower than same-system slow path + because we're linking, not merging). + C. Geocoded proximity — 1 km for static scenes (≤2 location_mentions), + 3 km for dynamic/moving scenes (chase, expanding perimeter). + + Returns the best matching incident (master or standalone), or None. + Child incidents are resolved to their parent before matching so we always + attach to the master level. + """ + best_inc: Optional[dict] = None + best_score = 0.0 + + for inc in recent: + # Only cross-system candidates + if system_id in (inc.get("system_ids") or []): + continue + if not _operational_types_compatible(incident_type, inc.get("type")): + continue + + # If the candidate is already a child, resolve to its parent + if inc.get("incident_type") == "child" and inc.get("parent_incident_id"): + parent = await fstore.doc_get("incidents", inc["parent_incident_id"]) + if parent and parent.get("status") == "active": + inc = parent + else: + continue + + score = 0.0 + inc_mentions: list[str] = inc.get("location_mentions") or [] + + # Signal A — road/route identifier overlap (0.4). + # Shared route numbers are a strong signal but not conclusive alone. + if location and _location_mentions_road_overlap(location, inc_mentions): + score += 0.4 + + # Signal B — content embedding similarity ≥ 0.78 (0.3 flat bonus). + inc_embedding = inc.get("embedding") + if call_embedding and inc_embedding: + if _cosine_similarity(call_embedding, inc_embedding) >= 0.78: + score += 0.3 + + # Signal C — geocoded proximity (0.3). + # Dynamic scenes (3+ location mentions = chase/moving perimeter) use 3 km; + # static mutual aid (≤2 mentions) uses 1 km. + inc_coords = inc.get("location_coords") + if location_coords and inc_coords: + dist_km = _haversine_km( + location_coords["lat"], location_coords["lng"], + inc_coords["lat"], inc_coords["lng"], + ) + radius = 3.0 if len(inc_mentions) >= 3 else 1.0 + if dist_km <= radius: + score += 0.3 + + # threshold = 0.5 → requires at least two signals (A+B, A+C, or B+C). + # No single signal alone can clear the bar. + if score >= 0.5 and score > best_score: + best_score = score + best_inc = inc + + return best_inc + + def _cosine_similarity(a: list[float], b: list[float]) -> float: import numpy as np va, vb = np.array(a, dtype=float), np.array(b, dtype=float) diff --git a/drb-c2-core/app/internal/intelligence.py b/drb-c2-core/app/internal/intelligence.py index 6ab2ad1..251d67c 100644 --- a/drb-c2-core/app/internal/intelligence.py +++ b/drb-c2-core/app/internal/intelligence.py @@ -29,6 +29,7 @@ Response format — a JSON object with a "scenes" array. Each scene: location: most specific location string found, or empty string vehicles: list of vehicle descriptions mentioned units: list of unit IDs or officer numbers explicitly mentioned + cleared_units: list of unit IDs that explicitly signal back-in-service or available in this recording severity: one of "minor" | "moderate" | "major" | "unknown" resolved: true if this scene explicitly signals incident closure, false otherwise reassignment: true if dispatch is actively pulling a unit away from their current assignment to respond to a new, different call — e.g. "Baker, can you clear and respond to...", "Adam, break from that and go to...". False if the unit is simply reporting in, updating status, or continuing their current assignment. @@ -42,6 +43,7 @@ Rules: - incident_type: let the talkgroup channel be your primary signal. Use "fire" ONLY if the talkgroup is clearly a fire/rescue channel OR the transcript explicitly describes active fire, smoke, flames, or structure fire activation. Police or EMS referencing a fire scene → use "police" or "ems". When uncertain, prefer "other" over "fire". - ten_codes: interpret radio codes using the department reference provided below. Do not guess codes not listed. - resolved: true only when the scene explicitly signals "Code 4", "all clear", "10-42", "in custody", "patient transported", "fire out", "GOA", "negative contact", "scene clear". +- cleared_units: only include units that explicitly stated their own back-in-service status in this recording (e.g. "Unit 7, 10-8", "Baker-1 available", "E-14 back in service", or the department ten-code for available/back-in-service listed above). Silence or absence of a unit is NOT clearance. A scene-wide Code 4 belongs in resolved=true, not here — cleared_units is for individual unit availability signals only. - reassignment: only true when a unit is explicitly being pulled to a completely new call or location. A unit going en route to their first dispatch is NOT a reassignment. Routine status updates, acknowledgements, and scene updates are NOT reassignments. - transcript_corrected: fix only clear STT/vocoder errors (e.g. "Several" → "10-4", misheard street names, garbled unit IDs). Keep all radio language as-is — do NOT decode codes into plain English. Return null if accurate. @@ -129,6 +131,7 @@ async def extract_scenes( location: Optional[str] = scene.get("location") or None vehicles: list[str] = scene.get("vehicles") or [] units: list[str] = scene.get("units") or [] + cleared_units: list[str] = scene.get("cleared_units") or [] severity: str = scene.get("severity") or "unknown" resolved: bool = bool(scene.get("resolved", False)) reassignment: bool = bool(scene.get("reassignment", False)) @@ -160,6 +163,7 @@ async def extract_scenes( "location_coords": location_coords, "vehicles": vehicles, "units": units, + "cleared_units": cleared_units, "severity": severity, "resolved": resolved, "reassignment": reassignment, @@ -175,6 +179,7 @@ async def extract_scenes( all_tags = list(dict.fromkeys(t for s in processed for t in s["tags"])) all_units = list(dict.fromkeys(u for s in processed for u in s["units"])) all_vehicles = list(dict.fromkeys(v for s in processed for v in s["vehicles"])) + all_cleared = list(dict.fromkeys(u for s in processed for u in s["cleared_units"])) updates: dict = {"tags": all_tags, "severity": primary["severity"]} if primary["location"]: @@ -183,6 +188,8 @@ async def extract_scenes( updates["location_coords"] = primary["location_coords"] if all_units: updates["units"] = all_units + if all_cleared: + updates["cleared_units"] = all_cleared if all_vehicles: updates["vehicles"] = all_vehicles if primary["embedding"]: diff --git a/drb-c2-core/app/internal/mqtt_handler.py b/drb-c2-core/app/internal/mqtt_handler.py index 89053b3..ff5b8a8 100644 --- a/drb-c2-core/app/internal/mqtt_handler.py +++ b/drb-c2-core/app/internal/mqtt_handler.py @@ -109,10 +109,11 @@ class MQTTHandler: updates["status"] = "online" await fstore.doc_update("nodes", node_id, updates) - # Release any orphaned Discord token when the node explicitly reports disconnected - if payload.get("discord_connected") is False: - from app.routers.tokens import release_token - await release_token(node_id) + # NOTE: discord_connected in checkins is informational only — do NOT release the + # token here. The bot watchdog reconnects on transient Discord drops, so a single + # checkin with discord_connected=False during a brief reconnect window would + # incorrectly free the token while the bot is still active. Token release is + # handled exclusively by the discord_leave command and the node offline sweeper. # ------------------------------------------------------------------ # Status update diff --git a/drb-c2-core/app/internal/recorrelation_sweep.py b/drb-c2-core/app/internal/recorrelation_sweep.py index 6d345d7..521225e 100644 --- a/drb-c2-core/app/internal/recorrelation_sweep.py +++ b/drb-c2-core/app/internal/recorrelation_sweep.py @@ -46,9 +46,15 @@ async def _run_sweep_pass() -> None: ("status", "==", "ended"), ("ended_at", ">=", cutoff), ]) + # corr_path="unlinked" is written after MAX_SWEEP_ATTEMPTS failures. + # Allows a few retries so a welfare-check call can link to an escalation + # incident that is created a few minutes later, without sweeping 30× forever. + MAX_SWEEP_ATTEMPTS = 3 orphans = [ c for c in recent_ended if not c.get("incident_ids") and not c.get("incident_id") + and not c.get("corr_path") # skip calls already exhausted + and c.get("corr_sweep_count", 0) < MAX_SWEEP_ATTEMPTS ] if not orphans: @@ -87,6 +93,7 @@ async def _recorrelate_orphan(call: dict) -> bool: incident_type = call.get("incident_type"), location = call.get("location"), location_coords= call.get("location_coords"), + cleared_units = call.get("cleared_units") or [], reference_time = started_at, # anchor window to when the call happened create_if_new = False, # never create — link-only ) @@ -97,6 +104,15 @@ async def _recorrelate_orphan(call: dict) -> bool: f"Re-correlation: linked orphaned call {call_id} → incident {incident_id}" ) return True + + # Increment the attempt counter. Once MAX_SWEEP_ATTEMPTS is reached the + # orphan filter above will stop picking this call up, and we write + # corr_path="unlinked" as a permanent tombstone. + attempts = call.get("corr_sweep_count", 0) + 1 + update: dict = {"corr_sweep_count": attempts} + if attempts >= 3: + update["corr_path"] = "unlinked" + await fstore.doc_set("calls", call_id, update) return False diff --git a/drb-c2-core/app/internal/summarizer.py b/drb-c2-core/app/internal/summarizer.py index 8d2359b..58d6f7b 100644 --- a/drb-c2-core/app/internal/summarizer.py +++ b/drb-c2-core/app/internal/summarizer.py @@ -102,6 +102,8 @@ async def _resolve_stale_incidents() -> None: idle_minutes = (now - updated_dt).total_seconds() / 60 if idle_minutes > settings.incident_auto_resolve_minutes: await fstore.doc_set("incidents", incident_id, {"status": "resolved"}) + from app.internal.incident_correlator import maybe_resolve_parent + await maybe_resolve_parent(incident_id) logger.info( f"Auto-resolved stale incident {incident_id} " f"(idle {idle_minutes:.0f}m)" diff --git a/drb-c2-core/app/routers/nodes.py b/drb-c2-core/app/routers/nodes.py index 8675ce8..cbacbe1 100644 --- a/drb-c2-core/app/routers/nodes.py +++ b/drb-c2-core/app/routers/nodes.py @@ -53,12 +53,24 @@ async def send_command(node_id: str, cmd: CommandPayload): payload = cmd.model_dump(exclude_none=True) if cmd.action == "discord_join": - preferred = payload.pop("preferred_token_id", None) + # Resolve system doc once — used for preferred token and presence name. + system_doc = None + system_id = node.get("assigned_system_id") + if system_id: + system_doc = await fstore.doc_get_cached("systems", system_id) + + # Explicit preferred_token_id in the request beats the system-level preference. + preferred = payload.pop("preferred_token_id", None) or (system_doc or {}).get("preferred_token_id") token = await assign_token(node_id, preferred_token_id=preferred) if not token: raise HTTPException(503, "No Discord bot tokens available in the pool.") payload["token"] = token + # Pass system name so the bot can set its Discord presence on join. + system_name = (system_doc or {}).get("name") + if system_name: + payload["system_name"] = system_name + elif cmd.action == "discord_leave": await release_token(node_id) diff --git a/drb-c2-core/app/routers/tokens.py b/drb-c2-core/app/routers/tokens.py index 4637f0c..1fea1b9 100644 --- a/drb-c2-core/app/routers/tokens.py +++ b/drb-c2-core/app/routers/tokens.py @@ -60,6 +60,34 @@ async def flush_tokens(): return {"released": len(results)} +@router.put("/{token_id}/prefer/{system_id}", status_code=200) +async def set_preferred_system(token_id: str, system_id: str): + """ + Mark this token as the preferred bot for a system. + When a discord_join is issued for any node in that system, this token + is tried first before falling back to the general pool. + Pass system_id="_none" to clear the preference. + """ + existing = await fstore.doc_get("bot_tokens", token_id) + if not existing: + raise HTTPException(404, "Token not found.") + if system_id == "_none": + # Clear any existing preference on the system that pointed to this token. + system_doc = await fstore.doc_get("systems", existing.get("preferred_for_system_id", "")) + if system_doc: + await fstore.doc_set("systems", existing["preferred_for_system_id"], {"preferred_token_id": None}) + await fstore.doc_set("bot_tokens", token_id, {"preferred_for_system_id": None}) + return {"ok": True, "preferred_for_system_id": None} + + system_doc = await fstore.doc_get("systems", system_id) + if not system_doc: + raise HTTPException(404, "System not found.") + # Set preference on both sides for easy lookup in either direction. + await fstore.doc_set("systems", system_id, {"preferred_token_id": token_id}) + await fstore.doc_set("bot_tokens", token_id, {"preferred_for_system_id": system_id}) + return {"ok": True, "preferred_for_system_id": system_id} + + @router.delete("/{token_id}", status_code=204) async def delete_token(token_id: str): existing = await fstore.doc_get("bot_tokens", token_id) diff --git a/drb-c2-core/app/routers/upload.py b/drb-c2-core/app/routers/upload.py index 0b13f34..83027da 100644 --- a/drb-c2-core/app/routers/upload.py +++ b/drb-c2-core/app/routers/upload.py @@ -125,11 +125,13 @@ async def _run_extraction_pipeline( location_coords=scene["location_coords"], units=corr_units, vehicles=scene.get("vehicles"), + cleared_units=scene.get("cleared_units"), ) if incident_id and incident_id not in incident_ids: incident_ids.append(incident_id) if scene["resolved"] and incident_id: await fstore.doc_set("incidents", incident_id, {"status": "resolved"}) + await incident_correlator.maybe_resolve_parent(incident_id) logger.info(f"Auto-resolved incident {incident_id} (LLM closure detection)") if incident_ids: @@ -224,11 +226,13 @@ async def _run_intelligence_pipeline( location_coords=scene["location_coords"], units=corr_units, vehicles=scene.get("vehicles"), + cleared_units=scene.get("cleared_units"), ) if incident_id and incident_id not in incident_ids: incident_ids.append(incident_id) if scene["resolved"] and incident_id: await fstore.doc_set("incidents", incident_id, {"status": "resolved"}) + await incident_correlator.maybe_resolve_parent(incident_id) logger.info(f"Auto-resolved incident {incident_id} (LLM closure detection)") # Correlator also runs for calls with no scenes (unclassified) to attempt