Initial commit — DRB server stack
Includes c2-core (FastAPI/MQTT/Firestore), discord-bot (slash commands), frontend (Next.js admin UI), and mosquitto config.
This commit is contained in:
@@ -0,0 +1,244 @@
|
||||
import asyncio
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional
|
||||
import paho.mqtt.client as mqtt
|
||||
from app.config import settings
|
||||
from app.internal.logger import logger
|
||||
from app.internal import firestore as fstore
|
||||
|
||||
|
||||
class MQTTHandler:
|
||||
def __init__(self):
|
||||
self._client: Optional[mqtt.Client] = None
|
||||
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
||||
self._connected = False
|
||||
|
||||
def _build_client(self) -> mqtt.Client:
|
||||
client = mqtt.Client(
|
||||
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
|
||||
client_id="drb-c2-core",
|
||||
)
|
||||
if settings.mqtt_user:
|
||||
client.username_pw_set(settings.mqtt_user, settings.mqtt_pass)
|
||||
|
||||
client.on_connect = self._on_connect
|
||||
client.on_disconnect = self._on_disconnect
|
||||
client.on_message = self._on_message
|
||||
return client
|
||||
|
||||
def _on_connect(self, client, userdata, flags, reason_code, properties):
|
||||
if reason_code == 0:
|
||||
self._connected = True
|
||||
client.subscribe("nodes/+/checkin", qos=1)
|
||||
client.subscribe("nodes/+/status", qos=1)
|
||||
client.subscribe("nodes/+/metadata", qos=1)
|
||||
logger.info("MQTT connected — subscribed to node topics.")
|
||||
else:
|
||||
logger.error(f"MQTT connect refused: {reason_code}")
|
||||
|
||||
def _on_disconnect(self, client, userdata, disconnect_flags, reason_code, properties):
|
||||
self._connected = False
|
||||
logger.warning(f"MQTT disconnected: {reason_code}")
|
||||
|
||||
def _on_message(self, client, userdata, msg):
|
||||
try:
|
||||
payload = json.loads(msg.payload.decode())
|
||||
except Exception:
|
||||
logger.warning(f"Non-JSON MQTT message on {msg.topic}")
|
||||
return
|
||||
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self._dispatch(msg.topic, payload), self._loop
|
||||
)
|
||||
|
||||
async def _dispatch(self, topic: str, payload: dict):
|
||||
parts = topic.split("/")
|
||||
# Expected: nodes/{node_id}/{type}
|
||||
if len(parts) != 3 or parts[0] != "nodes":
|
||||
return
|
||||
|
||||
node_id = parts[1]
|
||||
msg_type = parts[2]
|
||||
|
||||
try:
|
||||
if msg_type == "checkin":
|
||||
await self._handle_checkin(node_id, payload)
|
||||
elif msg_type == "status":
|
||||
await self._handle_status(node_id, payload)
|
||||
elif msg_type == "metadata":
|
||||
await self._handle_metadata(node_id, payload)
|
||||
except Exception as e:
|
||||
logger.error(f"MQTT dispatch error [{msg_type}] from {node_id}: {e}")
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Checkin — upsert node; flag new unconfigured nodes
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _handle_checkin(self, node_id: str, payload: dict):
|
||||
existing = await fstore.doc_get("nodes", node_id)
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
if not existing:
|
||||
# First time we've seen this node — create it as unconfigured, pending approval
|
||||
doc = {
|
||||
"node_id": node_id,
|
||||
"name": payload.get("name", node_id),
|
||||
"lat": payload.get("lat", 0.0),
|
||||
"lon": payload.get("lon", 0.0),
|
||||
"status": "unconfigured",
|
||||
"configured": False,
|
||||
"last_seen": now.isoformat(),
|
||||
"assigned_system_id": None,
|
||||
"approval_status": "pending",
|
||||
}
|
||||
await fstore.doc_set("nodes", node_id, doc, merge=False)
|
||||
logger.info(f"New node registered: {node_id} — pending admin approval.")
|
||||
else:
|
||||
updates = {
|
||||
"last_seen": now.isoformat(),
|
||||
"name": payload.get("name", existing.get("name", node_id)),
|
||||
"lat": payload.get("lat", existing.get("lat", 0.0)),
|
||||
"lon": payload.get("lon", existing.get("lon", 0.0)),
|
||||
}
|
||||
# Only promote to online if already configured (don't overwrite explicit status)
|
||||
if existing.get("configured") and existing.get("status") not in ("recording",):
|
||||
updates["status"] = "online"
|
||||
await fstore.doc_update("nodes", node_id, updates)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Status update
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _handle_status(self, node_id: str, payload: dict):
|
||||
status = payload.get("status")
|
||||
if not status:
|
||||
return
|
||||
await fstore.doc_update("nodes", node_id, {
|
||||
"status": status,
|
||||
"last_seen": datetime.now(timezone.utc).isoformat(),
|
||||
})
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Metadata — call_start / call_end events
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _handle_metadata(self, node_id: str, payload: dict):
|
||||
event = payload.get("event")
|
||||
if event == "call_start":
|
||||
await self._on_call_start(node_id, payload)
|
||||
elif event == "call_end":
|
||||
await self._on_call_end(node_id, payload)
|
||||
|
||||
async def _on_call_start(self, node_id: str, payload: dict):
|
||||
call_id = payload.get("call_id")
|
||||
if not call_id:
|
||||
return
|
||||
|
||||
# Look up assigned system for this node
|
||||
node = await fstore.doc_get("nodes", node_id)
|
||||
system_id = node.get("assigned_system_id") if node else None
|
||||
|
||||
started_at_raw = payload.get("started_at")
|
||||
started_at = (
|
||||
datetime.fromisoformat(started_at_raw)
|
||||
if started_at_raw
|
||||
else datetime.now(timezone.utc)
|
||||
)
|
||||
|
||||
doc = {
|
||||
"call_id": call_id,
|
||||
"node_id": node_id,
|
||||
"system_id": system_id,
|
||||
"talkgroup_id": payload.get("tgid"),
|
||||
"talkgroup_name": payload.get("tgid_name") or "",
|
||||
"freq": payload.get("freq"),
|
||||
"srcaddr": payload.get("srcaddr"),
|
||||
"started_at": started_at,
|
||||
"ended_at": None,
|
||||
"audio_url": None,
|
||||
"transcript": None,
|
||||
"incident_id": None,
|
||||
"location": None,
|
||||
"tags": [],
|
||||
"status": "active",
|
||||
}
|
||||
await fstore.doc_set("calls", call_id, doc, merge=False)
|
||||
logger.info(f"Call start: {call_id} (node={node_id}, tgid={payload.get('tgid')})")
|
||||
|
||||
async def _on_call_end(self, node_id: str, payload: dict):
|
||||
call_id = payload.get("call_id")
|
||||
if not call_id:
|
||||
return
|
||||
|
||||
ended_at_raw = payload.get("ended_at")
|
||||
ended_at = (
|
||||
datetime.fromisoformat(ended_at_raw)
|
||||
if ended_at_raw
|
||||
else datetime.now(timezone.utc)
|
||||
)
|
||||
|
||||
updates = {
|
||||
"ended_at": ended_at,
|
||||
"status": "ended",
|
||||
}
|
||||
if payload.get("audio_url"):
|
||||
updates["audio_url"] = payload["audio_url"]
|
||||
|
||||
await fstore.doc_update("calls", call_id, updates)
|
||||
logger.info(f"Call end: {call_id}")
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Outbound — send a command to a specific node
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def send_command(self, node_id: str, payload: dict):
|
||||
topic = f"nodes/{node_id}/commands"
|
||||
if self._client and self._connected:
|
||||
self._client.publish(topic, json.dumps(payload), qos=1)
|
||||
logger.info(f"Command sent to {node_id}: {payload.get('action')}")
|
||||
else:
|
||||
logger.warning(f"MQTT not connected — could not send command to {node_id}")
|
||||
|
||||
def push_config(self, node_id: str, system_config: dict):
|
||||
topic = f"nodes/{node_id}/config"
|
||||
if self._client and self._connected:
|
||||
self._client.publish(topic, json.dumps(system_config), qos=1)
|
||||
logger.info(f"Config pushed to {node_id}")
|
||||
else:
|
||||
logger.warning(f"MQTT not connected — could not push config to {node_id}")
|
||||
|
||||
def publish_node_key(self, node_id: str, api_key: str):
|
||||
"""Publish the provisioned API key to the node (retained so it survives reconnects)."""
|
||||
topic = f"nodes/{node_id}/api_key"
|
||||
if self._client and self._connected:
|
||||
self._client.publish(topic, json.dumps({"api_key": api_key}), qos=2, retain=True)
|
||||
logger.info(f"API key provisioned to {node_id}")
|
||||
else:
|
||||
logger.warning(f"MQTT not connected — could not provision key to {node_id}")
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Lifecycle
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def connect(self):
|
||||
self._loop = asyncio.get_event_loop()
|
||||
self._client = self._build_client()
|
||||
try:
|
||||
self._client.connect(settings.mqtt_broker, settings.mqtt_port, keepalive=60)
|
||||
self._client.loop_start()
|
||||
logger.info(f"MQTT connecting to {settings.mqtt_broker}:{settings.mqtt_port}")
|
||||
except Exception as e:
|
||||
logger.error(f"MQTT connection error: {e}")
|
||||
|
||||
async def disconnect(self):
|
||||
if self._client:
|
||||
self._client.loop_stop()
|
||||
self._client.disconnect()
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return self._connected
|
||||
|
||||
|
||||
mqtt_handler = MQTTHandler()
|
||||
Reference in New Issue
Block a user