Updates to reduce Firestore calls to try to stay in the free tier
### Firestore read reductions
**1. `doc_get_cached()` in `firestore.py` — new 5-min TTL cache**
One place, benefits everything. System and node config documents almost never change during a monitoring session.
**2. System doc: 4 reads → 1 per call**
| Before | After |
|---|---|
| `upload.py` — `doc_get("systems")` for ai_flags | `doc_get_cached` |
| `transcription.py` — `get_vocabulary()` → `doc_get("systems")` | cache hit |
| `intelligence.py` — `get_vocabulary()` → `doc_get("systems")` | cache hit |
| `intelligence.py` — `doc_get("systems")` again for ten_codes | eliminated (reads same cached doc) |
**3. Node doc: cached in `_on_call_start` and `intelligence.py`**
The node doc is read on every call event to get `assigned_system_id` and lat/lon for geocoding. Both now use the cache — node assignments and positions essentially never change at runtime.
**4. Node sweeper: 30s → 90s interval**
The sweeper was doing a full node collection scan 3× more often than necessary — the offline threshold is already 90s. Cuts sweeper reads by 66%.
**5. Vocabulary induction: scans all-time calls → last 7 days**
Previously fetched every ended call for a system (could be thousands). Now scoped to the last 7 days.
> **Note:** The vocabulary induction query `(system_id == X, ended_at >= cutoff)` needs a Firestore
> composite index on `(system_id ASC, ended_at ASC)`. When the induction loop first fires it will log
> an error with a Firebase Console link to create it in one click.
This commit is contained in:
@@ -1,4 +1,5 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
|
import time as _time
|
||||||
from typing import Optional, Any
|
from typing import Optional, Any
|
||||||
import firebase_admin
|
import firebase_admin
|
||||||
from firebase_admin import credentials, firestore as fs
|
from firebase_admin import credentials, firestore as fs
|
||||||
@@ -6,6 +7,12 @@ from google.cloud.firestore_v1.base_query import FieldFilter
|
|||||||
from app.config import settings
|
from app.config import settings
|
||||||
from app.internal.logger import logger
|
from app.internal.logger import logger
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# In-memory TTL cache for rarely-changing documents (systems, nodes config)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Key: "collection/doc_id" → (expires_at_monotonic, data_or_None)
|
||||||
|
_doc_cache: dict[str, tuple[float, Optional[dict]]] = {}
|
||||||
|
|
||||||
|
|
||||||
def _init_firebase():
|
def _init_firebase():
|
||||||
if firebase_admin._apps:
|
if firebase_admin._apps:
|
||||||
@@ -79,3 +86,19 @@ async def collection_where(
|
|||||||
async def doc_delete(collection: str, doc_id: str) -> None:
|
async def doc_delete(collection: str, doc_id: str) -> None:
|
||||||
ref = db.collection(collection).document(doc_id)
|
ref = db.collection(collection).document(doc_id)
|
||||||
await asyncio.to_thread(ref.delete)
|
await asyncio.to_thread(ref.delete)
|
||||||
|
|
||||||
|
|
||||||
|
async def doc_get_cached(collection: str, doc_id: str, ttl: float = 300.0) -> Optional[dict]:
|
||||||
|
"""
|
||||||
|
Like doc_get but backed by a short-lived in-memory TTL cache.
|
||||||
|
Use for documents that change rarely (systems config, node assignments).
|
||||||
|
Default TTL is 5 minutes — a write will be visible within that window.
|
||||||
|
"""
|
||||||
|
key = f"{collection}/{doc_id}"
|
||||||
|
now = _time.monotonic()
|
||||||
|
entry = _doc_cache.get(key)
|
||||||
|
if entry and now < entry[0]:
|
||||||
|
return entry[1]
|
||||||
|
data = await doc_get(collection, doc_id)
|
||||||
|
_doc_cache[key] = (now + ttl, data)
|
||||||
|
return data
|
||||||
|
|||||||
@@ -99,11 +99,10 @@ async def extract_scenes(
|
|||||||
vocabulary: list[str] = []
|
vocabulary: list[str] = []
|
||||||
ten_codes: dict[str, str] = {}
|
ten_codes: dict[str, str] = {}
|
||||||
if system_id:
|
if system_id:
|
||||||
from app.internal.vocabulary_learner import get_vocabulary
|
# Single cached read — vocabulary and ten_codes live on the same document.
|
||||||
vocab_data = await get_vocabulary(system_id)
|
system_doc = await fstore.doc_get_cached("systems", system_id)
|
||||||
vocabulary = vocab_data.get("vocabulary") or []
|
|
||||||
system_doc = await fstore.doc_get("systems", system_id)
|
|
||||||
if system_doc:
|
if system_doc:
|
||||||
|
vocabulary = system_doc.get("vocabulary") or []
|
||||||
ten_codes = system_doc.get("ten_codes") or {}
|
ten_codes = system_doc.get("ten_codes") or {}
|
||||||
|
|
||||||
raw_scenes: list[dict] = await asyncio.to_thread(
|
raw_scenes: list[dict] = await asyncio.to_thread(
|
||||||
@@ -118,7 +117,7 @@ async def extract_scenes(
|
|||||||
node_lat: Optional[float] = None
|
node_lat: Optional[float] = None
|
||||||
node_lon: Optional[float] = None
|
node_lon: Optional[float] = None
|
||||||
if node_id:
|
if node_id:
|
||||||
node_doc = await fstore.doc_get("nodes", node_id)
|
node_doc = await fstore.doc_get_cached("nodes", node_id)
|
||||||
if node_doc:
|
if node_doc:
|
||||||
node_lat = node_doc.get("lat")
|
node_lat = node_doc.get("lat")
|
||||||
node_lon = node_doc.get("lon")
|
node_lon = node_doc.get("lon")
|
||||||
|
|||||||
@@ -143,8 +143,8 @@ class MQTTHandler:
|
|||||||
if not call_id:
|
if not call_id:
|
||||||
return
|
return
|
||||||
|
|
||||||
# Look up assigned system for this node
|
# Look up assigned system for this node (cached — assignment rarely changes)
|
||||||
node = await fstore.doc_get("nodes", node_id)
|
node = await fstore.doc_get_cached("nodes", node_id)
|
||||||
system_id = node.get("assigned_system_id") if node else None
|
system_id = node.get("assigned_system_id") if node else None
|
||||||
|
|
||||||
started_at_raw = payload.get("started_at")
|
started_at_raw = payload.get("started_at")
|
||||||
@@ -157,7 +157,7 @@ class MQTTHandler:
|
|||||||
# Prefer the name from OP25 metadata; fall back to the system config
|
# Prefer the name from OP25 metadata; fall back to the system config
|
||||||
tgid_name = payload.get("tgid_name") or ""
|
tgid_name = payload.get("tgid_name") or ""
|
||||||
if not tgid_name and system_id and payload.get("tgid"):
|
if not tgid_name and system_id and payload.get("tgid"):
|
||||||
system_doc = await fstore.doc_get("systems", system_id)
|
system_doc = await fstore.doc_get_cached("systems", system_id)
|
||||||
if system_doc:
|
if system_doc:
|
||||||
tgid_int = int(payload["tgid"])
|
tgid_int = int(payload["tgid"])
|
||||||
for tg in system_doc.get("config", {}).get("talkgroups", []):
|
for tg in system_doc.get("config", {}).get("talkgroups", []):
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ from app.config import settings
|
|||||||
from app.internal.logger import logger
|
from app.internal.logger import logger
|
||||||
from app.internal import firestore as fstore
|
from app.internal import firestore as fstore
|
||||||
|
|
||||||
SWEEP_INTERVAL = 30 # seconds
|
SWEEP_INTERVAL = 90 # seconds — matches node_offline_threshold; no gain in checking faster
|
||||||
|
|
||||||
|
|
||||||
async def sweeper_loop():
|
async def sweeper_loop():
|
||||||
|
|||||||
@@ -196,8 +196,8 @@ async def remove_term(system_id: str, term: str) -> None:
|
|||||||
|
|
||||||
|
|
||||||
async def get_vocabulary(system_id: str) -> dict:
|
async def get_vocabulary(system_id: str) -> dict:
|
||||||
"""Return vocabulary and pending terms for a system."""
|
"""Return vocabulary and pending terms for a system (TTL-cached, 5 min)."""
|
||||||
doc = await fstore.doc_get("systems", system_id)
|
doc = await fstore.doc_get_cached("systems", system_id)
|
||||||
if not doc:
|
if not doc:
|
||||||
return {"vocabulary": [], "vocabulary_pending": [], "vocabulary_bootstrapped": False}
|
return {"vocabulary": [], "vocabulary_pending": [], "vocabulary_bootstrapped": False}
|
||||||
return {
|
return {
|
||||||
@@ -281,8 +281,14 @@ async def _induct_system(system_id: str, system_doc: dict) -> None:
|
|||||||
system_name = system_doc.get("name", "Unknown")
|
system_name = system_doc.get("name", "Unknown")
|
||||||
existing_vocab: list[str] = system_doc.get("vocabulary") or []
|
existing_vocab: list[str] = system_doc.get("vocabulary") or []
|
||||||
|
|
||||||
# Fetch recent ended calls for this system
|
# Fetch calls from the last 7 days only — avoids scanning the entire history.
|
||||||
all_calls = await fstore.collection_list("calls", system_id=system_id, status="ended")
|
# Active calls have ended_at=None and are excluded by the range filter automatically.
|
||||||
|
# Needs a composite index on (system_id ASC, ended_at ASC).
|
||||||
|
cutoff = datetime.now(timezone.utc) - timedelta(days=7)
|
||||||
|
all_calls = await fstore.collection_where("calls", [
|
||||||
|
("system_id", "==", system_id),
|
||||||
|
("ended_at", ">=", cutoff),
|
||||||
|
])
|
||||||
if not all_calls:
|
if not all_calls:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|||||||
@@ -170,7 +170,7 @@ async def _run_intelligence_pipeline(
|
|||||||
# but global flag=False beats everything (master switch).
|
# but global flag=False beats everything (master switch).
|
||||||
system_ai_flags: dict = {}
|
system_ai_flags: dict = {}
|
||||||
if system_id:
|
if system_id:
|
||||||
sys_doc = await fstore.doc_get("systems", system_id)
|
sys_doc = await fstore.doc_get_cached("systems", system_id)
|
||||||
system_ai_flags = (sys_doc or {}).get("ai_flags") or {}
|
system_ai_flags = (sys_doc or {}).get("ai_flags") or {}
|
||||||
|
|
||||||
def _flag(name: str) -> bool:
|
def _flag(name: str) -> bool:
|
||||||
|
|||||||
Reference in New Issue
Block a user