45 lines
1.7 KiB
Python
45 lines
1.7 KiB
Python
from typing import Optional
|
|
from fastapi import HTTPException, Security
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from firebase_admin import auth as firebase_auth
|
|
from app.config import settings
|
|
|
|
# Shared bearer-token extractor for the dependencies in this module.
# auto_error=False lets each dependency receive an optional credentials value
# and raise its own 401 with a custom detail message when it is falsy.
_bearer = HTTPBearer(auto_error=False)
|
|
|
|
|
|
async def require_firebase_token(
    credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer),
) -> dict:
    """Verify a Firebase ID token from the ``Authorization: Bearer`` header.

    Args:
        credentials: Bearer credentials injected by FastAPI via ``_bearer``;
            falsy when the request carried no usable Authorization header.

    Returns:
        Whatever ``firebase_auth.verify_id_token`` returns for the token
        (the decoded token claims).

    Raises:
        HTTPException: 401 when the credentials are missing, or when
            verification raises for any reason.
    """
    if not credentials:
        raise HTTPException(
            status_code=401,
            detail="Missing authorization token",
            # A 401 response must tell the client which auth scheme to use.
            headers={"WWW-Authenticate": "Bearer"},
        )
    try:
        return firebase_auth.verify_id_token(credentials.credentials)
    except Exception as exc:
        # Every verification failure is deliberately reported as the same
        # generic 401 so no detail about the failure leaks to the client;
        # the original error stays attached as __cause__ for server-side
        # debugging.
        raise HTTPException(
            status_code=401,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
|
|
|
|
|
|
async def require_service_or_firebase_token(
    credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer),
) -> dict:
    """Accept either the internal service key or a Firebase ID token.

    The bearer token is first compared against ``settings.service_key``; if
    that key is unset/empty or does not match, the token is verified as a
    Firebase ID token instead.

    Args:
        credentials: Bearer credentials injected by FastAPI via ``_bearer``;
            falsy when the request carried no usable Authorization header.

    Returns:
        ``{"service": True}`` when the token equals the configured service
        key, otherwise whatever ``firebase_auth.verify_id_token`` returns.

    Raises:
        HTTPException: 401 when the credentials are missing, or when the
            token is neither the service key nor a verifiable Firebase token.
    """
    # Local import keeps this block self-contained.
    import hmac

    if not credentials:
        raise HTTPException(
            status_code=401,
            detail="Missing authorization token",
            # A 401 response must tell the client which auth scheme to use.
            headers={"WWW-Authenticate": "Bearer"},
        )

    token = credentials.credentials
    service_key = settings.service_key
    # Constant-time comparison: a plain ``==`` short-circuits on the first
    # differing character and lets a caller recover the key via timing.
    # Both sides are encoded to bytes because compare_digest rejects
    # non-ASCII ``str`` arguments, and header values are untrusted input.
    if (
        service_key
        and isinstance(service_key, str)
        and hmac.compare_digest(
            token.encode("utf-8"), service_key.encode("utf-8")
        )
    ):
        return {"service": True}

    try:
        return firebase_auth.verify_id_token(token)
    except Exception as exc:
        # Every verification failure is deliberately reported as the same
        # generic 401; the original error stays attached as __cause__ for
        # server-side debugging.
        raise HTTPException(
            status_code=401,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
|
|
|
|
|
|
async def require_admin_token(
    credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer),
) -> dict:
    """Require a valid Firebase ID token that carries a truthy ``admin`` claim.

    Token verification is delegated to ``require_firebase_token``, so any
    401 it raises propagates unchanged.

    Args:
        credentials: Bearer credentials injected by FastAPI via ``_bearer``.

    Returns:
        The decoded token returned by ``require_firebase_token``.

    Raises:
        HTTPException: 403 when the decoded token has no truthy ``admin``
            entry.
    """
    claims = await require_firebase_token(credentials)
    if claims.get("admin"):
        return claims
    raise HTTPException(status_code=403, detail="Admin access required")
|