import secrets
import time
from collections import defaultdict, deque
from typing import Optional

from fastapi import HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from firebase_admin import auth as firebase_auth

from app.config import settings

# auto_error=False: every dependency below handles the "no credentials" case
# itself (credentials is falsy) and raises its own 401.
_bearer = HTTPBearer(auto_error=False)


def _bearer_token(credentials: Optional[HTTPAuthorizationCredentials]) -> str:
    """Return the raw bearer token.

    Raises:
        HTTPException: 401 when no credentials were supplied.
    """
    if not credentials:
        raise HTTPException(status_code=401, detail="Missing authorization token")
    return credentials.credentials


def _matches_service_key(token: str) -> bool:
    """Return True when *token* equals the configured internal service key.

    Always False when no service key is configured. The comparison is done on
    UTF-8 bytes: ``secrets.compare_digest`` raises ``TypeError`` for ``str``
    arguments containing non-ASCII characters, and the token is
    caller-controlled, so comparing the strings directly could turn a junk
    token into an unhandled error instead of a clean rejection.
    """
    service_key = settings.service_key
    if not service_key:
        return False
    return secrets.compare_digest(
        token.encode("utf-8"), service_key.encode("utf-8")
    )


def _verify_firebase_token(token: str) -> dict:
    """Verify a Firebase ID token and return its decoded claims.

    Raises:
        HTTPException: 401 when verification fails for any reason.
    """
    try:
        return firebase_auth.verify_id_token(token)
    except Exception as exc:
        # Any verification failure is reported to the client as a plain 401;
        # the original error is kept as the cause for debugging.
        raise HTTPException(
            status_code=401, detail="Invalid or expired token"
        ) from exc


async def require_firebase_token(
    credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer),
) -> dict:
    """Verify a Firebase ID token from the Authorization: Bearer header.

    Returns:
        The decoded token claims.

    Raises:
        HTTPException: 401 when the token is missing or fails verification.
    """
    return _verify_firebase_token(_bearer_token(credentials))


async def require_service_or_firebase_token(
    credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer),
) -> dict:
    """Accept either a Firebase ID token or the internal service key.

    Returns:
        ``{"service": True}`` for the service key, otherwise the decoded
        Firebase token claims.

    Raises:
        HTTPException: 401 when the token is missing or fails verification.
    """
    token = _bearer_token(credentials)
    if _matches_service_key(token):
        return {"service": True}
    return _verify_firebase_token(token)


async def require_admin_token(
    credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer),
) -> dict:
    """Verify a Firebase ID token AND require the admin custom claim.

    Returns:
        The decoded token claims.

    Raises:
        HTTPException: 401 when the token is missing or invalid, 403 when the
            decoded token has no truthy ``admin`` claim.
    """
    decoded = await require_firebase_token(credentials)
    if not decoded.get("admin"):
        raise HTTPException(status_code=403, detail="Admin access required")
    return decoded


async def require_service_key(
    credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer),
) -> dict:
    """Accept only the internal service key — used for bot-only endpoints.

    Returns:
        ``{"service": True}``.

    Raises:
        HTTPException: 401 when the token is missing, 503 when no service key
            is configured, 403 when the token is not the service key.
    """
    token = _bearer_token(credentials)
    # Checked separately from the comparison so a missing configuration is
    # reported as 503 rather than as a 403 for the caller.
    if not settings.service_key:
        raise HTTPException(status_code=503, detail="Service key not configured")
    if not _matches_service_key(token):
        raise HTTPException(status_code=403, detail="Service key required")
    return {"service": True}


async def require_service_key_or_admin(
    credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer),
) -> dict:
    """Accept either the internal service key or a Firebase admin token.

    Used for endpoints that the Discord bot (service key) and dashboard admins
    (Firebase + admin claim) both need to call, but regular Firebase users must
    not.

    Returns:
        ``{"service": True}`` for the service key, otherwise the decoded
        Firebase token claims.

    Raises:
        HTTPException: 401 when the token is missing or invalid, 403 when the
            decoded token has no truthy ``admin`` claim.
    """
    token = _bearer_token(credentials)
    if _matches_service_key(token):
        return {"service": True}
    decoded = _verify_firebase_token(token)
    if not decoded.get("admin"):
        raise HTTPException(status_code=403, detail="Admin access required")
    return decoded


# ---------------------------------------------------------------------------
# Simple in-memory sliding-window rate limiter
# ---------------------------------------------------------------------------
# Not persistent across restarts; good enough for a single-instance deployment.
# Key format is caller-defined (e.g. "{uid}:{endpoint}").

class _RateLimiter:
    """Sliding-window limiter: at most ``max_calls`` per key per window."""

    def __init__(self, max_calls: int, window_seconds: int):
        self.max_calls = max_calls
        self.window = window_seconds
        # key -> monotonic timestamps of accepted calls, oldest first.
        self._log: dict[str, deque] = defaultdict(deque)
        # Monotonic time at which stale keys are next purged.
        self._next_sweep = time.monotonic() + window_seconds

    def _sweep(self, now: float) -> None:
        """Drop keys whose newest timestamp has already left the window.

        Such a key would have every entry evicted on its next check anyway, so
        removing it does not change any limit decision; it only stops
        ``_log`` from growing with every key ever seen.
        """
        stale = [
            key
            for key, stamps in self._log.items()
            if not stamps or now - stamps[-1] > self.window
        ]
        for key in stale:
            del self._log[key]

    def check(self, key: str) -> None:
        """Record a call for *key*, or reject it if the key is over its limit.

        Raises:
            HTTPException: 429 when ``max_calls`` calls for *key* are already
                recorded within the last ``window`` seconds.
        """
        now = time.monotonic()
        # At most one full scan per window keeps the cleanup cost amortised.
        if now >= self._next_sweep:
            self._sweep(now)
            self._next_sweep = now + self.window

        stamps = self._log[key]
        # Evict timestamps that have aged out of the window.
        while stamps and now - stamps[0] > self.window:
            stamps.popleft()
        if len(stamps) >= self.max_calls:
            raise HTTPException(
                status_code=429,
                detail="Rate limit exceeded. Please wait before trying again.",
            )
        stamps.append(now)


# Shared limiter instances

# trip chat: 20 requests per user per 5 minutes
trip_chat_limiter = _RateLimiter(max_calls=20, window_seconds=300)

# per-incident summarize: 5 per incident per 10 minutes
summarize_limiter = _RateLimiter(max_calls=5, window_seconds=600)

# vocabulary bootstrap: 2 per system per hour
bootstrap_limiter = _RateLimiter(max_calls=2, window_seconds=3600)