Logan 7e1b01a275 Updates to reduce firestore calls to try and stay in free tier
### Firestore read reductions

**1. `doc_get_cached()` in `firestore.py` — new 5-min TTL cache**
One place, benefits everything. System and node config documents almost never change during a monitoring session.

**2. System doc: 4 reads → 1 per call**
| Before | After |
|---|---|
| `upload.py` — `doc_get("systems")` for ai_flags | `doc_get_cached` |
| `transcription.py` — `get_vocabulary()` → `doc_get("systems")` | cache hit |
| `intelligence.py` — `get_vocabulary()` → `doc_get("systems")` | cache hit |
| `intelligence.py` — `doc_get("systems")` again for ten_codes | eliminated (reads same cached doc) |

**3. Node doc: cached in `_on_call_start` and `intelligence.py`**
The node is read every call event to get `assigned_system_id` and lat/lon for geocoding. Both now use the cache — node assignments and positions essentially never change at runtime.

**4. Node sweeper: 30s → 90s interval**
The sweeper was doing a full node collection scan 3× more often than necessary, since the offline threshold is already 90s. Cuts sweeper reads by two-thirds (about 67%).
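
The saving follows directly from the two intervals. The arithmetic below is a sketch only; the helper name `scans_per_hour` is illustrative, and the actual read count per scan depends on how many node documents exist.

```python
def scans_per_hour(interval_s: float) -> float:
    """Number of full collection scans per hour at a given sweep interval."""
    return 3600 / interval_s


before = scans_per_hour(30)  # scans per hour at the old 30s interval
after = scans_per_hour(90)   # scans per hour at the new 90s interval

# Fraction of sweeper scans (and therefore reads) eliminated.
reduction = 1 - after / before

print(before, after, round(reduction * 100, 1))
```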

**5. Vocabulary induction: scans all-time calls → last 7 days**
Previously fetched every ended call for a system (could be thousands). Now scoped to the last 7 days.
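
A minimal sketch of how the 7-day scope could be expressed as conditions for `collection_where()`. The field names `system_id` and `ended_at` come from the query described in this commit; the helper name `induction_conditions`, the fixed timestamp, and the system ID are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Any


def induction_conditions(
    system_id: str, now: datetime, days: int = 7
) -> list[tuple[str, str, Any]]:
    """Build (field, op, value) tuples limiting the scan to recent ended calls."""
    cutoff = now - timedelta(days=days)
    return [
        ("system_id", "==", system_id),
        ("ended_at", ">=", cutoff),
    ]


# Fixed timestamp so the example is deterministic.
now = datetime(2026, 5, 4, tzinfo=timezone.utc)
conditions = induction_conditions("sys-1", now)
print(conditions)
```

The resulting list would then be passed as the `conditions` argument of `collection_where()`, together with whichever collection holds the call documents.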

> **Note:** The vocabulary induction query `(system_id == X, ended_at >= cutoff)` needs a Firestore
> composite index on `(system_id ASC, ended_at ASC)`. When the induction loop first fires it will log
> an error with a Firebase Console link to create it in one click.
2026-05-04 02:05:00 -04:00

105 lines
3.6 KiB
Python

import asyncio
import time as _time
from typing import Optional, Any
import firebase_admin
from firebase_admin import credentials, firestore as fs
from google.cloud.firestore_v1.base_query import FieldFilter
from app.config import settings
from app.internal.logger import logger
# ---------------------------------------------------------------------------
# In-memory TTL cache for rarely-changing documents (systems, nodes config)
# ---------------------------------------------------------------------------
# Key: "collection/doc_id" → (expires_at_monotonic, data_or_None)
_doc_cache: dict[str, tuple[float, Optional[dict]]] = {}
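def _init_firebase():
    # Skip initialisation if a Firebase app is already registered.
    # (The previous `return firestore.client()` referenced an undefined name;
    # the module is imported as `fs`, and the return value was never used.)
    if firebase_admin._apps:
        return
    if settings.gcp_credentials_path:
        cred = credentials.Certificate(settings.gcp_credentials_path)
    else:
        cred = credentials.ApplicationDefault()
    firebase_admin.initialize_app(cred)
    logger.info("Firebase initialised.")


_init_firebase()
db = fs.client(database_id=settings.firestore_database)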
# ---------------------------------------------------------------------------
# Thin async wrappers — firebase-admin is synchronous, run in thread executor
# ---------------------------------------------------------------------------
async def doc_set(collection: str, doc_id: str, data: dict, merge: bool = True) -> None:
    ref = db.collection(collection).document(doc_id)
    await asyncio.to_thread(ref.set, data, merge=merge)


async def doc_get(collection: str, doc_id: str) -> Optional[dict]:
    ref = db.collection(collection).document(doc_id)
    snap = await asyncio.to_thread(ref.get)
    return snap.to_dict() if snap.exists else None


async def doc_update(collection: str, doc_id: str, data: dict) -> None:
    ref = db.collection(collection).document(doc_id)
    await asyncio.to_thread(ref.update, data)


async def collection_list(collection: str, **filters) -> list[dict]:
    """
    List all documents in a collection.
    Optional keyword filters: field=value pairs passed as equality where-clauses.
    """
    def _query():
        ref = db.collection(collection)
        for field, value in filters.items():
            ref = ref.where(filter=FieldFilter(field, "==", value))
        return [doc.to_dict() for doc in ref.stream()]
    return await asyncio.to_thread(_query)


async def collection_where(
    collection: str,
    conditions: list[tuple[str, str, Any]],
) -> list[dict]:
    """
    Query a collection with arbitrary where-clauses.
    conditions: list of (field, op, value) — e.g. [("ended_at", ">=", cutoff_dt)]
    Supports any Firestore operator: "==", "!=", "<", "<=", ">", ">=".
    """
    def _query():
        ref = db.collection(collection)
        for field, op, value in conditions:
            ref = ref.where(filter=FieldFilter(field, op, value))
        return [doc.to_dict() for doc in ref.stream()]
    return await asyncio.to_thread(_query)


async def doc_delete(collection: str, doc_id: str) -> None:
    ref = db.collection(collection).document(doc_id)
    await asyncio.to_thread(ref.delete)


async def doc_get_cached(collection: str, doc_id: str, ttl: float = 300.0) -> Optional[dict]:
    """
    Like doc_get but backed by a short-lived in-memory TTL cache.
    Use for documents that change rarely (systems config, node assignments).
    Default TTL is 5 minutes — a write will be visible within that window.
    """
    key = f"{collection}/{doc_id}"
    now = _time.monotonic()
    entry = _doc_cache.get(key)
    if entry and now < entry[0]:
        return entry[1]
    data = await doc_get(collection, doc_id)
    _doc_cache[key] = (now + ttl, data)
    return data