From 7e1b01a275d04d389140156901da73db877a011e Mon Sep 17 00:00:00 2001
From: Logan
Date: Mon, 4 May 2026 02:05:00 -0400
Subject: [PATCH] Reduce Firestore calls to try to stay in the free tier
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### Firestore read reductions

**1. `doc_get_cached()` in `firestore.py` — new 5-min TTL cache**

One place, benefits everything. System and node config documents almost never
change during a monitoring session. Trade-off: nothing invalidates the cache on
write, so a change to a cached document (including vocabulary edits read back
through `get_vocabulary()`) can take up to 5 minutes to become visible. A
lookup that finds no document is cached for the same period.

**2. System doc: 4 reads → 1 per call**

| Before | After |
|---|---|
| `upload.py` — `doc_get("systems")` for ai_flags | `doc_get_cached` |
| `transcription.py` — `get_vocabulary()` → `doc_get("systems")` | cache hit |
| `intelligence.py` — `get_vocabulary()` → `doc_get("systems")` | direct `doc_get_cached` (cache hit) |
| `intelligence.py` — `doc_get("systems")` again for ten_codes | eliminated (reads same cached doc) |

**3. Node doc: cached in `_on_call_start` and `intelligence.py`**

The node is read on every call event to get `assigned_system_id` and lat/lon
for geocoding. Both now use the cache — node assignments and positions
essentially never change at runtime.

**4. Node sweeper: 30s → 90s interval**

The sweeper was doing a full node collection scan 3× more often than
necessary — the offline threshold is already 90s. Cuts sweeper reads by about
two-thirds. Trade-off: presumably a node that drops out can now take up to
roughly one threshold plus one sweep interval (~180s, previously ~120s) to be
marked offline.

**5. Vocabulary induction: scans all-time calls → last 7 days**

Previously fetched every ended call for a system (could be thousands). Now
scoped to the last 7 days.

> **Note:** The vocabulary induction query `(system_id == X, ended_at >= cutoff)` needs a Firestore
> composite index on `(system_id ASC, ended_at ASC)`. When the induction loop first fires it will log
> an error with a Firebase Console link to create it in one click.
--- drb-c2-core/app/internal/firestore.py | 23 +++++++++++++++++++ drb-c2-core/app/internal/intelligence.py | 11 ++++----- drb-c2-core/app/internal/mqtt_handler.py | 6 ++--- drb-c2-core/app/internal/node_sweeper.py | 2 +- .../app/internal/vocabulary_learner.py | 14 +++++++---- drb-c2-core/app/routers/upload.py | 2 +- 6 files changed, 43 insertions(+), 15 deletions(-) diff --git a/drb-c2-core/app/internal/firestore.py b/drb-c2-core/app/internal/firestore.py index 302811f..809bccb 100644 --- a/drb-c2-core/app/internal/firestore.py +++ b/drb-c2-core/app/internal/firestore.py @@ -1,4 +1,5 @@ import asyncio +import time as _time from typing import Optional, Any import firebase_admin from firebase_admin import credentials, firestore as fs @@ -6,6 +7,12 @@ from google.cloud.firestore_v1.base_query import FieldFilter from app.config import settings from app.internal.logger import logger +# --------------------------------------------------------------------------- +# In-memory TTL cache for rarely-changing documents (systems, nodes config) +# --------------------------------------------------------------------------- +# Key: "collection/doc_id" → (expires_at_monotonic, data_or_None) +_doc_cache: dict[str, tuple[float, Optional[dict]]] = {} + def _init_firebase(): if firebase_admin._apps: @@ -79,3 +86,19 @@ async def collection_where( async def doc_delete(collection: str, doc_id: str) -> None: ref = db.collection(collection).document(doc_id) await asyncio.to_thread(ref.delete) + + +async def doc_get_cached(collection: str, doc_id: str, ttl: float = 300.0) -> Optional[dict]: + """ + Like doc_get but backed by a short-lived in-memory TTL cache. + Use for documents that change rarely (systems config, node assignments). + Default TTL is 5 minutes — a write will be visible within that window. 
+ """ + key = f"{collection}/{doc_id}" + now = _time.monotonic() + entry = _doc_cache.get(key) + if entry and now < entry[0]: + return entry[1] + data = await doc_get(collection, doc_id) + _doc_cache[key] = (now + ttl, data) + return data diff --git a/drb-c2-core/app/internal/intelligence.py b/drb-c2-core/app/internal/intelligence.py index 6395ef8..6ab2ad1 100644 --- a/drb-c2-core/app/internal/intelligence.py +++ b/drb-c2-core/app/internal/intelligence.py @@ -99,12 +99,11 @@ async def extract_scenes( vocabulary: list[str] = [] ten_codes: dict[str, str] = {} if system_id: - from app.internal.vocabulary_learner import get_vocabulary - vocab_data = await get_vocabulary(system_id) - vocabulary = vocab_data.get("vocabulary") or [] - system_doc = await fstore.doc_get("systems", system_id) + # Single cached read — vocabulary and ten_codes live on the same document. + system_doc = await fstore.doc_get_cached("systems", system_id) if system_doc: - ten_codes = system_doc.get("ten_codes") or {} + vocabulary = system_doc.get("vocabulary") or [] + ten_codes = system_doc.get("ten_codes") or {} raw_scenes: list[dict] = await asyncio.to_thread( _sync_extract, @@ -118,7 +117,7 @@ async def extract_scenes( node_lat: Optional[float] = None node_lon: Optional[float] = None if node_id: - node_doc = await fstore.doc_get("nodes", node_id) + node_doc = await fstore.doc_get_cached("nodes", node_id) if node_doc: node_lat = node_doc.get("lat") node_lon = node_doc.get("lon") diff --git a/drb-c2-core/app/internal/mqtt_handler.py b/drb-c2-core/app/internal/mqtt_handler.py index b50f2e0..89053b3 100644 --- a/drb-c2-core/app/internal/mqtt_handler.py +++ b/drb-c2-core/app/internal/mqtt_handler.py @@ -143,8 +143,8 @@ class MQTTHandler: if not call_id: return - # Look up assigned system for this node - node = await fstore.doc_get("nodes", node_id) + # Look up assigned system for this node (cached — assignment rarely changes) + node = await fstore.doc_get_cached("nodes", node_id) system_id = 
node.get("assigned_system_id") if node else None started_at_raw = payload.get("started_at") @@ -157,7 +157,7 @@ class MQTTHandler: # Prefer the name from OP25 metadata; fall back to the system config tgid_name = payload.get("tgid_name") or "" if not tgid_name and system_id and payload.get("tgid"): - system_doc = await fstore.doc_get("systems", system_id) + system_doc = await fstore.doc_get_cached("systems", system_id) if system_doc: tgid_int = int(payload["tgid"]) for tg in system_doc.get("config", {}).get("talkgroups", []): diff --git a/drb-c2-core/app/internal/node_sweeper.py b/drb-c2-core/app/internal/node_sweeper.py index b52a937..145c54b 100644 --- a/drb-c2-core/app/internal/node_sweeper.py +++ b/drb-c2-core/app/internal/node_sweeper.py @@ -4,7 +4,7 @@ from app.config import settings from app.internal.logger import logger from app.internal import firestore as fstore -SWEEP_INTERVAL = 30 # seconds +SWEEP_INTERVAL = 90 # seconds — matches node_offline_threshold; no gain in checking faster async def sweeper_loop(): diff --git a/drb-c2-core/app/internal/vocabulary_learner.py b/drb-c2-core/app/internal/vocabulary_learner.py index aa7e3d1..960f270 100644 --- a/drb-c2-core/app/internal/vocabulary_learner.py +++ b/drb-c2-core/app/internal/vocabulary_learner.py @@ -196,8 +196,8 @@ async def remove_term(system_id: str, term: str) -> None: async def get_vocabulary(system_id: str) -> dict: - """Return vocabulary and pending terms for a system.""" - doc = await fstore.doc_get("systems", system_id) + """Return vocabulary and pending terms for a system (TTL-cached, 5 min).""" + doc = await fstore.doc_get_cached("systems", system_id) if not doc: return {"vocabulary": [], "vocabulary_pending": [], "vocabulary_bootstrapped": False} return { @@ -281,8 +281,14 @@ async def _induct_system(system_id: str, system_doc: dict) -> None: system_name = system_doc.get("name", "Unknown") existing_vocab: list[str] = system_doc.get("vocabulary") or [] - # Fetch recent ended calls for this 
system - all_calls = await fstore.collection_list("calls", system_id=system_id, status="ended") + # Fetch calls from the last 7 days only — avoids scanning the entire history. + # Active calls have ended_at=None and are excluded by the range filter automatically. + # Needs a composite index on (system_id ASC, ended_at ASC). + cutoff = datetime.now(timezone.utc) - timedelta(days=7) + all_calls = await fstore.collection_where("calls", [ + ("system_id", "==", system_id), + ("ended_at", ">=", cutoff), + ]) if not all_calls: return diff --git a/drb-c2-core/app/routers/upload.py b/drb-c2-core/app/routers/upload.py index 566aa22..0b13f34 100644 --- a/drb-c2-core/app/routers/upload.py +++ b/drb-c2-core/app/routers/upload.py @@ -170,7 +170,7 @@ async def _run_intelligence_pipeline( # but global flag=False beats everything (master switch). system_ai_flags: dict = {} if system_id: - sys_doc = await fstore.doc_get("systems", system_id) + sys_doc = await fstore.doc_get_cached("systems", system_id) system_ai_flags = (sys_doc or {}).get("ai_flags") or {} def _flag(name: str) -> bool: