272 lines
11 KiB
Python
272 lines
11 KiB
Python
import asyncio
|
|
import json
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
import paho.mqtt.client as mqtt
|
|
from app.config import settings
|
|
from app.internal.logger import logger
|
|
from app.internal import firestore as fstore
|
|
|
|
|
|
class MQTTHandler:
|
|
def __init__(self):
|
|
self._client: Optional[mqtt.Client] = None
|
|
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
|
self._connected = False
|
|
|
|
def _build_client(self) -> mqtt.Client:
|
|
client = mqtt.Client(
|
|
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
|
|
client_id="drb-c2-core",
|
|
)
|
|
if settings.mqtt_user:
|
|
client.username_pw_set(settings.mqtt_user, settings.mqtt_pass)
|
|
|
|
client.on_connect = self._on_connect
|
|
client.on_disconnect = self._on_disconnect
|
|
client.on_message = self._on_message
|
|
return client
|
|
|
|
def _on_connect(self, client, userdata, flags, reason_code, properties):
|
|
if reason_code == 0:
|
|
self._connected = True
|
|
client.subscribe("nodes/+/checkin", qos=1)
|
|
client.subscribe("nodes/+/status", qos=1)
|
|
client.subscribe("nodes/+/metadata", qos=1)
|
|
client.subscribe("nodes/+/key_request", qos=1)
|
|
logger.info("MQTT connected — subscribed to node topics.")
|
|
else:
|
|
logger.error(f"MQTT connect refused: {reason_code}")
|
|
|
|
def _on_disconnect(self, client, userdata, disconnect_flags, reason_code, properties):
|
|
self._connected = False
|
|
logger.warning(f"MQTT disconnected: {reason_code}")
|
|
|
|
def _on_message(self, client, userdata, msg):
|
|
try:
|
|
payload = json.loads(msg.payload.decode())
|
|
except Exception:
|
|
logger.warning(f"Non-JSON MQTT message on {msg.topic}")
|
|
return
|
|
|
|
asyncio.run_coroutine_threadsafe(
|
|
self._dispatch(msg.topic, payload), self._loop
|
|
)
|
|
|
|
async def _dispatch(self, topic: str, payload: dict):
|
|
parts = topic.split("/")
|
|
# Expected: nodes/{node_id}/{type}
|
|
if len(parts) != 3 or parts[0] != "nodes":
|
|
return
|
|
|
|
node_id = parts[1]
|
|
msg_type = parts[2]
|
|
|
|
try:
|
|
if msg_type == "checkin":
|
|
await self._handle_checkin(node_id, payload)
|
|
elif msg_type == "status":
|
|
await self._handle_status(node_id, payload)
|
|
elif msg_type == "metadata":
|
|
await self._handle_metadata(node_id, payload)
|
|
elif msg_type == "key_request":
|
|
await self._handle_key_request(node_id)
|
|
except Exception as e:
|
|
logger.error(f"MQTT dispatch error [{msg_type}] from {node_id}: {e}")
|
|
|
|
# ------------------------------------------------------------------
|
|
# Checkin — upsert node; flag new unconfigured nodes
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _handle_checkin(self, node_id: str, payload: dict):
|
|
existing = await fstore.doc_get("nodes", node_id)
|
|
now = datetime.now(timezone.utc)
|
|
|
|
if not existing:
|
|
# First time we've seen this node — create it as unconfigured, pending approval
|
|
doc = {
|
|
"node_id": node_id,
|
|
"name": payload.get("name", node_id),
|
|
"lat": payload.get("lat", 0.0),
|
|
"lon": payload.get("lon", 0.0),
|
|
"status": "unconfigured",
|
|
"configured": False,
|
|
"last_seen": now.isoformat(),
|
|
"assigned_system_id": None,
|
|
"approval_status": "pending",
|
|
}
|
|
await fstore.doc_set("nodes", node_id, doc, merge=False)
|
|
logger.info(f"New node registered: {node_id} — pending admin approval.")
|
|
else:
|
|
updates = {
|
|
"last_seen": now.isoformat(),
|
|
"name": payload.get("name", existing.get("name", node_id)),
|
|
"lat": payload.get("lat", existing.get("lat", 0.0)),
|
|
"lon": payload.get("lon", existing.get("lon", 0.0)),
|
|
}
|
|
# Only promote to online if already configured (don't overwrite explicit status)
|
|
if existing.get("configured") and existing.get("status") not in ("recording",):
|
|
updates["status"] = "online"
|
|
await fstore.doc_update("nodes", node_id, updates)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Status update
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _handle_status(self, node_id: str, payload: dict):
|
|
status = payload.get("status")
|
|
if not status:
|
|
return
|
|
await fstore.doc_update("nodes", node_id, {
|
|
"status": status,
|
|
"last_seen": datetime.now(timezone.utc).isoformat(),
|
|
})
|
|
|
|
# ------------------------------------------------------------------
|
|
# Metadata — call_start / call_end events
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _handle_metadata(self, node_id: str, payload: dict):
|
|
event = payload.get("event")
|
|
if event == "call_start":
|
|
await self._on_call_start(node_id, payload)
|
|
elif event == "call_end":
|
|
await self._on_call_end(node_id, payload)
|
|
|
|
async def _on_call_start(self, node_id: str, payload: dict):
|
|
call_id = payload.get("call_id")
|
|
if not call_id:
|
|
return
|
|
|
|
# Look up assigned system for this node
|
|
node = await fstore.doc_get("nodes", node_id)
|
|
system_id = node.get("assigned_system_id") if node else None
|
|
|
|
started_at_raw = payload.get("started_at")
|
|
started_at = (
|
|
datetime.fromisoformat(started_at_raw)
|
|
if started_at_raw
|
|
else datetime.now(timezone.utc)
|
|
)
|
|
|
|
doc = {
|
|
"call_id": call_id,
|
|
"node_id": node_id,
|
|
"system_id": system_id,
|
|
"talkgroup_id": payload.get("tgid"),
|
|
"talkgroup_name": payload.get("tgid_name") or "",
|
|
"freq": payload.get("freq"),
|
|
"srcaddr": payload.get("srcaddr"),
|
|
"started_at": started_at,
|
|
"ended_at": None,
|
|
"audio_url": None,
|
|
"transcript": None,
|
|
"incident_id": None,
|
|
"location": None,
|
|
"tags": [],
|
|
"status": "active",
|
|
}
|
|
await fstore.doc_set("calls", call_id, doc, merge=False)
|
|
logger.info(f"Call start: {call_id} (node={node_id}, tgid={payload.get('tgid')})")
|
|
|
|
async def _on_call_end(self, node_id: str, payload: dict):
|
|
call_id = payload.get("call_id")
|
|
if not call_id:
|
|
return
|
|
|
|
ended_at_raw = payload.get("ended_at")
|
|
ended_at = (
|
|
datetime.fromisoformat(ended_at_raw)
|
|
if ended_at_raw
|
|
else datetime.now(timezone.utc)
|
|
)
|
|
|
|
updates = {
|
|
"ended_at": ended_at,
|
|
"status": "ended",
|
|
}
|
|
if payload.get("audio_url"):
|
|
updates["audio_url"] = payload["audio_url"]
|
|
|
|
await fstore.doc_update("calls", call_id, updates)
|
|
logger.info(f"Call end: {call_id}")
|
|
|
|
# ------------------------------------------------------------------
|
|
# Key request — re-deliver an existing approved key to a node that
|
|
# lost its credentials (e.g. after a directory move / fresh volume)
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _handle_key_request(self, node_id: str):
|
|
key_doc = await fstore.doc_get("node_keys", node_id)
|
|
if not key_doc or not key_doc.get("api_key"):
|
|
logger.warning(f"Key request from {node_id} but no key found in Firestore — node may not be approved yet.")
|
|
return
|
|
self.publish_node_key(node_id, key_doc["api_key"])
|
|
logger.info(f"Re-delivered API key to {node_id} on request.")
|
|
|
|
# ------------------------------------------------------------------
|
|
# Outbound — send a command to a specific node
|
|
# ------------------------------------------------------------------
|
|
|
|
def send_command(self, node_id: str, payload: dict) -> bool:
|
|
topic = f"nodes/{node_id}/commands"
|
|
if self._client and self._connected:
|
|
self._client.publish(topic, json.dumps(payload), qos=1)
|
|
logger.info(f"Command sent to {node_id}: {payload.get('action')}")
|
|
return True
|
|
logger.warning(f"MQTT not connected — could not send command to {node_id}")
|
|
return False
|
|
|
|
def push_config(self, node_id: str, system_config: dict):
|
|
topic = f"nodes/{node_id}/config"
|
|
if self._client and self._connected:
|
|
self._client.publish(topic, json.dumps(system_config), qos=1)
|
|
logger.info(f"Config pushed to {node_id}")
|
|
else:
|
|
logger.warning(f"MQTT not connected — could not push config to {node_id}")
|
|
|
|
def publish_node_key(self, node_id: str, api_key: str):
|
|
"""Publish the provisioned API key to the node (retained so it survives reconnects)."""
|
|
topic = f"nodes/{node_id}/api_key"
|
|
if self._client and self._connected:
|
|
self._client.publish(topic, json.dumps({"api_key": api_key}), qos=2, retain=True)
|
|
logger.info(f"API key provisioned to {node_id}")
|
|
else:
|
|
logger.warning(f"MQTT not connected — could not provision key to {node_id}")
|
|
|
|
# ------------------------------------------------------------------
|
|
# Lifecycle
|
|
# ------------------------------------------------------------------
|
|
|
|
async def connect(self):
|
|
self._loop = asyncio.get_event_loop()
|
|
self._client = self._build_client()
|
|
# Start the paho network loop first so it drives reconnects automatically,
|
|
# then keep attempting the initial TCP connect until it succeeds.
|
|
self._client.loop_start()
|
|
asyncio.create_task(self._connect_with_retry())
|
|
|
|
async def _connect_with_retry(self):
|
|
delay = 5
|
|
logger.info(f"MQTT connecting to {settings.mqtt_broker}:{settings.mqtt_port}")
|
|
while True:
|
|
try:
|
|
self._client.connect(settings.mqtt_broker, settings.mqtt_port, keepalive=60)
|
|
return # paho loop_start + reconnect_delay_set handles the rest
|
|
except Exception as e:
|
|
logger.warning(f"MQTT connect failed ({e}) — retrying in {delay}s")
|
|
await asyncio.sleep(delay)
|
|
delay = min(delay * 2, 60)
|
|
|
|
async def disconnect(self):
|
|
if self._client:
|
|
self._client.loop_stop()
|
|
self._client.disconnect()
|
|
|
|
@property
|
|
def is_connected(self) -> bool:
|
|
return self._connected
|
|
|
|
|
|
mqtt_handler = MQTTHandler()
|