import json import os import asyncio from fastapi import FastAPI, HTTPException import paho.mqtt.client as mqtt from datetime import datetime from motor.motor_asyncio import AsyncIOMotorClient from pydantic import BaseModel from typing import Any, Dict app = FastAPI(title="Radio C2 Brain") # Configuration MQTT_BROKER = os.getenv("MQTT_BROKER", "mqtt-broker") MONGO_URI = os.getenv("MONGO_URI", "mongodb://admin:securepassword@db:27017/radio_c2?authSource=admin") C2_ID = "central-brain-01" # Database Init mongo_client = AsyncIOMotorClient(MONGO_URI) db = mongo_client.get_database() nodes_col = db.get_collection("nodes") # Local cache for quick lookups ACTIVE_NODES_CACHE = {} MAIN_LOOP = None # Pydantic Models class NodeCommand(BaseModel): command: str payload: Dict[str, Any] def on_connect(client, userdata, flags, rc): print(f"Brain connected to MQTT Broker with result code {rc}") client.subscribe("nodes/+/checkin") client.subscribe("nodes/+/status") def on_message(client, userdata, msg): if MAIN_LOOP: asyncio.run_coroutine_threadsafe(handle_message(msg), MAIN_LOOP) async def handle_message(msg): topic_parts = msg.topic.split('/') if len(topic_parts) < 3: return node_id = topic_parts[1] event_type = topic_parts[2] try: payload = json.loads(msg.payload.decode()) timestamp = datetime.utcnow() if event_type == "checkin": data = { "node_id": node_id, "last_seen": timestamp, "status": payload.get("status", "online"), "active_system": payload.get("active_system"), "available_systems": payload.get("available_systems", []), "config": payload, "radio_state": "active" if payload.get("is_listening") else "idle" } await nodes_col.update_one({"node_id": node_id}, {"$set": data}, upsert=True) ACTIVE_NODES_CACHE[node_id] = data elif event_type == "status": status = payload.get("status") await nodes_col.update_one( {"node_id": node_id}, {"$set": {"status": status, "last_seen": timestamp}} ) if node_id in ACTIVE_NODES_CACHE: ACTIVE_NODES_CACHE[node_id]["status"] = status except Exception as e: print(f"Error processing MQTT: {e}") # MQTT Setup mqtt_client = mqtt.Client(client_id=C2_ID) mqtt_client.on_connect = on_connect mqtt_client.on_message = on_message @app.on_event("startup") async def startup_event(): global MAIN_LOOP MAIN_LOOP = asyncio.get_running_loop() mqtt_client.connect_async(MQTT_BROKER, 1883, 60) mqtt_client.loop_start() @app.get("/nodes") async def get_nodes(): nodes = await nodes_col.find().to_list(length=100) # Convert ObjectId to string for JSON serialization for n in nodes: n["_id"] = str(n["_id"]) return nodes @app.post("/nodes/{node_id}/command") async def send_command_to_node(node_id: str, command: NodeCommand): if node_id not in ACTIVE_NODES_CACHE: raise HTTPException(status_code=404, detail="Node not found or is offline") topic = f"nodes/{node_id}/commands" message_payload = { "command": command.command, **command.payload } mqtt_client.publish(topic, json.dumps(message_payload), qos=1) return {"status": "command_sent", "node_id": node_id, "command": command.command}