server-26/drb-c2-core/app/internal/node_sweeper.py
Logan 2a690ec696 Issue 1 — Discord Audio (PulseAudio)
docker-compose.yml: Added a pulse_socket named volume mounted at /run/pulse in both op25 and edge-node. Also set PULSE_SERVER=unix:/run/pulse/native in edge-node so libpulse (and ffmpeg's pulse input) finds the right socket.

discord_radio.py: Removed _icecast_url and changed _play_stream() to use -f pulse -i default.monitor. ffmpeg now reads directly from the PulseAudio sink monitor, so audio no longer goes through the Icecast stream and its buffering delay. The PULSE_SERVER env var is inherited by the ffmpeg subprocess.

Note: default.monitor captures whatever audio is playing on the default sink. If OP25 uses a named virtual sink, you may need to replace default.monitor with <sink_name>.monitor (run pactl list sinks short inside the op25 container to find the name).
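
As a rough illustration of the change described above, the ffmpeg invocation could be assembled like this. This is a sketch only, not the actual _play_stream() code: the helper names, the sink name op25_sink, and the output flags (48 kHz stereo s16le on stdout) are assumptions.

```python
from typing import Dict, List, Mapping


def monitor_for_sink(sink_name: str) -> str:
    """PulseAudio exposes each sink's output as a source named '<sink>.monitor'."""
    return f"{sink_name}.monitor"


def build_ffmpeg_pulse_args(source: str = "default.monitor") -> List[str]:
    """Build an ffmpeg command line that captures from a PulseAudio source.

    The input side (-f pulse -i <source>) matches the commit message.
    The output side is an assumption: raw 48 kHz stereo PCM on stdout.
    """
    return [
        "ffmpeg",
        "-f", "pulse",      # PulseAudio input device
        "-i", source,       # e.g. default.monitor or <sink_name>.monitor
        "-ac", "2",         # assumed: stereo
        "-ar", "48000",     # assumed: 48 kHz
        "-f", "s16le",      # assumed: raw signed 16-bit little-endian PCM
        "pipe:1",           # write to stdout
    ]


def build_ffmpeg_env(base_env: Mapping[str, str],
                     pulse_server: str = "unix:/run/pulse/native") -> Dict[str, str]:
    """Copy the environment, filling in PULSE_SERVER only if it is not already set."""
    env = dict(base_env)
    env.setdefault("PULSE_SERVER", pulse_server)
    return env


# Hypothetical named sink; find the real name with `pactl list sinks short`.
args = build_ffmpeg_pulse_args(monitor_for_sink("op25_sink"))
env = build_ffmpeg_env({})
```

The resulting args and env would then be handed to whatever launches the subprocess, for example subprocess.Popen(args, env=env, stdout=subprocess.PIPE).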

Issue 2 — No audio URL / GCS credentials

storage.py: storage.Client() relied on Application Default Credentials (ADC), which aren't configured in the container. It now uses storage.Client.from_service_account_json(settings.gcp_credentials_path) when GCP_CREDENTIALS_PATH is set, the same credential file Firebase already loads.

You also need to mount the key file into the server container in docker-compose.yml:

c2-core:
  volumes:
    - ./gcp-key.json:/app/gcp-key.json:ro
And set GCS_BUCKET=your-bucket-name in .env.
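
A minimal sketch of the credential selection described for storage.py, assuming the path is read from the GCP_CREDENTIALS_PATH environment variable. The helper names are hypothetical and the real code goes through settings.gcp_credentials_path instead; google-cloud-storage is imported lazily so the path logic can be exercised without the library installed.

```python
import os
from typing import Mapping, Optional


def resolve_credentials_path(env: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return the service-account key path, or None when it is unset or blank."""
    path = env.get("GCP_CREDENTIALS_PATH", "").strip()
    return path or None


def make_storage_client(env: Mapping[str, str] = os.environ):
    """Create a GCS client from the key file when configured, else fall back to ADC."""
    # Lazy import: requires the google-cloud-storage package at call time.
    from google.cloud import storage

    path = resolve_credentials_path(env)
    if path:
        # Explicit service-account key, e.g. /app/gcp-key.json mounted read-only.
        return storage.Client.from_service_account_json(path)
    # No key configured: Application Default Credentials are used.
    return storage.Client()
```

With the volume mount shown above, GCP_CREDENTIALS_PATH would presumably be set to /app/gcp-key.json in .env.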

Issue 3 — Token orphaning

mqtt_manager.py: Every checkin now includes "discord_connected": radio_bot.is_connected.

mqtt_handler.py: On checkin, calls release_token(node_id) only when discord_connected is explicitly False. A missing field is treated as unknown and triggers no action.
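
The explicit-False rule can be sketched as follows. The function names and the payload shape beyond the discord_connected field are assumptions, not the actual mqtt_manager.py or mqtt_handler.py code.

```python
from typing import Any, Dict, Mapping


def build_checkin(node_id: str, is_connected: bool) -> Dict[str, Any]:
    """Node side: every checkin carries the current Discord connection state."""
    return {"node_id": node_id, "discord_connected": bool(is_connected)}


def should_release_token(payload: Mapping[str, Any]) -> bool:
    """Server side: release only on an explicit boolean False.

    Using `is False` rather than a truthiness test means a missing field,
    None, or 0 is treated as "unknown" and triggers no action.
    """
    return payload.get("discord_connected") is False
```

An identity check is used on purpose: `not payload.get("discord_connected")` would also fire for older nodes that do not send the field at all.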

node_sweeper.py: When a node is swept to offline, its token is released too. This covers the case where the node stops checking in entirely (crash/power loss).
2026-04-11 20:31:07 -04:00

58 lines
1.8 KiB
Python

import asyncio
from datetime import datetime, timezone, timedelta

from app.config import settings
from app.internal.logger import logger
from app.internal import firestore as fstore

SWEEP_INTERVAL = 30  # seconds


async def sweeper_loop():
    """
    Periodically check for nodes that haven't checked in recently
    and mark them offline in Firestore.
    """
    logger.info("Node sweeper started.")
    while True:
        await asyncio.sleep(SWEEP_INTERVAL)
        try:
            await _sweep()
        except Exception as e:
            # Log and keep looping so one failed sweep doesn't stop the sweeper.
            logger.error(f"Sweeper error: {e}")


async def _sweep():
    # Any node last seen before this moment is considered offline.
    threshold = datetime.now(timezone.utc) - timedelta(seconds=settings.node_offline_threshold)

    def _query():
        # Blocking Firestore read; run in a worker thread below.
        from app.internal.firestore import db
        return [
            doc.to_dict()
            for doc in db.collection("nodes").stream()
        ]

    nodes = await asyncio.to_thread(_query)

    for node in nodes:
        status = node.get("status", "offline")
        if status == "offline":
            continue

        last_seen_raw = node.get("last_seen")
        if not last_seen_raw:
            continue

        # last_seen may be a Firestore Timestamp, a datetime, or an ISO string
        if isinstance(last_seen_raw, str):
            last_seen = datetime.fromisoformat(last_seen_raw)
        else:
            last_seen = last_seen_raw

        # Treat naive timestamps as UTC so the comparison below is valid.
        if last_seen.tzinfo is None:
            last_seen = last_seen.replace(tzinfo=timezone.utc)

        if last_seen < threshold:
            node_id = node.get("node_id")
            if not node_id:
                # Without an ID there is no document to update or token to release.
                continue
            await fstore.doc_update("nodes", node_id, {"status": "offline"})
            logger.info(f"Node {node_id} marked offline (last seen: {last_seen.isoformat()})")
            # Release the node's token too, covering crash/power-loss cases.
            from app.routers.tokens import release_token
            await release_token(node_id)
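
The timestamp handling in _sweep() could also be pulled out into pure helpers that are easy to unit-test without Firestore. The sketch below is one possible shape, not part of the file: the helper names are hypothetical, and accepting a trailing "Z" and ignoring unparseable strings are additions the original code does not make.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Union


def normalize_last_seen(raw: Union[str, datetime, None]) -> Optional[datetime]:
    """Convert a stored last_seen value to a timezone-aware datetime, or None."""
    if not raw:
        return None
    if isinstance(raw, str):
        # Assumption: accept a trailing 'Z', which fromisoformat() rejects
        # on Python versions before 3.11.
        text = raw[:-1] + "+00:00" if raw.endswith("Z") else raw
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None  # unparseable value: treat as unknown
    else:
        parsed = raw
    if parsed.tzinfo is None:
        # Naive timestamps are assumed to be UTC, as in _sweep().
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def is_stale(raw: Union[str, datetime, None], now: datetime, threshold_seconds: float) -> bool:
    """True when last_seen is known and older than now minus the threshold."""
    last_seen = normalize_last_seen(raw)
    if last_seen is None:
        return False
    return last_seen < now - timedelta(seconds=threshold_seconds)


now = datetime(2026, 4, 12, 0, 0, tzinfo=timezone.utc)
stale = is_stale("2026-04-11T23:50:00Z", now, 300)       # 10 minutes old, 5 minute threshold
fresh = is_stale("2026-04-11T23:58:00+00:00", now, 300)  # 2 minutes old
```

Passing now in as a parameter, rather than calling datetime.now() inside the helper, keeps the check deterministic in tests.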