import asyncio from typing import Optional, Any import firebase_admin from firebase_admin import credentials, firestore as fs from app.config import settings from app.internal.logger import logger def _init_firebase(): if firebase_admin._apps: return firestore.client() if settings.gcp_credentials_path: cred = credentials.Certificate(settings.gcp_credentials_path) else: cred = credentials.ApplicationDefault() firebase_admin.initialize_app(cred) logger.info("Firebase initialised.") _init_firebase() db = fs.client(database_id=settings.firestore_database) # --------------------------------------------------------------------------- # Thin async wrappers — firebase-admin is synchronous, run in thread executor # --------------------------------------------------------------------------- async def doc_set(collection: str, doc_id: str, data: dict, merge: bool = True) -> None: ref = db.collection(collection).document(doc_id) await asyncio.to_thread(ref.set, data, merge=merge) async def doc_get(collection: str, doc_id: str) -> Optional[dict]: ref = db.collection(collection).document(doc_id) snap = await asyncio.to_thread(ref.get) return snap.to_dict() if snap.exists else None async def doc_update(collection: str, doc_id: str, data: dict) -> None: ref = db.collection(collection).document(doc_id) await asyncio.to_thread(ref.update, data) async def collection_list(collection: str, **filters) -> list[dict]: """ List all documents in a collection. Optional keyword filters: field=value pairs passed as equality where-clauses. """ def _query(): ref = db.collection(collection) for field, value in filters.items(): ref = ref.where(field, "==", value) return [doc.to_dict() for doc in ref.stream()] return await asyncio.to_thread(_query) async def collection_where( collection: str, conditions: list[tuple[str, str, Any]], ) -> list[dict]: """ Query a collection with arbitrary where-clauses. conditions: list of (field, op, value) — e.g. [("ended_at", ">=", cutoff_dt)] Supports any Firestore operator: "==", "!=", "<", "<=", ">", ">=". """ def _query(): ref = db.collection(collection) for field, op, value in conditions: ref = ref.where(field, op, value) return [doc.to_dict() for doc in ref.stream()] return await asyncio.to_thread(_query) async def doc_delete(collection: str, doc_id: str) -> None: ref = db.collection(collection).document(doc_id) await asyncio.to_thread(ref.delete)