Move MQTT out of the main compose for testing server and client disconnects
This commit is contained in:
105
app/c2_main.py
105
app/c2_main.py
@@ -118,12 +118,117 @@ mqtt_client = mqtt.Client(client_id=C2_ID)
|
||||
# Register this module's connect and message handlers on the MQTT client.
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
|
||||
|
||||
async def initialize_node_states():
    """
    Reset node bookkeeping at startup and ask nodes to announce themselves.

    Steps:
      1. Queue a ``status = "unknown"`` update for every document in the
         ``nodes`` collection, mirroring the change into
         ACTIVE_NODES_CACHE for ids that are already cached, and commit
         the batch when at least one update was queued.
      2. Publish a timestamped message on ``nodes/discovery/request``
         (QoS 1) so nodes can check in right away.

    Any exception raised along the way is printed together with its
    traceback and is not re-raised.
    """
    print("Initializing node states...")
    try:
        collection = db.collection("nodes")

        def _fetch_node_snapshots():
            # Materialise the stream inside the helper so the blocking
            # iteration happens wherever async_firestore runs the callable.
            return list(collection.stream())

        snapshots = await async_firestore(_fetch_node_snapshots)

        write_batch = db.batch()
        reset_total = 0

        for reset_total, snapshot in enumerate(snapshots, start=1):
            node_id = snapshot.id
            write_batch.update(collection.document(node_id), {"status": "unknown"})

            # Keep the in-memory view in step, but only for known entries.
            if node_id in ACTIVE_NODES_CACHE:
                ACTIVE_NODES_CACHE[node_id]["status"] = "unknown"

        if reset_total > 0:
            await async_firestore(write_batch.commit)
            print(f"Reset {reset_total} nodes to 'unknown' status.")

        # Ask every node to report in.
        print("Publishing discovery request...")
        payload = json.dumps({"ts": datetime.utcnow().isoformat()})
        mqtt_client.publish("nodes/discovery/request", payload, qos=1)

    except Exception as e:
        print(f"Error initializing nodes: {e}")
        traceback.print_exc()
|
||||
|
||||
async def node_sweeper():
    """
    Background task that marks stale nodes as offline.

    Runs forever: sleeps 60 seconds, then scans every document in the
    ``nodes`` collection. A node whose status is not already ``"offline"``
    is treated as stale when its ``last_seen`` value is missing/falsy or
    more than 90 seconds old. Stale nodes get ``status = "offline"`` and
    ``radio_state = "unknown"`` through a single batched write, and any
    matching ACTIVE_NODES_CACHE entry has its status updated as well.

    Ages are computed from timezone-aware UTC datetimes. A naive
    ``last_seen`` is assumed to already be expressed in UTC.

    An exception during a sweep is printed with its traceback; the loop
    then continues with the next cycle.
    """
    # Local import: the block cannot rely on the file-level import list
    # already providing ``timezone``.
    from datetime import timezone

    sweep_interval_s = 60  # pause between sweeps
    stale_after_s = 90     # maximum tolerated age of last_seen

    print("Starting Node Sweeper...")
    while True:
        await asyncio.sleep(sweep_interval_s)
        try:
            nodes_ref = db.collection("nodes")

            def get_all_nodes():
                return list(nodes_ref.stream())

            docs = await async_firestore(get_all_nodes)

            batch = db.batch()
            updates_count = 0
            # Aware "now": datetime.utcnow() is deprecated and returns a
            # naive value that cannot be compared with aware timestamps.
            now = datetime.now(timezone.utc)

            for doc in docs:
                data = doc.to_dict()
                node_id = doc.id
                status = data.get("status")
                last_seen = data.get("last_seen")

                # Skip if already offline
                if status == "offline":
                    continue

                if last_seen:
                    # Normalise instead of stripping tzinfo: dropping the
                    # zone without converting would skew the age of any
                    # aware timestamp that is not in UTC.
                    if last_seen.utcoffset() is None:
                        last_seen = last_seen.replace(tzinfo=timezone.utc)
                    age_s = (now - last_seen).total_seconds()
                    is_stale = age_s > stale_after_s
                else:
                    # No timestamp? Treat as stale if not offline
                    is_stale = True

                if is_stale:
                    print(f"Node {node_id} is stale. Marking offline.")
                    doc_ref = nodes_ref.document(node_id)
                    batch.update(doc_ref, {"status": "offline", "radio_state": "unknown"})
                    updates_count += 1

                    if node_id in ACTIVE_NODES_CACHE:
                        ACTIVE_NODES_CACHE[node_id]["status"] = "offline"

            if updates_count > 0:
                await async_firestore(batch.commit)
                print(f"Sweeper marked {updates_count} nodes as offline.")

        except Exception as e:
            print(f"Error in node sweeper: {e}")
            traceback.print_exc()
|
||||
|
||||
# Strong references to fire-and-forget background tasks. The event loop
# only keeps weak references to tasks, so a task nobody else references
# could be garbage-collected before it completes.
_BACKGROUND_TASKS = set()


@app.on_event("startup")
async def startup_event():
    """
    Application startup hook.

    Records the running event loop in the module-level MAIN_LOOP, starts
    the MQTT client's asynchronous connection and its network loop, and
    launches the node initialisation and node sweeper coroutines as
    background tasks.
    """
    global MAIN_LOOP
    MAIN_LOOP = asyncio.get_running_loop()

    # NOTE(review): arguments look like paho-mqtt's (host, port, keepalive)
    # - confirm against the client library in use.
    mqtt_client.connect_async(MQTT_BROKER, 1883, 60)
    mqtt_client.loop_start()

    # Start background tasks, keeping a reference until each one finishes.
    for task_factory in (initialize_node_states, node_sweeper):
        task = asyncio.create_task(task_factory())
        _BACKGROUND_TASKS.add(task)
        task.add_done_callback(_BACKGROUND_TASKS.discard)
|
||||
|
||||
@app.get("/nodes")
|
||||
async def get_nodes():
|
||||
|
||||
Reference in New Issue
Block a user