Fix over-correlation: geocoding precision, thin path ambiguity, skip_reason propagation

- Geocoding: reject GEOMETRIC_CENTER/APPROXIMATE results — vague location strings
  (regions, city centroids) were resolving to node-area coords and creating false
  proximity matches that merged unrelated incidents
- Thin path: on dispatch channels with multiple active incidents, skip attachment
  rather than guessing — "10-4" with 3 active incidents is genuinely ambiguous
- Short transcripts (≤5 words) now write skip_reason="transcript_too_short" to
  the call doc, matching garbage transcript behavior
- upload.py no-scenes fallback now checks skip_reason before running correlation —
  flagged calls (garbage, too short) no longer attach via thin path
- Update Server README to reflect current project purpose, goals, and pipeline
This commit is contained in:
Logan
2026-05-31 23:51:46 -04:00
parent f774be12b8
commit b77d2cce36
4 changed files with 111 additions and 152 deletions
+67 -137
View File
@@ -1,10 +1,21 @@
# DRB Server
Full-stack backend for the Discord Radio Bot (DRB) system. Receives telemetry from SDR edge nodes over MQTT, stores data in Firestore, orchestrates Discord voice bots through a token pool, and serves a real-time admin dashboard.
Full-stack backend for the DRB (Distributed Radio Bot) platform — a community-powered, real-time situational awareness system for public safety radio monitoring. Receives telemetry and audio from SDR edge nodes, runs an AI intelligence pipeline on every call, correlates calls into incidents, dispatches alerts, and serves a live operational dashboard.
## What DRB Is
DRB turns radio scanners into a shared geographic intelligence picture. Anyone can contribute coverage by running an edge node (an RTL-SDR dongle on a small Linux machine). All nodes feed into a single platform where operators can:
- **Monitor in real time** — see active incidents on a map as they unfold, with calls grouped by incident as they come in
- **Listen live** — stream audio to Discord voice channels or directly in the browser
- **Get alerted** — receive push notifications when significant events are detected
- **Investigate after the fact** — search transcribed call history, replay audio, and review incident timelines
The intended deployment is a Tactical Operations Center (TOC) display: a map showing active incidents across all covered jurisdictions, with live audio and alert feeds for operators monitoring an area during an event (severe weather, major incident, etc.).
## System Overview
DRB is a distributed SDR (Software Defined Radio) monitoring platform. Edge nodes (small Linux machines with RTL-SDR dongles) decode radio systems and stream audio. The server coordinates those nodes, manages which Discord voice channels receive audio, and stores call history.
DRB is a distributed SDR (Software Defined Radio) monitoring platform. Edge nodes (small Linux machines with RTL-SDR dongles) decode radio systems and stream audio. The server coordinates those nodes, runs the intelligence pipeline on each call, manages Discord voice bot assignments, and serves the operational dashboard.
```
Edge Node (client machine)
@@ -28,9 +39,10 @@ Browser (admin)
| Service | Container | Description | Port |
|---|---|---|---|
| `mosquitto` | `eclipse-mosquitto` | MQTT broker — receives telemetry from edge nodes, dispatches commands | 1883 |
| `c2-core` | Python 3.11 / FastAPI | Command & control API — MQTT subscriber, Firestore writes, node/system/call/token management | 8000 |
| `mosquitto` | `eclipse-mosquitto` | MQTT broker — receives telemetry from edge nodes, dispatches commands | 1883 |
| `c2-core` | Python 3.11 / FastAPI | Command & control API — MQTT handler, intelligence pipeline, incident correlator, alert dispatch, node/system/token management | 8000 |
| `discord-bot` | Python 3.11 / discord.py | Server-side Discord slash command bot — `/join`, `/leave`, `/status`, `/help` | — |
| `frontend` | Node.js / Next.js 14 | Admin web dashboard — real-time node map, call logs, system & token management | 3000 |
| `frontend` | Node.js / Next.js 14 | Operational web dashboard — live map, incident feed, call history, node and system management | 3000 |
## Directory Structure
@@ -49,19 +61,19 @@ Server/
│ │ │ ├── systems.py # Radio system CRUD (P25/DMR/NBFM configs)
│ │ │ ├── calls.py # Call log retrieval (read-only — calls created by MQTT handler)
│ │ │ ├── tokens.py # Discord bot token pool — add/delete/list; assign_token/release_token helpers
│ │ │ ├── upload.py # Audio file upload endpoint (called by edge nodes)
│ │ │ ├── incidents.py # [PLANNED] Incident CRUD, manual link/unlink calls, resolve
│ │ │ └── alerts.py # [PLANNED] Alert rule CRUD
│ │ │ ├── upload.py # Audio file upload endpoint (called by edge nodes); triggers intelligence pipeline
│ │ │ ├── incidents.py # Incident CRUD, manual link/unlink calls, resolve
│ │ │ └── alerts.py # Alert rule CRUD
│ │ └── internal/
│ │ ├── auth.py # Auth — Firebase ID token OR service key (Bearer)
│ │ ├── firestore.py # Async Firestore wrappers (doc_get, doc_set, doc_update, collection_list)
│ │ ├── mqtt_handler.py # MQTT subscriber — call_start/call_end handlers, node checkin/status
│ │ ├── node_sweeper.py # Background task — marks nodes offline after 90s without heartbeat
│ │ ├── storage.py # GCS audio file uploads
│ │ ├── transcription.py # [PLANNED] STT pipeline — Whisper or Google Speech API
│ │ ├── intelligence.py # [PLANNED] Keyword/entity extraction, severity scoring
│ │ ├── incident_correlator.py # [PLANNED] Match calls to incidents or create new ones
│ │ └── alerter.py # [PLANNED] Alert dispatch (Discord webhook, push, etc.)
│ │ ├── transcription.py # STT pipeline — OpenAI Whisper / GPT-4o transcribe
│ │ ├── intelligence.py # GPT scene extraction: tags, incident type, location, units, vehicles; geocoding
│ │ ├── incident_correlator.py # Hybrid correlator — matches calls to incidents or creates new ones
│ │ └── alerter.py # Alert dispatch — evaluates rules, sends notifications
│ ├── scripts/
│ │ └── set_admin.py # CLI to grant/revoke Firebase admin custom claim
│ ├── gcp-key.json # GCP service account key — place here, NOT committed to git
@@ -80,13 +92,13 @@ Server/
└── drb-frontend/ # Next.js 14 admin dashboard
├── app/
│ ├── dashboard/ # Overview: active node grid + live call feed
│ ├── map/ # Leaflet map — node locations + [PLANNED] incident pins
│ ├── calls/ # Call history — talkgroup, duration, audio, transcript (when populated)
│ ├── map/ # Leaflet map — node locations, active incident pins
│ ├── calls/ # Call history — talkgroup, duration, audio playback, transcript
│ ├── nodes/ # Node list + per-node detail (approve, reject, assign system)
│ ├── systems/ # Radio system CRUD (P25 form with talkgroup editor, DMR/NBFM JSON)
│ ├── tokens/ # Discord bot token pool management
│ ├── incidents/ # [PLANNED] Incident list/detail — linked calls, location, resolve
│ ├── alerts/ # [PLANNED] Alert rule configuration
│ ├── incidents/ # Incident list/detail — linked calls, location, summary, resolve
│ ├── alerts/ # Alert rule configuration
│ └── login/ # Firebase Auth login page
├── components/
│ ├── MapView.tsx # react-leaflet map with status-colored markers
@@ -218,138 +230,56 @@ Edge nodes join Discord voice channels using bot tokens managed by the server. A
## Call & Intelligence Pipeline
This is the full intended data lifecycle for every radio call — from raw RF to searchable, cross-referenced intelligence. The data models and Firestore schema are already designed for this; several pipeline stages are stubs awaiting implementation.
### Call lifecycle (current — fully working)
Every radio call flows through a four-stage pipeline triggered when the edge node uploads its recording:
```
Edge node ──► MQTT call_start ──► c2-core creates CallRecord (status: active)
talkgroup_id, talkgroup_name, freq,
│ node_id, system_id, started_at
Firestore "calls" collection
Edge node ──► MQTT call_start ──► CallRecord created (active)
Edge node ──► audio upload ──► GCS storage
Frontend live call feed / map popups
[1] TRANSCRIPTION
OpenAI Whisper / GPT-4o transcribe
→ CallRecord.transcript
Edge node ──► MQTT call_end ──► c2-core updates CallRecord (status: ended)
│ ended_at, audio_url (GCS link)
Frontend call history / audio playback
[2] INTELLIGENCE EXTRACTION (GPT-4o-mini)
Scene detection, entity extraction:
tags, incident_type, location, units,
vehicles, severity, resolved flag
+ geocoding (Google Maps)
+ embedding (text-embedding-3-small)
→ CallRecord.tags, .location, .units, etc.
[3] INCIDENT CORRELATION (hybrid engine)
Fast path — talkgroup + recency
Unit path — same officer continuity
Location — proximity match
Cross-TG — multi-agency / channel hop
Slow path — embedding similarity
→ IncidentRecord created or updated
[4] ALERT DISPATCH
Evaluate alert rules (keywords, talkgroups)
→ notifications sent
```
### Intelligence pipeline (designed — implementation pending)
After a call ends, the following stages should fire in order:
```
CallRecord (ended, audio_url set)
[1] TRANSCRIPTION
Speech-to-text on the GCS audio file (Whisper or Google Speech-to-Text)
→ writes CallRecord.transcript
[2] INTELLIGENCE EXTRACTION
Analyze transcript for:
- Named entities: unit IDs, street addresses, location references
- Keywords / keyword sets: fire/EMS/police/hazmat/pursuit/shots fired/etc.
- Radio codes (10-codes, signals) mapped to plain English
- Severity scoring
→ writes CallRecord.tags[], CallRecord.location (geocoded if address found)
[3] INCIDENT CORRELATION
Given the extracted entities + tags, decide:
- Does this call match an existing active IncidentRecord?
(same location ± radius, overlapping tags, recent time window)
→ link: append CallRecord.call_id to IncidentRecord.call_ids,
set CallRecord.incident_id, update IncidentRecord.summary
- Or does this call describe a new event?
→ create IncidentRecord (type, location, title, tags, status: active)
link the call, set CallRecord.incident_id
[4] ALERTS
If incident is new OR severity exceeds threshold:
- Trigger configured alert channels (Discord webhook, push notification, etc.)
- Include: incident type, location, talkgroup, transcript excerpt
```
### Data model fields involved
**`CallRecord`** (Firestore collection: `calls`):
| Field | Type | Populated by |
|---|---|---|
| `call_id` | string | MQTT handler on call_start |
| `node_id` | string | MQTT handler |
| `system_id` | string | MQTT handler (from node's assigned system) |
| `talkgroup_id` | number | MQTT handler |
| `talkgroup_name` | string | MQTT handler |
| `freq` | number | MQTT handler |
| `started_at` | timestamp | MQTT handler |
| `ended_at` | timestamp | MQTT handler on call_end |
| `status` | `active` \| `ended` | MQTT handler |
| `audio_url` | string | Edge node upload → GCS |
| `transcript` | string \| null | **[stub]** Transcription pipeline |
| `incident_id` | string \| null | **[stub]** Incident correlation |
| `location` | `{lat, lng}` \| null | **[stub]** Geocoding from transcript |
| `tags` | string[] | **[stub]** Intelligence extraction |
**`IncidentRecord`** (Firestore collection: `incidents`):
| Field | Type | Description |
|---|---|---|
| `incident_id` | string | UUID |
| `title` | string | Auto-generated or manually set |
| `type` | string | `fire`, `ems`, `police`, `hazmat`, `pursuit`, etc. |
| `status` | `active` \| `resolved` | Updated as calls accumulate or manually resolved |
| `location` | `{lat, lng}` | Geocoded from first call with a location |
| `call_ids` | string[] | All linked CallRecord IDs |
| `started_at` | timestamp | Timestamp of first linked call |
| `updated_at` | timestamp | Updated on each new linked call |
| `summary` | string \| null | Auto-generated from transcripts or manually written |
| `tags` | string[] | Union of tags from all linked calls |
### Backend — what needs to be built
The following do **not exist yet** and need to be created:
- `drb-c2-core/app/routers/incidents.py` — CRUD + manual incident management
- `drb-c2-core/app/routers/alerts.py` — Alert rule configuration
- `drb-c2-core/app/internal/transcription.py` — STT integration (Whisper local or Google Speech API)
- `drb-c2-core/app/internal/intelligence.py` — Keyword/entity extraction, severity scoring
- `drb-c2-core/app/internal/incident_correlator.py` — Match calls to incidents or create new ones
- `drb-c2-core/app/internal/alerter.py` — Dispatch alerts (Discord webhook, etc.)
The `_on_call_end()` handler in `mqtt_handler.py` is the natural trigger point — after updating the CallRecord, it should enqueue the transcription + intelligence pipeline.
### Frontend — what needs to be built
The following pages and nav items are not yet implemented:
- `/incidents` — Incident list and detail view (linked calls, map pin, transcript summary, tags, resolve button)
- `/alerts` — Alert rule configuration (keyword sets, talkgroup filters, notification channels)
- `Nav.tsx` should add **Calls** and **Incidents** as primary nav items; the current Calls page exists but isn't in the design's intended nav hierarchy
The **Map** page should eventually show both node markers and incident pins — incidents with `location` set should appear as color-coded markers (by type/severity) alongside the node status markers.
## Frontend Pages
| Page | URL | Status | Description |
|---|---|---|---|
| Dashboard | `/dashboard` | Working | Live node grid + active call stream |
| Map | `/map` | Working | Leaflet map — nodes color-coded by status, active call popups |
| Calls | `/calls` | Working | Full call history — talkgroup, duration, audio playback, transcript (when populated) |
| Nodes | `/nodes` | Working | Node list; per-node detail for approve/reject/assign system |
| Systems | `/systems` | Working | Create and manage P25/DMR/NBFM radio system configurations |
| Tokens | `/tokens` | Working | Discord bot token pool management |
| Incidents | `/incidents` | **Not built** | Incident list/detail — linked calls, location, summary, tags, resolve |
| Page | URL | Description |
|---|---|---|
| Dashboard | `/dashboard` | Live node grid + active call stream |
| Map | `/map` | Leaflet map — nodes color-coded by status, active call popups |
| Calls | `/calls` | Full call history — talkgroup, duration, audio playback, transcript |
| Nodes | `/nodes` | Node list; per-node detail for approve/reject/assign system |
| Systems | `/systems` | Create and manage P25/DMR/NBFM radio system configurations |
| Tokens | `/tokens` | Discord bot token pool management |
| Incidents | `/incidents` | Incident list/detail — linked calls, location, summary, tags, resolve |
| Alerts | `/alerts` | **Not built** | Alert rule configuration — keywords, talkgroups, notification channels |
## Makefile Targets
@@ -272,6 +272,15 @@ async def correlate_call(
inc for inc in tg_recent
if _incident_idle_minutes(inc, now) <= settings.tg_dispatch_thin_idle_minutes
]
# A shared dispatch channel may have multiple concurrent incidents.
# If more than one is active in the thin window, we cannot know which
# incident this "10-4" or "Copy" belongs to — skip rather than guess.
if len(thin_pool) > 1:
logger.info(
f"Correlator fast-path thin: {len(thin_pool)} active incidents on "
f"dispatch channel — ambiguous, skipping thin call {call_id}"
)
thin_pool = []
else:
thin_pool = tg_recent
+18 -2
View File
@@ -171,6 +171,10 @@ async def extract_scenes(
f"Intelligence: call {call_id} — transcript too short for extraction "
f"({len(transcript.split())} words), skipping"
)
try:
await fstore.doc_set("calls", call_id, {"skip_reason": "transcript_too_short"})
except Exception:
pass
return []
raw_scenes: list[dict] = await asyncio.to_thread(
@@ -382,7 +386,19 @@ async def _geocode_location(
data = r.json()
if data.get("status") != "OK" or not data.get("results"):
return None
loc = data["results"][0]["geometry"]["location"]
result = data["results"][0]
location_type = result.get("geometry", {}).get("location_type", "")
# Only accept address-level precision. GEOMETRIC_CENTER (city/neighborhood
# centroid) and APPROXIMATE (region boundary) produce coordinates that look
# valid but are too vague for 0.5km proximity matching — they often resolve
# to the same point as the node's position and create false proximity matches.
if location_type not in ("ROOFTOP", "RANGE_INTERPOLATED"):
logger.info(
f"Geocoding rejected '{location_str}' — imprecise result "
f"(location_type={location_type!r}), returning None"
)
return None
loc = result["geometry"]["location"]
lat, lng = float(loc["lat"]), float(loc["lng"])
dist_km = _geo_dist_km(node_lat, node_lon, lat, lng)
if dist_km > settings.geocode_max_km:
@@ -392,7 +408,7 @@ async def _geocode_location(
)
return None
coords = {"lat": lat, "lng": lng}
logger.info(f"Geocoded '{location_str}'{coords} ({dist_km:.1f}km from node)")
logger.info(f"Geocoded '{location_str}'{coords} ({dist_km:.1f}km from node) [{location_type}]")
return coords
except Exception as e:
logger.warning(f"Geocoding failed for '{location_str}': {e}")
+17 -13
View File
@@ -241,20 +241,24 @@ async def _run_intelligence_pipeline(
# Correlator also runs for calls with no scenes (unclassified) to attempt
# talkgroup-based linking even when no transcript could be produced.
# Skip when extraction flagged the call — garbage or too-short transcripts
# carry no signal and would only attach spuriously via the thin path.
if not scenes:
incident_id = await incident_correlator.correlate_call(
call_id=call_id,
node_id=node_id,
system_id=system_id,
talkgroup_id=talkgroup_id,
talkgroup_name=talkgroup_name,
tags=[],
incident_type=None,
location=None,
location_coords=None,
)
if incident_id:
incident_ids.append(incident_id)
_call_doc = await fstore.doc_get("calls", call_id)
if not (_call_doc or {}).get("skip_reason"):
incident_id = await incident_correlator.correlate_call(
call_id=call_id,
node_id=node_id,
system_id=system_id,
talkgroup_id=talkgroup_id,
talkgroup_name=talkgroup_name,
tags=[],
incident_type=None,
location=None,
location_coords=None,
)
if incident_id:
incident_ids.append(incident_id)
if incident_ids:
await fstore.doc_set("calls", call_id, {"incident_ids": incident_ids})