diff --git a/drb-c2-core/app/internal/firestore.py b/drb-c2-core/app/internal/firestore.py index 302811f..809bccb 100644 --- a/drb-c2-core/app/internal/firestore.py +++ b/drb-c2-core/app/internal/firestore.py @@ -1,4 +1,5 @@ import asyncio +import time as _time from typing import Optional, Any import firebase_admin from firebase_admin import credentials, firestore as fs @@ -6,6 +7,12 @@ from google.cloud.firestore_v1.base_query import FieldFilter from app.config import settings from app.internal.logger import logger +# --------------------------------------------------------------------------- +# In-memory TTL cache for rarely-changing documents (systems, nodes config) +# --------------------------------------------------------------------------- +# Key: "collection/doc_id" → (expires_at_monotonic, data_or_None) +_doc_cache: dict[str, tuple[float, Optional[dict]]] = {} + def _init_firebase(): if firebase_admin._apps: @@ -79,3 +86,19 @@ async def collection_where( async def doc_delete(collection: str, doc_id: str) -> None: ref = db.collection(collection).document(doc_id) await asyncio.to_thread(ref.delete) + + +async def doc_get_cached(collection: str, doc_id: str, ttl: float = 300.0) -> Optional[dict]: + """ + Like doc_get but backed by a short-lived in-memory TTL cache. + Use for documents that change rarely (systems config, node assignments). + Default TTL is 5 minutes — a write will be visible within that window. + """ + key = f"{collection}/{doc_id}" + now = _time.monotonic() + entry = _doc_cache.get(key) + if entry and now < entry[0]: + return entry[1] + data = await doc_get(collection, doc_id) + _doc_cache[key] = (now + ttl, data) + return data diff --git a/drb-c2-core/app/internal/intelligence.py b/drb-c2-core/app/internal/intelligence.py index 6395ef8..6ab2ad1 100644 --- a/drb-c2-core/app/internal/intelligence.py +++ b/drb-c2-core/app/internal/intelligence.py @@ -99,12 +99,11 @@ async def extract_scenes( vocabulary: list[str] = [] ten_codes: dict[str, str] = {} if system_id: - from app.internal.vocabulary_learner import get_vocabulary - vocab_data = await get_vocabulary(system_id) - vocabulary = vocab_data.get("vocabulary") or [] - system_doc = await fstore.doc_get("systems", system_id) + # Single cached read — vocabulary and ten_codes live on the same document. + system_doc = await fstore.doc_get_cached("systems", system_id) if system_doc: - ten_codes = system_doc.get("ten_codes") or {} + vocabulary = system_doc.get("vocabulary") or [] + ten_codes = system_doc.get("ten_codes") or {} raw_scenes: list[dict] = await asyncio.to_thread( _sync_extract, @@ -118,7 +117,7 @@ async def extract_scenes( node_lat: Optional[float] = None node_lon: Optional[float] = None if node_id: - node_doc = await fstore.doc_get("nodes", node_id) + node_doc = await fstore.doc_get_cached("nodes", node_id) if node_doc: node_lat = node_doc.get("lat") node_lon = node_doc.get("lon") diff --git a/drb-c2-core/app/internal/mqtt_handler.py b/drb-c2-core/app/internal/mqtt_handler.py index b50f2e0..89053b3 100644 --- a/drb-c2-core/app/internal/mqtt_handler.py +++ b/drb-c2-core/app/internal/mqtt_handler.py @@ -143,8 +143,8 @@ class MQTTHandler: if not call_id: return - # Look up assigned system for this node - node = await fstore.doc_get("nodes", node_id) + # Look up assigned system for this node (cached — assignment rarely changes) + node = await fstore.doc_get_cached("nodes", node_id) system_id = node.get("assigned_system_id") if node else None started_at_raw = payload.get("started_at") @@ -157,7 +157,7 @@ class MQTTHandler: # Prefer the name from OP25 metadata; fall back to the system config tgid_name = payload.get("tgid_name") or "" if not tgid_name and system_id and payload.get("tgid"): - system_doc = await fstore.doc_get("systems", system_id) + system_doc = await fstore.doc_get_cached("systems", system_id) if system_doc: tgid_int = int(payload["tgid"]) for tg in system_doc.get("config", {}).get("talkgroups", []): diff --git a/drb-c2-core/app/internal/node_sweeper.py b/drb-c2-core/app/internal/node_sweeper.py index b52a937..145c54b 100644 --- a/drb-c2-core/app/internal/node_sweeper.py +++ b/drb-c2-core/app/internal/node_sweeper.py @@ -4,7 +4,7 @@ from app.config import settings from app.internal.logger import logger from app.internal import firestore as fstore -SWEEP_INTERVAL = 30 # seconds +SWEEP_INTERVAL = 90 # seconds — matches node_offline_threshold; no gain in checking faster async def sweeper_loop(): diff --git a/drb-c2-core/app/internal/vocabulary_learner.py b/drb-c2-core/app/internal/vocabulary_learner.py index aa7e3d1..960f270 100644 --- a/drb-c2-core/app/internal/vocabulary_learner.py +++ b/drb-c2-core/app/internal/vocabulary_learner.py @@ -196,8 +196,8 @@ async def remove_term(system_id: str, term: str) -> None: async def get_vocabulary(system_id: str) -> dict: - """Return vocabulary and pending terms for a system.""" - doc = await fstore.doc_get("systems", system_id) + """Return vocabulary and pending terms for a system (TTL-cached, 5 min).""" + doc = await fstore.doc_get_cached("systems", system_id) if not doc: return {"vocabulary": [], "vocabulary_pending": [], "vocabulary_bootstrapped": False} return { @@ -281,8 +281,14 @@ async def _induct_system(system_id: str, system_doc: dict) -> None: system_name = system_doc.get("name", "Unknown") existing_vocab: list[str] = system_doc.get("vocabulary") or [] - # Fetch recent ended calls for this system - all_calls = await fstore.collection_list("calls", system_id=system_id, status="ended") + # Fetch calls from the last 7 days only — avoids scanning the entire history. + # Active calls have ended_at=None and are excluded by the range filter automatically. + # Needs a composite index on (system_id ASC, ended_at ASC). + cutoff = datetime.now(timezone.utc) - timedelta(days=7) + all_calls = await fstore.collection_where("calls", [ + ("system_id", "==", system_id), + ("ended_at", ">=", cutoff), + ]) if not all_calls: return diff --git a/drb-c2-core/app/routers/upload.py b/drb-c2-core/app/routers/upload.py index 566aa22..0b13f34 100644 --- a/drb-c2-core/app/routers/upload.py +++ b/drb-c2-core/app/routers/upload.py @@ -170,7 +170,7 @@ async def _run_intelligence_pipeline( # but global flag=False beats everything (master switch). system_ai_flags: dict = {} if system_id: - sys_doc = await fstore.doc_get("systems", system_id) + sys_doc = await fstore.doc_get_cached("systems", system_id) system_ai_flags = (sys_doc or {}).get("ai_flags") or {} def _flag(name: str) -> bool: