2f0597c81b
Includes c2-core (FastAPI/MQTT/Firestore), discord-bot (slash commands), frontend (Next.js admin UI), and mosquitto config.
56 lines
1.7 KiB
Python
56 lines
1.7 KiB
Python
import asyncio
|
|
from datetime import datetime, timezone, timedelta
|
|
from app.config import settings
|
|
from app.internal.logger import logger
|
|
from app.internal import firestore as fstore
|
|
|
|
# Delay between consecutive sweeps; sweeper_loop passes this to asyncio.sleep.
SWEEP_INTERVAL = 30 # seconds
|
|
|
|
|
|
async def sweeper_loop():
    """Run the node sweeper until the task is stopped from outside.

    After logging a start-up message, the loop repeats two steps forever:
    sleep for ``SWEEP_INTERVAL`` seconds, then run one ``_sweep()`` pass.
    Any ``Exception`` raised by a pass is logged and the loop carries on;
    anything that does not derive from ``Exception`` propagates.
    """

    async def _guarded_pass():
        # A failing pass must not end the loop, so report it and move on.
        try:
            await _sweep()
        except Exception as failure:
            logger.error(f"Sweeper error: {failure}")

    logger.info("Node sweeper started.")
    while True:
        # The wait comes first, so the initial pass runs one interval after start.
        await asyncio.sleep(SWEEP_INTERVAL)
        await _guarded_pass()
|
|
|
|
|
|
async def _sweep():
    """Run one sweep: mark stale, non-offline nodes as offline in Firestore.

    Every document in the ``nodes`` collection is read. A node is skipped
    when its ``status`` is ``"offline"`` (a missing status counts as
    offline) or when it has no ``last_seen`` value. Any other node whose
    ``last_seen`` is older than ``settings.node_offline_threshold`` seconds
    before the current UTC time is updated with ``{"status": "offline"}``.

    A failure while handling a single node is logged and does not prevent
    the remaining nodes from being checked. Errors raised while reading the
    collection still propagate to the caller.
    """
    threshold = datetime.now(timezone.utc) - timedelta(
        seconds=settings.node_offline_threshold
    )

    def _query():
        from app.internal.firestore import db

        # Keep each snapshot's document ID next to its payload so a node
        # document without a "node_id" field can still be updated.
        return [(doc.id, doc.to_dict()) for doc in db.collection("nodes").stream()]

    # The collection read runs in a worker thread via asyncio.to_thread so
    # the event loop is not held up while the documents are streamed.
    nodes = await asyncio.to_thread(_query)

    for doc_id, node in nodes:
        # Prefer the stored field (original behaviour); fall back to the
        # document ID rather than passing None to doc_update.
        node_id = node.get("node_id") or doc_id
        try:
            if node.get("status", "offline") == "offline":
                continue

            last_seen_raw = node.get("last_seen")
            if not last_seen_raw:
                continue

            last_seen = _parse_last_seen(last_seen_raw)
            if last_seen < threshold:
                await fstore.doc_update("nodes", node_id, {"status": "offline"})
                logger.info(f"Node {node_id} marked offline (last seen: {last_seen.isoformat()})")
        except Exception as e:
            # One malformed or failing node must not abort the whole pass.
            logger.error(f"Sweeper error for node {node_id}: {e}")


def _parse_last_seen(raw):
    """Return ``raw`` as a timezone-aware value comparable with UTC datetimes.

    ``raw`` is either an ISO-8601 string or an object that already exposes
    the datetime interface (``tzinfo`` / ``replace``). Values without
    timezone information are treated as UTC.

    Raises:
        ValueError: if ``raw`` is a string that is not valid ISO-8601.
    """
    if isinstance(raw, str):
        # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11
        # onwards; rewrite it as an explicit UTC offset for older versions.
        if raw.endswith("Z"):
            raw = raw[:-1] + "+00:00"
        parsed = datetime.fromisoformat(raw)
    else:
        parsed = raw

    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
|