diff --git a/app/c2_main.py b/app/c2_main.py index e8de0b0..5aad10e 100644 --- a/app/c2_main.py +++ b/app/c2_main.py @@ -118,12 +118,117 @@ mqtt_client = mqtt.Client(client_id=C2_ID) mqtt_client.on_connect = on_connect mqtt_client.on_message = on_message +async def initialize_node_states(): + """ + On startup: + 1. Mark all known nodes as 'unknown' until they check in. + 2. Publish a discovery request to trigger immediate check-ins. + """ + print("Initializing node states...") + try: + nodes_ref = db.collection("nodes") + + # Fetch all nodes (blocking call wrapped) + def get_all_nodes(): + return list(nodes_ref.stream()) + + docs = await async_firestore(get_all_nodes) + + batch = db.batch() + count = 0 + + for doc in docs: + doc_ref = nodes_ref.document(doc.id) + batch.update(doc_ref, {"status": "unknown"}) + count += 1 + + # Update local cache if present + if doc.id in ACTIVE_NODES_CACHE: + ACTIVE_NODES_CACHE[doc.id]["status"] = "unknown" + + if count > 0: + await async_firestore(batch.commit) + print(f"Reset {count} nodes to 'unknown' status.") + + # Publish discovery request + print("Publishing discovery request...") + mqtt_client.publish("nodes/discovery/request", json.dumps({"ts": datetime.utcnow().isoformat()}), qos=1) + + except Exception as e: + print(f"Error initializing nodes: {e}") + traceback.print_exc() + +async def node_sweeper(): + """ + Background task to check for stale nodes. + Runs every 60 seconds. + Marks nodes as 'offline' if last_seen > 90 seconds ago. + """ + print("Starting Node Sweeper...") + while True: + await asyncio.sleep(60) + try: + nodes_ref = db.collection("nodes") + + def get_all_nodes(): + return list(nodes_ref.stream()) + + docs = await async_firestore(get_all_nodes) + + batch = db.batch() + updates_count = 0 + now = datetime.utcnow() + + for doc in docs: + data = doc.to_dict() + node_id = doc.id + status = data.get("status") + last_seen = data.get("last_seen") + + # Skip if already offline + if status == "offline": + continue + + is_stale = False + if last_seen: + # Handle timezone awareness (Firestore returns aware, utcnow is naive) + if last_seen.tzinfo: + last_seen = last_seen.replace(tzinfo=None) + + delta = (now - last_seen).total_seconds() + if delta > 90: + is_stale = True + else: + # No timestamp? Treat as stale if not offline + is_stale = True + + if is_stale: + print(f"Node {node_id} is stale. Marking offline.") + doc_ref = nodes_ref.document(node_id) + batch.update(doc_ref, {"status": "offline", "radio_state": "unknown"}) + updates_count += 1 + + if node_id in ACTIVE_NODES_CACHE: + ACTIVE_NODES_CACHE[node_id]["status"] = "offline" + + if updates_count > 0: + await async_firestore(batch.commit) + print(f"Sweeper marked {updates_count} nodes as offline.") + + except Exception as e: + print(f"Error in node sweeper: {e}") + traceback.print_exc() + @app.on_event("startup") async def startup_event(): global MAIN_LOOP MAIN_LOOP = asyncio.get_running_loop() mqtt_client.connect_async(MQTT_BROKER, 1883, 60) mqtt_client.loop_start() + + # Start background tasks + asyncio.create_task(initialize_node_states()) + asyncio.create_task(node_sweeper()) @app.get("/nodes") async def get_nodes(): diff --git a/docker-compose.yml b/docker-compose.yml index 0e8031c..13a6d4f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,28 +9,11 @@ services: - FIRESTORE_DB_ID=${FIRESTORE_DB_ID:-c2-server} - MQTT_BROKER=mqtt-broker - PORT=8000 - depends_on: - - mqtt-broker ports: - "8000:8000" networks: - radio-shared-net - # The Post Office (MQTT Broker) - mqtt-broker: - image: eclipse-mosquitto:latest - container_name: radio-mqtt - restart: always - ports: - - "1883:1883" - - "9001:9001" - volumes: - - ./mosquitto/config/:/mosquitto/config/ - - ./mosquitto/data/:/mosquitto/data/ - - ./mosquitto/log/:/mosquitto/log/ - networks: - - radio-shared-net - networks: radio-shared-net: external: true \ No newline at end of file diff --git a/mqtt-compose.yml b/mqtt-compose.yml new file mode 100644 index 0000000..c3808ce --- /dev/null +++ b/mqtt-compose.yml @@ -0,0 +1,19 @@ +services: +# The Post Office (MQTT Broker) + mqtt-broker: + image: eclipse-mosquitto:latest + container_name: radio-mqtt + restart: always + ports: + - "1883:1883" + - "9001:9001" + volumes: + - ./mosquitto/config/:/mosquitto/config/ + - ./mosquitto/data/:/mosquitto/data/ + - ./mosquitto/log/:/mosquitto/log/ + networks: + - radio-shared-net + +networks: + radio-shared-net: + external: true \ No newline at end of file