rename main to node_main for IDE clarity
This commit is contained in:
@@ -49,4 +49,4 @@ RUN sed -i 's/\r$//' /usr/local/bin/docker-entrypoint.sh && \
|
|||||||
ENTRYPOINT ["/usr/local/bin/docker-entrypoint.sh"]
|
ENTRYPOINT ["/usr/local/bin/docker-entrypoint.sh"]
|
||||||
|
|
||||||
# 3. Use CMD to pass the uvicorn command as arguments to the ENTRYPOINT script
|
# 3. Use CMD to pass the uvicorn command as arguments to the ENTRYPOINT script
|
||||||
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001", "--reload"]
|
CMD ["uvicorn", "node_main:app", "--host", "0.0.0.0", "--port", "8001", "--reload"]
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
import signal
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
from routers.op25_controller import create_op25_router
|
from routers.op25_controller import create_op25_router
|
||||||
@@ -12,14 +13,17 @@ import requests
|
|||||||
# Initialize logging
|
# Initialize logging
|
||||||
LOGGER = create_logger(__name__)
|
LOGGER = create_logger(__name__)
|
||||||
|
|
||||||
|
# FastAPI App
|
||||||
app = FastAPI(title="Radio Edge Supervisor")
|
app = FastAPI(title="Radio Edge Supervisor")
|
||||||
|
|
||||||
# Add the router
|
|
||||||
app.include_router(create_op25_router(), prefix="/op25")
|
app.include_router(create_op25_router(), prefix="/op25")
|
||||||
|
|
||||||
|
# Configuration
|
||||||
NODE_ID = os.getenv("NODE_ID", "standalone-node")
|
NODE_ID = os.getenv("NODE_ID", "standalone-node")
|
||||||
MQTT_BROKER = os.getenv("MQTT_BROKER", None)
|
MQTT_BROKER = os.getenv("MQTT_BROKER", None)
|
||||||
|
|
||||||
|
# Global flag to track MQTT connection state
|
||||||
|
MQTT_CONNECTED = False
|
||||||
|
|
||||||
def handle_c2_command(topic, payload):
|
def handle_c2_command(topic, payload):
|
||||||
"""
|
"""
|
||||||
Parses and routes commands received from the C2 server by calling the
|
Parses and routes commands received from the C2 server by calling the
|
||||||
@@ -106,79 +110,100 @@ def handle_c2_command(topic, payload):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOGGER.error(f"Error processing C2 command: {e}")
|
LOGGER.error(f"Error processing C2 command: {e}")
|
||||||
|
|
||||||
async def mqtt_phone_home():
|
async def mqtt_lifecycle_manager():
|
||||||
"""
|
"""
|
||||||
Maintains a persistent C2 connection using a single MQTT client.
|
Manages the application-level logic: Check-in, Heartbeats, and Shutdown.
|
||||||
Handles check-ins and command subscriptions via callbacks.
|
Decoupled from the Paho MQTT network loop.
|
||||||
"""
|
"""
|
||||||
|
global MQTT_CONNECTED
|
||||||
|
|
||||||
if not MQTT_BROKER:
|
if not MQTT_BROKER:
|
||||||
LOGGER.info("No MQTT_BROKER defined. Running in standalone mode.")
|
LOGGER.info("No MQTT_BROKER defined. Running in standalone mode.")
|
||||||
return
|
return
|
||||||
|
|
||||||
# Create a single client instance
|
|
||||||
client = mqtt.Client(client_id=NODE_ID)
|
client = mqtt.Client(client_id=NODE_ID)
|
||||||
|
|
||||||
|
# --- Callbacks ---
|
||||||
def on_connect(client, userdata, flags, rc):
|
def on_connect(client, userdata, flags, rc):
|
||||||
|
global MQTT_CONNECTED
|
||||||
if rc == 0:
|
if rc == 0:
|
||||||
LOGGER.info(f"Successfully connected to MQTT Broker: {MQTT_BROKER}")
|
LOGGER.info(f"Connected to MQTT Broker: {MQTT_BROKER}")
|
||||||
|
client.subscribe(f"nodes/{NODE_ID}/commands")
|
||||||
# 1. Subscribe to command topics for this specific node
|
MQTT_CONNECTED = True
|
||||||
command_topic = f"nodes/{NODE_ID}/commands"
|
|
||||||
client.subscribe(command_topic)
|
|
||||||
LOGGER.info(f"Subscribed to {command_topic}")
|
|
||||||
|
|
||||||
# 2. Perform Initial Check-In with OP25 status
|
|
||||||
try:
|
|
||||||
status_response = requests.get("http://localhost:8001/op25/status")
|
|
||||||
op25_status = status_response.json() if status_response.ok else {}
|
|
||||||
except requests.RequestException:
|
|
||||||
op25_status = {"is_running": False, "active_system": None}
|
|
||||||
|
|
||||||
checkin_data = {
|
|
||||||
"node_id": NODE_ID,
|
|
||||||
"status": "online",
|
|
||||||
"timestamp": datetime.now().isoformat(),
|
|
||||||
"version": "1.0.0",
|
|
||||||
"is_listening": op25_status.get("is_running", False),
|
|
||||||
"active_system": op25_status.get("active_system"),
|
|
||||||
"available_systems": scan_local_library()
|
|
||||||
}
|
|
||||||
client.publish(f"nodes/{NODE_ID}/checkin", json.dumps(checkin_data), retain=True)
|
|
||||||
else:
|
else:
|
||||||
LOGGER.error(f"MQTT Connection failed with return code {rc}")
|
LOGGER.error(f"MQTT Connection failed: {rc}")
|
||||||
|
|
||||||
def on_message(client, userdata, msg):
|
|
||||||
# Handle messages arriving on subscribed topics
|
|
||||||
handle_c2_command(msg.topic, msg.payload.decode())
|
|
||||||
|
|
||||||
def on_disconnect(client, userdata, rc):
|
def on_disconnect(client, userdata, rc):
|
||||||
|
global MQTT_CONNECTED
|
||||||
|
MQTT_CONNECTED = False
|
||||||
if rc != 0:
|
if rc != 0:
|
||||||
LOGGER.warning("Unexpected MQTT disconnection. Paho will attempt to reconnect...")
|
LOGGER.warning("Unexpected MQTT disconnection.")
|
||||||
|
|
||||||
# Set up callbacks and LWT
|
def on_message(client, userdata, msg):
|
||||||
|
handle_c2_command(msg.topic, msg.payload.decode())
|
||||||
|
|
||||||
|
# --- Setup ---
|
||||||
client.on_connect = on_connect
|
client.on_connect = on_connect
|
||||||
client.on_message = on_message
|
client.on_message = on_message
|
||||||
client.on_disconnect = on_disconnect
|
client.on_disconnect = on_disconnect
|
||||||
|
|
||||||
lwt_payload = json.dumps({"node_id": NODE_ID, "status": "offline"})
|
# LWT: Fires ONLY on ungraceful crash/timeout
|
||||||
|
lwt_payload = json.dumps({"status": "offline", "reason": "unexpected_disconnect"})
|
||||||
client.will_set(f"nodes/{NODE_ID}/status", lwt_payload, qos=1, retain=True)
|
client.will_set(f"nodes/{NODE_ID}/status", lwt_payload, qos=1, retain=True)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Connect and start the background loop thread
|
|
||||||
# loop_start() handles reconnections automatically without spaming new clients
|
|
||||||
client.connect(MQTT_BROKER, 1883, 60)
|
client.connect(MQTT_BROKER, 1883, 60)
|
||||||
client.loop_start()
|
client.loop_start() # Run network loop in background thread
|
||||||
|
|
||||||
# Keep the async task alive indefinitely
|
# --- Main Heartbeat Loop ---
|
||||||
while True:
|
while True:
|
||||||
await asyncio.sleep(3600)
|
if MQTT_CONNECTED:
|
||||||
|
# 1. Gather Data (This was previously in on_connect)
|
||||||
|
try:
|
||||||
|
status_response = requests.get("http://localhost:8001/op25/status", timeout=2)
|
||||||
|
op25_status = status_response.json() if status_response.ok else {}
|
||||||
|
except Exception:
|
||||||
|
op25_status = {"is_running": False}
|
||||||
|
|
||||||
|
payload = {
|
||||||
|
"node_id": NODE_ID,
|
||||||
|
"status": "online",
|
||||||
|
"timestamp": datetime.now().isoformat(),
|
||||||
|
"is_listening": op25_status.get("is_running", False),
|
||||||
|
"active_system": op25_status.get("active_system"),
|
||||||
|
# Only scan library if needed, otherwise it's heavy I/O
|
||||||
|
"available_systems": scan_local_library()
|
||||||
|
}
|
||||||
|
|
||||||
|
# 2. Publish Heartbeat (Acts as both check-in and keep-alive)
|
||||||
|
client.publish(f"nodes/{NODE_ID}/checkin", json.dumps(payload), retain=True)
|
||||||
|
LOGGER.debug("Sent Heartbeat/Check-in")
|
||||||
|
|
||||||
except Exception as e:
|
# Pulse every 30 seconds
|
||||||
LOGGER.error(f"Fatal error in MQTT supervisor: {e}")
|
await asyncio.sleep(30)
|
||||||
|
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
LOGGER.info("Stopping MQTT Loop...")
|
||||||
finally:
|
finally:
|
||||||
|
# Graceful Shutdown: Explicitly tell C2 we are leaving
|
||||||
|
if MQTT_CONNECTED:
|
||||||
|
shutdown_payload = json.dumps({"status": "offline", "reason": "clean_shutdown"})
|
||||||
|
client.publish(f"nodes/{NODE_ID}/status", shutdown_payload, qos=1, retain=True)
|
||||||
|
|
||||||
client.loop_stop()
|
client.loop_stop()
|
||||||
|
client.disconnect()
|
||||||
|
|
||||||
@app.on_event("startup")
|
@app.on_event("startup")
|
||||||
async def startup_event():
|
async def startup_event():
|
||||||
# Start the C2 connection in the background.
|
# Store the task so we can cancel it if needed (optional)
|
||||||
asyncio.create_task(mqtt_phone_home())
|
app.state.mqtt_task = asyncio.create_task(mqtt_lifecycle_manager())
|
||||||
|
|
||||||
|
@app.on_event("shutdown")
|
||||||
|
async def shutdown_event():
|
||||||
|
# Cancel the loop to trigger the finally block
|
||||||
|
if hasattr(app.state, "mqtt_task"):
|
||||||
|
app.state.mqtt_task.cancel()
|
||||||
|
try:
|
||||||
|
await app.state.mqtt_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
Reference in New Issue
Block a user