import asyncio
import time as _time
from typing import Optional, Any

import firebase_admin
from firebase_admin import credentials, firestore as fs
from google.cloud.firestore_v1.base_query import FieldFilter

from app.config import settings
from app.internal.logger import logger

# ---------------------------------------------------------------------------
# In-memory TTL cache for rarely-changing documents (systems, nodes config)
# ---------------------------------------------------------------------------
# Key: "collection/doc_id" → (expires_at_monotonic, data_or_None)
_doc_cache: dict[str, tuple[float, Optional[dict]]] = {}


def _init_firebase() -> None:
    """Initialise the default Firebase app once; do nothing if one exists.

    Uses the service-account file at ``settings.gcp_credentials_path`` when
    that setting is truthy, otherwise falls back to application-default
    credentials.
    """
    # An app is already registered: nothing to do. The previous code
    # evaluated ``firestore.client()`` here, but the module is only imported
    # under the alias ``fs``, so that branch raised NameError. The return
    # value was never used by the caller anyway.
    if firebase_admin._apps:
        return

    if settings.gcp_credentials_path:
        cred = credentials.Certificate(settings.gcp_credentials_path)
    else:
        cred = credentials.ApplicationDefault()

    firebase_admin.initialize_app(cred)
    logger.info("Firebase initialised.")


_init_firebase()
db = fs.client(database_id=settings.firestore_database)


# ---------------------------------------------------------------------------
# Thin async wrappers — firebase-admin is synchronous, run in thread executor
# ---------------------------------------------------------------------------

async def doc_set(collection: str, doc_id: str, data: dict, merge: bool = True) -> None:
    """Write ``data`` to ``collection/doc_id``.

    ``merge`` is forwarded to the document reference's ``set`` call and
    defaults to True. The blocking call runs in a worker thread.
    """
    ref = db.collection(collection).document(doc_id)
    await asyncio.to_thread(ref.set, data, merge=merge)


async def doc_get(collection: str, doc_id: str) -> Optional[dict]:
    """Return the document ``collection/doc_id`` as a dict, or None if it does not exist."""
    ref = db.collection(collection).document(doc_id)
    snap = await asyncio.to_thread(ref.get)
    return snap.to_dict() if snap.exists else None


async def doc_update(collection: str, doc_id: str, data: dict) -> None:
    """Apply ``data`` to ``collection/doc_id`` via the document reference's ``update`` call."""
    ref = db.collection(collection).document(doc_id)
    await asyncio.to_thread(ref.update, data)


async def collection_list(collection: str, **filters) -> list[dict]:
    """
    List all documents in a collection.

    Optional keyword filters: field=value pairs, each applied as an
    equality ("==") where-clause. Returns one dict per matching document.
    """
    def _query():
        ref = db.collection(collection)
        for field, value in filters.items():
            ref = ref.where(filter=FieldFilter(field, "==", value))
        # Consume the stream inside the worker thread so that all blocking
        # I/O happens off the event loop.
        return [doc.to_dict() for doc in ref.stream()]

    return await asyncio.to_thread(_query)


async def collection_where(
    collection: str,
    conditions: list[tuple[str, str, Any]],
) -> list[dict]:
    """
    Query a collection with arbitrary where-clauses.

    conditions: list of (field, op, value) — e.g. [("ended_at", ">=", cutoff_dt)]
    Each ``op`` is passed to FieldFilter unchanged, so any operator Firestore
    accepts can be used, e.g. "==", "!=", "<", "<=", ">", ">=".
    Returns one dict per matching document.
    """
    def _query():
        ref = db.collection(collection)
        for field, op, value in conditions:
            ref = ref.where(filter=FieldFilter(field, op, value))
        # Consume the stream inside the worker thread (blocking I/O).
        return [doc.to_dict() for doc in ref.stream()]

    return await asyncio.to_thread(_query)


async def doc_delete(collection: str, doc_id: str) -> None:
    """Delete the document ``collection/doc_id``."""
    ref = db.collection(collection).document(doc_id)
    await asyncio.to_thread(ref.delete)


async def doc_get_cached(collection: str, doc_id: str, ttl: float = 300.0) -> Optional[dict]:
    """
    Like doc_get but backed by a short-lived in-memory TTL cache.

    Use for documents that change rarely (systems config, node assignments).
    Default TTL is 5 minutes. The write helpers in this module do not
    invalidate the cache, so a write may stay invisible to this function
    until the entry expires.

    A missing document (None) is cached as well. On a cache hit the returned
    dict is the cached object itself, not a copy, so callers should not
    mutate it.
    """
    key = f"{collection}/{doc_id}"
    now = _time.monotonic()

    entry = _doc_cache.get(key)
    if entry is not None and now < entry[0]:
        return entry[1]

    data = await doc_get(collection, doc_id)
    # Expiry is measured from the time of the lookup, before the fetch began.
    _doc_cache[key] = (now + ttl, data)
    return data