import asyncio import json from datetime import datetime, timezone from typing import Optional import paho.mqtt.client as mqtt from app.config import settings from app.internal.logger import logger from app.internal import firestore as fstore class MQTTHandler: def __init__(self): self._client: Optional[mqtt.Client] = None self._loop: Optional[asyncio.AbstractEventLoop] = None self._connected = False def _build_client(self) -> mqtt.Client: client = mqtt.Client( callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id="drb-c2-core", ) if settings.mqtt_user: client.username_pw_set(settings.mqtt_user, settings.mqtt_pass) client.on_connect = self._on_connect client.on_disconnect = self._on_disconnect client.on_message = self._on_message return client def _on_connect(self, client, userdata, flags, reason_code, properties): if reason_code == 0: self._connected = True client.subscribe("nodes/+/checkin", qos=1) client.subscribe("nodes/+/status", qos=1) client.subscribe("nodes/+/metadata", qos=1) logger.info("MQTT connected — subscribed to node topics.") else: logger.error(f"MQTT connect refused: {reason_code}") def _on_disconnect(self, client, userdata, disconnect_flags, reason_code, properties): self._connected = False logger.warning(f"MQTT disconnected: {reason_code}") def _on_message(self, client, userdata, msg): try: payload = json.loads(msg.payload.decode()) except Exception: logger.warning(f"Non-JSON MQTT message on {msg.topic}") return asyncio.run_coroutine_threadsafe( self._dispatch(msg.topic, payload), self._loop ) async def _dispatch(self, topic: str, payload: dict): parts = topic.split("/") # Expected: nodes/{node_id}/{type} if len(parts) != 3 or parts[0] != "nodes": return node_id = parts[1] msg_type = parts[2] try: if msg_type == "checkin": await self._handle_checkin(node_id, payload) elif msg_type == "status": await self._handle_status(node_id, payload) elif msg_type == "metadata": await self._handle_metadata(node_id, payload) except Exception as e: logger.error(f"MQTT dispatch error [{msg_type}] from {node_id}: {e}") # ------------------------------------------------------------------ # Checkin — upsert node; flag new unconfigured nodes # ------------------------------------------------------------------ async def _handle_checkin(self, node_id: str, payload: dict): existing = await fstore.doc_get("nodes", node_id) now = datetime.now(timezone.utc) if not existing: # First time we've seen this node — create it as unconfigured, pending approval doc = { "node_id": node_id, "name": payload.get("name", node_id), "lat": payload.get("lat", 0.0), "lon": payload.get("lon", 0.0), "status": "unconfigured", "configured": False, "last_seen": now.isoformat(), "assigned_system_id": None, "approval_status": "pending", } await fstore.doc_set("nodes", node_id, doc, merge=False) logger.info(f"New node registered: {node_id} — pending admin approval.") else: updates = { "last_seen": now.isoformat(), "name": payload.get("name", existing.get("name", node_id)), "lat": payload.get("lat", existing.get("lat", 0.0)), "lon": payload.get("lon", existing.get("lon", 0.0)), } # Only promote to online if already configured (don't overwrite explicit status) if existing.get("configured") and existing.get("status") not in ("recording",): updates["status"] = "online" await fstore.doc_update("nodes", node_id, updates) # ------------------------------------------------------------------ # Status update # ------------------------------------------------------------------ async def _handle_status(self, node_id: str, payload: dict): status = payload.get("status") if not status: return await fstore.doc_update("nodes", node_id, { "status": status, "last_seen": datetime.now(timezone.utc).isoformat(), }) # ------------------------------------------------------------------ # Metadata — call_start / call_end events # ------------------------------------------------------------------ async def _handle_metadata(self, node_id: str, payload: dict): event = payload.get("event") if event == "call_start": await self._on_call_start(node_id, payload) elif event == "call_end": await self._on_call_end(node_id, payload) async def _on_call_start(self, node_id: str, payload: dict): call_id = payload.get("call_id") if not call_id: return # Look up assigned system for this node node = await fstore.doc_get("nodes", node_id) system_id = node.get("assigned_system_id") if node else None started_at_raw = payload.get("started_at") started_at = ( datetime.fromisoformat(started_at_raw) if started_at_raw else datetime.now(timezone.utc) ) doc = { "call_id": call_id, "node_id": node_id, "system_id": system_id, "talkgroup_id": payload.get("tgid"), "talkgroup_name": payload.get("tgid_name") or "", "freq": payload.get("freq"), "srcaddr": payload.get("srcaddr"), "started_at": started_at, "ended_at": None, "audio_url": None, "transcript": None, "incident_id": None, "location": None, "tags": [], "status": "active", } await fstore.doc_set("calls", call_id, doc, merge=False) logger.info(f"Call start: {call_id} (node={node_id}, tgid={payload.get('tgid')})") async def _on_call_end(self, node_id: str, payload: dict): call_id = payload.get("call_id") if not call_id: return ended_at_raw = payload.get("ended_at") ended_at = ( datetime.fromisoformat(ended_at_raw) if ended_at_raw else datetime.now(timezone.utc) ) updates = { "ended_at": ended_at, "status": "ended", } if payload.get("audio_url"): updates["audio_url"] = payload["audio_url"] await fstore.doc_update("calls", call_id, updates) logger.info(f"Call end: {call_id}") # ------------------------------------------------------------------ # Outbound — send a command to a specific node # ------------------------------------------------------------------ def send_command(self, node_id: str, payload: dict): topic = f"nodes/{node_id}/commands" if self._client and self._connected: self._client.publish(topic, json.dumps(payload), qos=1) logger.info(f"Command sent to {node_id}: {payload.get('action')}") else: logger.warning(f"MQTT not connected — could not send command to {node_id}") def push_config(self, node_id: str, system_config: dict): topic = f"nodes/{node_id}/config" if self._client and self._connected: self._client.publish(topic, json.dumps(system_config), qos=1) logger.info(f"Config pushed to {node_id}") else: logger.warning(f"MQTT not connected — could not push config to {node_id}") def publish_node_key(self, node_id: str, api_key: str): """Publish the provisioned API key to the node (retained so it survives reconnects).""" topic = f"nodes/{node_id}/api_key" if self._client and self._connected: self._client.publish(topic, json.dumps({"api_key": api_key}), qos=2, retain=True) logger.info(f"API key provisioned to {node_id}") else: logger.warning(f"MQTT not connected — could not provision key to {node_id}") # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------ async def connect(self): self._loop = asyncio.get_event_loop() self._client = self._build_client() try: self._client.connect(settings.mqtt_broker, settings.mqtt_port, keepalive=60) self._client.loop_start() logger.info(f"MQTT connecting to {settings.mqtt_broker}:{settings.mqtt_port}") except Exception as e: logger.error(f"MQTT connection error: {e}") async def disconnect(self): if self._client: self._client.loop_stop() self._client.disconnect() @property def is_connected(self) -> bool: return self._connected mqtt_handler = MQTTHandler()