Files
server-26/drb-c2-core/app/internal/alerter.py
T
2026-04-11 13:44:08 -04:00

125 lines
3.9 KiB
Python

"""
Alert dispatch engine.
Loads enabled alert rules from Firestore and checks each one against the call's
talkgroup ID, tags, and transcript. On a match:
1. Creates an AlertEvent document in Firestore.
2. Optionally POSTs a Discord webhook message if the rule has one configured.
Never raises — failures are logged as warnings so the pipeline always completes.
"""
import uuid
from datetime import datetime, timezone
from typing import Optional
from app.internal.logger import logger
from app.internal import firestore as fstore
async def check_and_dispatch(
call_id: str,
node_id: str,
talkgroup_id: Optional[int],
talkgroup_name: Optional[str],
tags: list[str],
transcript: Optional[str],
) -> None:
"""
Check all enabled alert rules and fire events for any that match this call.
"""
try:
rules = await fstore.collection_list("alert_rules", enabled=True)
except Exception as e:
logger.warning(f"Alerter: could not load rules: {e}")
return
for rule in rules:
matched_keywords = _match_rule(rule, talkgroup_id, tags, transcript)
if not matched_keywords:
continue
alert_id = str(uuid.uuid4())
snippet = _snippet(transcript)
now = datetime.now(timezone.utc).isoformat()
event = {
"alert_id": alert_id,
"rule_id": rule.get("rule_id", ""),
"rule_name": rule.get("name", ""),
"call_id": call_id,
"node_id": node_id,
"talkgroup_id": talkgroup_id,
"talkgroup_name": talkgroup_name or "",
"matched_keywords": matched_keywords,
"transcript_snippet": snippet,
"triggered_at": now,
"acknowledged": False,
}
try:
await fstore.doc_set("alert_events", alert_id, event, merge=False)
logger.info(
f"Alert fired: rule='{rule.get('name')}' call={call_id} "
f"keywords={matched_keywords}"
)
except Exception as e:
logger.warning(f"Alerter: could not save alert event: {e}")
continue
webhook_url = rule.get("discord_webhook")
if webhook_url:
await _post_webhook(webhook_url, rule.get("name", ""), talkgroup_name, matched_keywords, snippet)
def _match_rule(
rule: dict,
talkgroup_id: Optional[int],
tags: list[str],
transcript: Optional[str],
) -> list[str]:
"""Return list of matched keywords/reasons, or empty list if no match."""
matched: list[str] = []
# Talkgroup ID match
rule_tg_ids = rule.get("talkgroup_ids", [])
if rule_tg_ids and talkgroup_id is not None and talkgroup_id in rule_tg_ids:
matched.append(f"talkgroup:{talkgroup_id}")
# Keyword match against tags + transcript
rule_keywords = [kw.lower() for kw in rule.get("keywords", [])]
for kw in rule_keywords:
if kw in tags:
matched.append(kw)
elif transcript and kw in transcript.lower():
matched.append(kw)
return matched
def _snippet(transcript: Optional[str], max_len: int = 200) -> Optional[str]:
if not transcript:
return None
return transcript[:max_len] + ("" if len(transcript) > max_len else "")
async def _post_webhook(
url: str,
rule_name: str,
talkgroup_name: Optional[str],
matched_keywords: list[str],
snippet: Optional[str],
) -> None:
try:
import httpx
tg_label = talkgroup_name or "Unknown"
kw_str = ", ".join(matched_keywords)
body = (
f"**Alert: {rule_name}**\n"
f"Talkgroup: {tg_label}\n"
f"Matched: {kw_str}"
)
if snippet:
body += f"\n> {snippet}"
async with httpx.AsyncClient(timeout=5.0) as client:
await client.post(url, json={"content": body})
except Exception as e:
logger.warning(f"Alerter: Discord webhook POST failed: {e}")