Files
node-26/drb-edge-node/app/internal/mqtt_manager.py
T
Logan d201d8124e Issue 1 — Discord Audio (PulseAudio)
docker-compose.yml: Added a pulse_socket named volume mounted at /run/pulse in both op25 and edge-node. Also set PULSE_SERVER=unix:/run/pulse/native in edge-node so libpulse (and ffmpeg's pulse input) finds the right socket.

discord_radio.py: Removed _icecast_url and changed _play_stream() to use -f pulse -i default.monitor. This reads directly from the PulseAudio sink monitor — zero buffer delay. The PULSE_SERVER env var is inherited by the ffmpeg subprocess.

Note: default.monitor captures whatever audio is playing on the default sink. If OP25 uses a named virtual sink, you may need to replace default.monitor with <sink_name>.monitor (run pactl list sinks short inside the op25 container to find the name).

Issue 2 — No audio URL / GCS credentials

storage.py: storage.Client() was relying on Application Default Credentials (ADC), but ADC isn't configured in the container. It now uses storage.Client.from_service_account_json(settings.gcp_credentials_path) when GCP_CREDENTIALS_PATH is set — the same credential file Firebase already loads.

You also need to mount the key file into the server container in docker-compose.yml:

c2-core:
  volumes:
    - ./gcp-key.json:/app/gcp-key.json:ro
And set GCS_BUCKET=your-bucket-name in .env.

Issue 3 — Token orphaning

mqtt_manager.py: Every checkin now includes "discord_connected": radio_bot.is_connected.

mqtt_handler.py: On checkin, if discord_connected is explicitly False, the handler calls release_token(node_id). It fires only on an explicit False; a missing field is treated as "unknown" and triggers no action.

node_sweeper.py: When a node is swept to offline, its token is released too. This covers the case where the node stops checking in entirely (crash/power loss).
2026-04-11 20:29:33 -04:00

173 lines
6.8 KiB
Python

import asyncio
import json
from datetime import datetime, timezone
from typing import Optional, Callable, Awaitable, Dict, Any
import paho.mqtt.client as mqtt
from app.config import settings
from app.internal.logger import logger
from app.internal import credentials
# Every inbound-message handler has the same shape: an async callable that
# receives the decoded message payload and returns nothing.
_PayloadHandler = Callable[[Dict[str, Any]], Awaitable[None]]

CommandCallback = _PayloadHandler  # assigned to MQTTManager.on_command
ConfigCallback = _PayloadHandler  # assigned to MQTTManager.on_config_push
ApiKeyCallback = _PayloadHandler  # assigned to MQTTManager.on_api_key
class MQTTManager:
    """Owns the node's paho-mqtt client and bridges it to the asyncio loop.

    The paho network loop is started with ``loop_start()``, so the
    ``_on_*`` callbacks are invoked outside the asyncio event loop.  Any
    coroutine work triggered from those callbacks is therefore handed to the
    loop captured in :meth:`connect` via ``run_coroutine_threadsafe``.

    Application code assigns ``on_command``, ``on_config_push`` and
    ``on_api_key`` to receive messages from the matching per-node topics.
    """

    def __init__(self):
        self._client: Optional[mqtt.Client] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._connected = False
        self._connect_task: Optional[asyncio.Task] = None

        # Async handlers set by the application; a message on a topic whose
        # handler is still None is ignored.
        self.on_command: Optional[CommandCallback] = None
        self.on_config_push: Optional[ConfigCallback] = None
        self.on_api_key: Optional[ApiKeyCallback] = None

        # Topic names are derived once from the configured node id.
        nid = settings.node_id
        self._t_checkin = f"nodes/{nid}/checkin"
        self._t_status = f"nodes/{nid}/status"
        self._t_metadata = f"nodes/{nid}/metadata"
        self._t_commands = f"nodes/{nid}/commands"
        self._t_config = f"nodes/{nid}/config"
        self._t_api_key = f"nodes/{nid}/api_key"
        self._t_key_request = f"nodes/{nid}/key_request"
        self._t_discovery = "nodes/discovery/request"

    def _build_client(self) -> mqtt.Client:
        """Create and configure the paho client (credentials, LWT, callbacks)."""
        client = mqtt.Client(
            callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
            client_id=settings.node_id,
        )
        if settings.mqtt_user:
            client.username_pw_set(settings.mqtt_user, settings.mqtt_pass)

        # Last-will message, retained on the status topic.  Its timestamp is
        # fixed here, when the client is built.
        lwt = json.dumps({
            "node_id": settings.node_id,
            "status": "offline",
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })
        client.will_set(self._t_status, lwt, qos=1, retain=True)
        client.reconnect_delay_set(min_delay=2, max_delay=60)

        client.on_connect = self._on_connect
        client.on_disconnect = self._on_disconnect
        client.on_message = self._on_message
        return client

    def _schedule(self, coro: Awaitable[None], what: str) -> None:
        """Submit coroutine object *coro* to the asyncio loop from a paho callback.

        The returned future is given a done-callback so that an exception
        raised inside the coroutine is logged instead of silently discarded.
        *what* is a short label used only in log messages.
        """
        try:
            future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        except RuntimeError as exc:
            # Raised when the loop has already been closed (e.g. shutdown).
            coro.close()  # avoid a "coroutine was never awaited" warning
            logger.warning(f"MQTT could not schedule {what}: {exc}")
            return
        future.add_done_callback(lambda fut: self._log_task_failure(fut, what))

    @staticmethod
    def _log_task_failure(future, what: str) -> None:
        """Log the exception, if any, carried by a finished scheduled future."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error(f"MQTT {what} handler failed: {exc!r}")

    def _on_connect(self, client, userdata, flags, reason_code, properties):
        """paho callback: on success subscribe, then check in and maybe request a key."""
        if reason_code == 0:
            self._connected = True
            client.subscribe(self._t_commands, qos=1)
            client.subscribe(self._t_config, qos=1)
            client.subscribe(self._t_api_key, qos=2)
            client.subscribe(self._t_discovery, qos=0)
            logger.info("MQTT connected.")
            self._schedule(self._publish_checkin(), "check-in")
            self._schedule(self._maybe_request_key(), "key request")
        else:
            logger.error(f"MQTT connect refused: {reason_code}")

    def _on_disconnect(self, client, userdata, disconnect_flags, reason_code, properties):
        """paho callback: record that the broker connection is gone."""
        self._connected = False
        logger.warning(f"MQTT disconnected: {reason_code}")

    def _on_message(self, client, userdata, msg):
        """paho callback: decode an inbound message and dispatch it by topic.

        Handlers receive the JSON-decoded payload, or the raw text when the
        payload is not valid JSON.  A payload that is not valid UTF-8 is
        dropped with a warning rather than raising out of the callback.
        """
        # A discovery request only triggers a check-in; its payload is unused,
        # so it is handled before any decoding.
        if msg.topic == self._t_discovery:
            self._schedule(self._publish_checkin(), "check-in")
            return

        try:
            text = msg.payload.decode()
        except UnicodeDecodeError:
            logger.warning(f"MQTT dropping non-UTF-8 payload on {msg.topic}")
            return

        try:
            payload = json.loads(text)
        except (ValueError, RecursionError):
            # Not JSON: pass the decoded text through unchanged.
            payload = text

        if msg.topic == self._t_commands and self.on_command:
            self._schedule(self.on_command(payload), "command")
        elif msg.topic == self._t_config and self.on_config_push:
            self._schedule(self.on_config_push(payload), "config push")
        elif msg.topic == self._t_api_key and self.on_api_key:
            self._schedule(self.on_api_key(payload), "api key")

    async def connect(self):
        """Capture the running loop, build the client and start connecting.

        Returns immediately; the initial connection is attempted by a
        background task (see ``_connect_with_retry``).
        """
        self._loop = asyncio.get_running_loop()
        self._client = self._build_client()
        self._connect_task = asyncio.create_task(self._connect_with_retry())

    async def _connect_with_retry(self):
        """Attempt the initial TCP connect, retrying with backoff until it succeeds."""
        delay = 5
        logger.info(f"MQTT connecting to {settings.mqtt_broker}:{settings.mqtt_port}")
        while True:
            try:
                # NOTE(review): connect() is a synchronous call made from a
                # coroutine, so it presumably holds the event loop for the
                # duration of each attempt - confirm this is acceptable.
                self._client.connect(settings.mqtt_broker, settings.mqtt_port, keepalive=60)
                self._client.loop_start()
                # paho loop_start + reconnect_delay_set handles all subsequent reconnects
                return
            except Exception as e:
                logger.warning(f"MQTT connect failed ({e}) — retrying in {delay}s")
                await asyncio.sleep(delay)
                delay = min(delay * 2, 60)  # exponential backoff, capped at 60s

    async def disconnect(self):
        """Publish an "offline" status, stop the network loop and disconnect."""
        if self._connect_task:
            self._connect_task.cancel()
        if self._client:
            await self.publish_status("offline")
            self._client.loop_stop()
            self._client.disconnect()
        # The network loop is stopped before disconnect() is called, so do not
        # rely on _on_disconnect running to clear the flag.
        self._connected = False

    async def publish_status(self, status: str, extra: Optional[dict] = None):
        """Publish a retained status message; *extra* keys are merged into it."""
        payload = {
            "node_id": settings.node_id,
            "status": status,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            **(extra or {}),
        }
        self._publish(self._t_status, payload, qos=1, retain=True)

    async def publish_metadata(self, event_type: str, data: dict):
        """Publish an event on the metadata topic; *data* keys are merged in last."""
        payload = {
            "event": event_type,
            "node_id": settings.node_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            **data,
        }
        self._publish(self._t_metadata, payload, qos=1)

    async def _maybe_request_key(self):
        """After connecting, wait for any retained api_key message to arrive.
        If no key materialises within 5 seconds, ask the server to re-deliver it."""
        await asyncio.sleep(5)
        if not credentials.get_api_key():
            logger.info("No API key on disk — requesting re-delivery from C2 server.")
            self._publish(self._t_key_request, {}, qos=1)

    async def _publish_checkin(self):
        """Publish this node's identity, location and Discord connection state."""
        # Imported here rather than at module top - presumably to avoid a
        # circular import; verify before moving it.
        from app.internal.discord_radio import radio_bot

        payload = {
            "node_id": settings.node_id,
            "name": settings.node_name,
            "lat": settings.node_lat,
            "lon": settings.node_lon,
            "discord_connected": radio_bot.is_connected,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        self._publish(self._t_checkin, payload, qos=1)

    def _publish(self, topic: str, payload: dict, qos: int = 0, retain: bool = False):
        """JSON-encode *payload* and publish it; dropped (debug-logged) when offline."""
        if self._client and self._connected:
            self._client.publish(topic, json.dumps(payload), qos=qos, retain=retain)
        else:
            logger.debug(f"MQTT not connected, dropping publish to {topic}")

    async def heartbeat_loop(self):
        """Publish a check-in every 30 seconds while connected; runs until cancelled."""
        while True:
            if self._connected:
                await self._publish_checkin()
            await asyncio.sleep(30)

    @property
    def is_connected(self) -> bool:
        """Whether the client currently believes it is connected to the broker."""
        return self._connected
# Module-level shared instance. Constructing it only initialises attributes and
# topic names from settings.node_id; nothing touches the network until
# connect() is awaited.
mqtt_manager = MQTTManager()