Files
Logan 1f17b6c0d2 feat: add role-based user management, audit log, and session tracking
Introduces a full user management system with three roles (admin, operator,
viewer), an audit log, and per-session login history.

Backend:
- app/internal/audit.py: write_audit() helper → audit_log Firestore collection
- app/internal/auth.py: get_role() helper; require_admin_token accepts both
  legacy admin:true claim and new role:"admin" claim for backward compat
- app/routers/users.py: CRUD under /admin/users — list, create (returns
  one-time invite link), get (with sessions), patch role/nodes/name,
  disable, enable, delete; operator role requires ≥1 owned node
- app/routers/links.py: POST /auth/session records sign-in events to
  user_sessions Firestore collection
- app/routers/admin.py: GET /admin/audit paginated endpoint
- app/main.py: register users router

Frontend:
- AuthProvider: exposes role, isAdmin, isOperator, ownedNodeIds from claims
- Nav: role-gated links — viewers get dashboard/calls/incidents/map/alerts/
  trips; operators add nodes/systems/tokens; admins add admin
- admin/page.tsx: new Users tab (list table, create modal, inline edit panel
  with role/nodes editor, disable/enable/delete, login history) and Audit
  Log tab (paginated, color-coded actions)
- login/page.tsx: calls recordSession() on email and Google sign-in
- nodes, systems, tokens pages: role guards redirect viewers to dashboard
- profile/page.tsx: shows accurate role badge and label
- lib/types.ts: UserRole, UserRecord, UserSession, AuditEntry types
- lib/c2api.ts: user management methods + recordSession

Firestore collections added: user_profiles, audit_log, user_sessions
Firebase custom claims schema: { role, owned_node_ids, admin (legacy) }
2026-06-22 00:02:09 -04:00

133 lines
5.2 KiB
Python

import secrets
import time
from collections import defaultdict, deque
from typing import Optional
from fastapi import HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from firebase_admin import auth as firebase_auth
from app.config import settings
# Shared bearer-token scheme used by every dependency in this module.
# auto_error=False: a request without an Authorization header reaches the
# dependency as ``None``, so each dependency raises its own 401 message.
_bearer = HTTPBearer(auto_error=False)
async def require_firebase_token(
    credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer),
) -> dict:
    """Authenticate the request with a Firebase ID token.

    The token is read from the ``Authorization: Bearer`` header and handed to
    ``firebase_auth.verify_id_token``.

    Returns:
        The value produced by ``firebase_auth.verify_id_token`` for the token.

    Raises:
        HTTPException: 401 when no credentials were supplied, or when
            verification raises for any reason.
    """
    if not credentials:
        raise HTTPException(status_code=401, detail="Missing authorization token")

    id_token = credentials.credentials
    try:
        claims = firebase_auth.verify_id_token(id_token)
    except Exception:
        # Any verification failure is reported to the client as a plain 401.
        raise HTTPException(status_code=401, detail="Invalid or expired token")
    return claims
async def require_service_or_firebase_token(
    credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer),
) -> dict:
    """Accept either a Firebase ID token or the internal service key.

    The bearer token is first compared against ``settings.service_key`` (only
    when a key is configured). If it does not match, the token is verified as
    a Firebase ID token.

    Returns:
        ``{"service": True}`` when the service key matched, otherwise the
        value returned by ``firebase_auth.verify_id_token``.

    Raises:
        HTTPException: 401 when the header is missing, or when the token is
            neither the service key nor a verifiable Firebase token.
    """
    if not credentials:
        raise HTTPException(status_code=401, detail="Missing authorization token")
    token = credentials.credentials
    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # that contain non-ASCII characters, which would turn a malformed header
    # into an unhandled 500 instead of the 401 below.
    if settings.service_key and secrets.compare_digest(
        token.encode("utf-8"), settings.service_key.encode("utf-8")
    ):
        return {"service": True}
    try:
        return firebase_auth.verify_id_token(token)
    except Exception:
        raise HTTPException(status_code=401, detail="Invalid or expired token")
def get_role(decoded: dict) -> str:
    """Resolve the effective role for a decoded Firebase token.

    A ``role`` claim of ``"admin"`` or a truthy legacy ``admin`` claim both
    yield ``"admin"``; the legacy claim wins even when ``role`` names a lesser
    role, so tokens issued before the role migration keep working. Any other
    recognised ``role`` value is returned as-is, and a missing or unrecognised
    value falls back to ``"viewer"``.
    """
    claimed = decoded.get("role")
    has_legacy_admin = decoded.get("admin")
    if claimed == "admin" or has_legacy_admin:
        return "admin"

    known_roles = ("admin", "operator", "viewer")
    if claimed in known_roles:
        return claimed
    # Least-privileged role for absent or unknown claims.
    return "viewer"
async def require_admin_token(
    credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer),
) -> dict:
    """Authenticate with a Firebase ID token and allow only admins through.

    The role is resolved by ``get_role``, so both the newer ``role: "admin"``
    claim and the legacy ``admin`` claim grant access.

    Returns:
        The decoded token returned by ``require_firebase_token``.

    Raises:
        HTTPException: 401 propagated from ``require_firebase_token``; 403
            when the resolved role is not ``"admin"``.
    """
    claims = await require_firebase_token(credentials)
    is_admin = get_role(claims) == "admin"
    if not is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")
    return claims
async def require_service_key(
    credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer),
) -> dict:
    """Accept only the internal service key — used for bot-only endpoints.

    Returns:
        ``{"service": True}`` when the bearer token equals
        ``settings.service_key``.

    Raises:
        HTTPException: 401 when the header is missing, 503 when no service
            key is configured, 403 when the token does not match the key.
    """
    if not credentials:
        raise HTTPException(status_code=401, detail="Missing authorization token")
    if not settings.service_key:
        raise HTTPException(status_code=503, detail="Service key not configured")
    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # that contain non-ASCII characters, which would turn a malformed header
    # into an unhandled 500 instead of the 403 below.
    supplied = credentials.credentials.encode("utf-8")
    expected = settings.service_key.encode("utf-8")
    if not secrets.compare_digest(supplied, expected):
        raise HTTPException(status_code=403, detail="Service key required")
    return {"service": True}
async def require_service_key_or_admin(
    credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer),
) -> dict:
    """Accept either the internal service key or a Firebase admin token.

    Used for endpoints that the Discord bot (service key) and dashboard admins
    (Firebase + admin claim) both need to call, but regular Firebase users
    must not.

    Returns:
        ``{"service": True}`` when the service key matched, otherwise the
        decoded Firebase token of an admin user.

    Raises:
        HTTPException: 401 when the header is missing or the token cannot be
            verified; 403 when the token is valid but the role resolved by
            ``get_role`` is not ``"admin"``.
    """
    if not credentials:
        raise HTTPException(status_code=401, detail="Missing authorization token")
    token = credentials.credentials
    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # that contain non-ASCII characters, which would turn a malformed header
    # into an unhandled 500 instead of the 401 below.
    if settings.service_key and secrets.compare_digest(
        token.encode("utf-8"), settings.service_key.encode("utf-8")
    ):
        return {"service": True}
    try:
        decoded = firebase_auth.verify_id_token(token)
    except Exception:
        raise HTTPException(status_code=401, detail="Invalid or expired token")
    if get_role(decoded) != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")
    return decoded
# ---------------------------------------------------------------------------
# Simple in-memory sliding-window rate limiter
# ---------------------------------------------------------------------------
# Not persistent across restarts; good enough for a single-instance deployment.
# Key format is caller-defined (e.g. "{uid}:{endpoint}").
class _RateLimiter:
    """In-memory sliding-window rate limiter keyed by caller-defined strings.

    Each key owns a deque of ``time.monotonic()`` timestamps, one per accepted
    call. A call is rejected with HTTP 429 once ``max_calls`` timestamps lie
    inside the trailing ``window_seconds``.

    Keys that have been idle for a whole window are removed by a periodic
    sweep, so the log does not keep growing as new keys appear over the life
    of the process.
    """

    def __init__(self, max_calls: int, window_seconds: int):
        self.max_calls = max_calls
        self.window = window_seconds
        self._log: dict[str, deque] = defaultdict(deque)
        # Monotonic time at which idle keys are next purged.
        self._next_sweep = time.monotonic() + window_seconds

    def _sweep(self, now: float) -> None:
        """Drop every key whose newest timestamp has already left the window."""
        # Timestamps are appended in increasing order, so an expired newest
        # entry means the whole deque is expired; removing the key gives the
        # same result as check() popping every entry on the next call.
        # Iterate over a snapshot because the dict is mutated while walking.
        for key, stamps in list(self._log.items()):
            if not stamps or now - stamps[-1] > self.window:
                self._log.pop(key, None)
        self._next_sweep = now + self.window

    def check(self, key: str) -> None:
        """Record one call for *key*, or reject it when the limit is reached.

        Raises:
            HTTPException: 429 when *key* already has ``max_calls`` accepted
                calls inside the current window. Rejected calls are not
                recorded.
        """
        now = time.monotonic()
        if now >= self._next_sweep:
            self._sweep(now)
        q = self._log[key]
        # Discard timestamps that have aged out of the window.
        while q and now - q[0] > self.window:
            q.popleft()
        if len(q) >= self.max_calls:
            raise HTTPException(
                status_code=429,
                detail="Rate limit exceeded. Please wait before trying again.",
            )
        q.append(now)
# Shared limiter instances
# Each limit applies per key passed to ``check()``; the "per user" /
# "per incident" / "per system" scoping described below is the intended key
# choice and depends on what the callers pass in.
# trip chat: 20 requests per user per 5 minutes
trip_chat_limiter = _RateLimiter(max_calls=20, window_seconds=300)
# per-incident summarize: 5 per incident per 10 minutes
summarize_limiter = _RateLimiter(max_calls=5, window_seconds=600)
# vocabulary bootstrap: 2 per system per hour
bootstrap_limiter = _RateLimiter(max_calls=2, window_seconds=3600)