adding discovery request and moving HB to its own function
This commit is contained in:
@@ -129,6 +129,7 @@ async def mqtt_lifecycle_manager():
|
|||||||
if rc == 0:
|
if rc == 0:
|
||||||
LOGGER.info(f"Connected to MQTT Broker: {MQTT_BROKER}")
|
LOGGER.info(f"Connected to MQTT Broker: {MQTT_BROKER}")
|
||||||
client.subscribe(f"nodes/{NODE_ID}/commands")
|
client.subscribe(f"nodes/{NODE_ID}/commands")
|
||||||
|
client.subscribe("nodes/discovery/request")
|
||||||
MQTT_CONNECTED = True
|
MQTT_CONNECTED = True
|
||||||
else:
|
else:
|
||||||
LOGGER.error(f"MQTT Connection failed: {rc}")
|
LOGGER.error(f"MQTT Connection failed: {rc}")
|
||||||
@@ -139,8 +140,33 @@ async def mqtt_lifecycle_manager():
|
|||||||
if rc != 0:
|
if rc != 0:
|
||||||
LOGGER.warning("Unexpected MQTT disconnection.")
|
LOGGER.warning("Unexpected MQTT disconnection.")
|
||||||
|
|
||||||
|
def publish_heartbeat():
|
||||||
|
"""Helper to gather status and publish check-in."""
|
||||||
|
try:
|
||||||
|
status_response = requests.get("http://localhost:8001/op25/status", timeout=2)
|
||||||
|
op25_status = status_response.json() if status_response.ok else {}
|
||||||
|
except Exception:
|
||||||
|
op25_status = {"is_running": False}
|
||||||
|
|
||||||
|
payload = {
|
||||||
|
"node_id": NODE_ID,
|
||||||
|
"status": "online",
|
||||||
|
"timestamp": datetime.now().isoformat(),
|
||||||
|
"is_listening": op25_status.get("is_running", False),
|
||||||
|
"active_system": op25_status.get("active_system"),
|
||||||
|
# Only scan library if needed, otherwise it's heavy I/O
|
||||||
|
"available_systems": scan_local_library()
|
||||||
|
}
|
||||||
|
|
||||||
|
client.publish(f"nodes/{NODE_ID}/checkin", json.dumps(payload), retain=True)
|
||||||
|
LOGGER.debug("Sent Heartbeat/Check-in")
|
||||||
|
|
||||||
def on_message(client, userdata, msg):
|
def on_message(client, userdata, msg):
|
||||||
handle_c2_command(msg.topic, msg.payload.decode())
|
if msg.topic == "nodes/discovery/request":
|
||||||
|
LOGGER.info("Received Discovery Request. Sending Heartbeat...")
|
||||||
|
publish_heartbeat()
|
||||||
|
else:
|
||||||
|
handle_c2_command(msg.topic, msg.payload.decode())
|
||||||
|
|
||||||
# --- Setup ---
|
# --- Setup ---
|
||||||
client.on_connect = on_connect
|
client.on_connect = on_connect
|
||||||
@@ -158,29 +184,10 @@ async def mqtt_lifecycle_manager():
|
|||||||
# --- Main Heartbeat Loop ---
|
# --- Main Heartbeat Loop ---
|
||||||
while True:
|
while True:
|
||||||
if MQTT_CONNECTED:
|
if MQTT_CONNECTED:
|
||||||
# 1. Gather Data (This was previously in on_connect)
|
publish_heartbeat()
|
||||||
try:
|
# Pulse every 30 seconds
|
||||||
status_response = requests.get("http://localhost:8001/op25/status", timeout=2)
|
# Only wait 30 sec if the HB sent. This way we don't stall a check-in
|
||||||
op25_status = status_response.json() if status_response.ok else {}
|
await asyncio.sleep(30)
|
||||||
except Exception:
|
|
||||||
op25_status = {"is_running": False}
|
|
||||||
|
|
||||||
payload = {
|
|
||||||
"node_id": NODE_ID,
|
|
||||||
"status": "online",
|
|
||||||
"timestamp": datetime.now().isoformat(),
|
|
||||||
"is_listening": op25_status.get("is_running", False),
|
|
||||||
"active_system": op25_status.get("active_system"),
|
|
||||||
# Only scan library if needed, otherwise it's heavy I/O
|
|
||||||
"available_systems": scan_local_library()
|
|
||||||
}
|
|
||||||
|
|
||||||
# 2. Publish Heartbeat (Acts as both check-in and keep-alive)
|
|
||||||
client.publish(f"nodes/{NODE_ID}/checkin", json.dumps(payload), retain=True)
|
|
||||||
LOGGER.debug("Sent Heartbeat/Check-in")
|
|
||||||
|
|
||||||
# Pulse every 30 seconds
|
|
||||||
await asyncio.sleep(30)
|
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
LOGGER.info("Stopping MQTT Loop...")
|
LOGGER.info("Stopping MQTT Loop...")
|
||||||
|
|||||||
Reference in New Issue
Block a user