diff --git a/.gitignore b/.gitignore index 525e20b..3d6cc1c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .env *.log *.db -*.conf \ No newline at end of file +*.conf +*.json \ No newline at end of file diff --git a/app/c2_main.py b/app/c2_main.py index 5ad96f6..7aa6331 100644 --- a/app/c2_main.py +++ b/app/c2_main.py @@ -1,10 +1,13 @@ import json import os import asyncio +from functools import partial +import traceback from fastapi import FastAPI, HTTPException import paho.mqtt.client as mqtt from datetime import datetime -from motor.motor_asyncio import AsyncIOMotorClient +import firebase_admin +from firebase_admin import credentials, firestore from pydantic import BaseModel from typing import Any, Dict @@ -12,13 +15,21 @@ app = FastAPI(title="Radio C2 Brain") # Configuration MQTT_BROKER = os.getenv("MQTT_BROKER", "mqtt-broker") -MONGO_URI = os.getenv("MONGO_URI", "mongodb://admin:securepassword@db:27017/radio_c2?authSource=admin") +FIREBASE_CRED_JSON = os.getenv("FIREBASE_CRED_JSON") +FIRESTORE_DB_ID = os.getenv("FIRESTORE_DB_ID", "c2-server") C2_ID = "central-brain-01" # Database Init -mongo_client = AsyncIOMotorClient(MONGO_URI) -db = mongo_client.get_database() -nodes_col = db.get_collection("nodes") +if FIREBASE_CRED_JSON: + print("Initializing Firebase with provided JSON credentials...") + cred = credentials.Certificate(json.loads(FIREBASE_CRED_JSON)) + firebase_admin.initialize_app(cred) +else: + print("Initializing Firebase with Application Default Credentials...") + firebase_admin.initialize_app() + +print(f"Connecting to Firestore Database: {FIRESTORE_DB_ID}") +db = firestore.client(database_id=FIRESTORE_DB_ID) # Local cache for quick lookups ACTIVE_NODES_CACHE = {} @@ -29,6 +40,11 @@ class NodeCommand(BaseModel): command: str payload: Dict[str, Any] +# Helper for async execution of blocking firestore calls +async def async_firestore(func, *args, **kwargs): + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, partial(func, *args, **kwargs)) + def on_connect(client, userdata, flags, rc): print(f"Brain connected to MQTT Broker with result code {rc}") client.subscribe("nodes/+/checkin") @@ -50,6 +66,7 @@ async def handle_message(msg): timestamp = datetime.utcnow() if event_type == "checkin": + print(f"Processing checkin for {node_id}...") data = { "node_id": node_id, "last_seen": timestamp, @@ -59,20 +76,25 @@ async def handle_message(msg): "config": payload, "radio_state": "active" if payload.get("is_listening") else "idle" } - await nodes_col.update_one({"node_id": node_id}, {"$set": data}, upsert=True) + doc_ref = db.collection("nodes").document(node_id) + print(f"Writing to Firestore: {doc_ref.path} in DB {FIRESTORE_DB_ID}") + await async_firestore(doc_ref.set, data, merge=True) + print(f"Successfully updated checkin for {node_id}") ACTIVE_NODES_CACHE[node_id] = data elif event_type == "status": + print(f"Processing status update for {node_id}...") status = payload.get("status") - await nodes_col.update_one( - {"node_id": node_id}, - {"$set": {"status": status, "last_seen": timestamp}} - ) + doc_ref = db.collection("nodes").document(node_id) + print(f"Writing to Firestore: {doc_ref.path} in DB {FIRESTORE_DB_ID}") + await async_firestore(doc_ref.set, {"status": status, "last_seen": timestamp}, merge=True) + print(f"Successfully updated status for {node_id}") if node_id in ACTIVE_NODES_CACHE: ACTIVE_NODES_CACHE[node_id]["status"] = status except Exception as e: - print(f"Error processing MQTT: {e}") + print(f"Error processing MQTT message from {node_id}: {e}") + traceback.print_exc() # MQTT Setup mqtt_client = mqtt.Client(client_id=C2_ID) @@ -88,9 +110,13 @@ async def startup_event(): @app.get("/nodes") async def get_nodes(): - nodes = await nodes_col.find().to_list(length=100) - # Convert ObjectId to string for JSON serialization - for n in nodes: n["_id"] = str(n["_id"]) + def get_docs(): + return [ + {**doc.to_dict(), "_id": doc.id} + for doc in db.collection("nodes").stream() + ] + + nodes = await async_firestore(get_docs) return nodes @app.post("/nodes/{node_id}/command") diff --git a/docker-compose.yml b/docker-compose.yml index 671bd51..0e8031c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,30 +5,17 @@ services: container_name: radio-c2-brain restart: always environment: - - MONGO_URI=mongodb://${MONGO_INITDB_ROOT_USERNAME}:${MONGO_INITDB_ROOT_PASSWORD}@db:27017/radio_c2?authSource=admin + - FIREBASE_CRED_JSON=${FIREBASE_CRED_JSON:-} + - FIRESTORE_DB_ID=${FIRESTORE_DB_ID:-c2-server} - MQTT_BROKER=mqtt-broker - PORT=8000 depends_on: - - db - mqtt-broker ports: - "8000:8000" networks: - radio-shared-net - # The Memory (MongoDB) - db: - image: mongo:latest - container_name: radio-db - restart: always - environment: - - MONGO_INITDB_ROOT_USERNAME=${MONGO_INITDB_ROOT_USERNAME} - - MONGO_INITDB_ROOT_PASSWORD=${MONGO_INITDB_ROOT_PASSWORD} - volumes: - - mongo_data:/data/db - networks: - - radio-shared-net - # The Post Office (MQTT Broker) mqtt-broker: image: eclipse-mosquitto:latest @@ -46,7 +33,4 @@ services: networks: radio-shared-net: - external: true - -volumes: - mongo_data: \ No newline at end of file + external: true \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index f979c15..cbe924e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ fastapi uvicorn[standard] paho-mqtt -motor pydantic -pydantic-settings \ No newline at end of file +pydantic-settings +firebase-admin \ No newline at end of file