diff --git a/drb-c2-core/app/internal/storage.py b/drb-c2-core/app/internal/storage.py index a8a7a90..bafd5b2 100644 --- a/drb-c2-core/app/internal/storage.py +++ b/drb-c2-core/app/internal/storage.py @@ -1,26 +1,37 @@ import asyncio +import datetime from typing import Optional from app.config import settings from app.internal.logger import logger async def upload_audio(data: bytes, filename: str) -> Optional[str]: - """Upload audio bytes to GCS and return the public URL, or None if disabled.""" + """Upload audio bytes to GCS and return a signed URL, or None if disabled.""" if not settings.gcs_bucket: logger.info("GCS_BUCKET not configured — skipping audio upload.") return None def _upload() -> str: from google.cloud import storage + from google.oauth2 import service_account as sa if settings.gcp_credentials_path: client = storage.Client.from_service_account_json(settings.gcp_credentials_path) + signing_creds = sa.Credentials.from_service_account_file(settings.gcp_credentials_path) else: client = storage.Client() + signing_creds = None bucket = client.bucket(settings.gcs_bucket) blob = bucket.blob(f"calls/{filename}") blob.upload_from_string(data, content_type="audio/mpeg") - blob.make_public() - return blob.public_url + if signing_creds: + return blob.generate_signed_url( + version="v2", + expiration=datetime.timedelta(days=365), + method="GET", + credentials=signing_creds, + ) + # Fallback: return the gs:// URI (no public access) + return f"gs://{settings.gcs_bucket}/calls/{filename}" try: url = await asyncio.to_thread(_upload) diff --git a/drb-c2-core/app/main.py b/drb-c2-core/app/main.py index ff5403f..55d0489 100644 --- a/drb-c2-core/app/main.py +++ b/drb-c2-core/app/main.py @@ -8,11 +8,30 @@ from app.internal.node_sweeper import sweeper_loop from app.config import settings from app.internal.auth import require_firebase_token, require_service_or_firebase_token from app.routers import nodes, systems, calls, upload, tokens, incidents, alerts +from app.internal import firestore as fstore + + +async def _release_orphaned_tokens(): + """Release all in-use tokens on startup — voice connections don't survive server restarts.""" + def _find(): + from app.internal.firestore import db + return [d for d in db.collection("bot_tokens").where("in_use", "==", True).stream()] + + results = await asyncio.to_thread(_find) + for doc in results: + await fstore.doc_update("bot_tokens", doc.id, { + "in_use": False, + "assigned_node_id": None, + "assigned_at": None, + }) + if results: + logger.info(f"Released {len(results)} orphaned token(s) on startup.") @asynccontextmanager async def lifespan(app: FastAPI): logger.info("DRB C2 Core starting.") + await _release_orphaned_tokens() await mqtt_handler.connect() sweeper_task = asyncio.create_task(sweeper_loop()) diff --git a/drb-c2-core/app/routers/tokens.py b/drb-c2-core/app/routers/tokens.py index 8eb3ff1..4637f0c 100644 --- a/drb-c2-core/app/routers/tokens.py +++ b/drb-c2-core/app/routers/tokens.py @@ -42,6 +42,24 @@ async def add_token(body: TokenCreate): return {"token_id": token_id, "name": body.name} +@router.post("/flush", status_code=200) +async def flush_tokens(): + """Force-release all in-use tokens (admin utility — use when tokens get orphaned).""" + def _find(): + from app.internal.firestore import db + return [d for d in db.collection("bot_tokens").where("in_use", "==", True).stream()] + + import asyncio + results = await asyncio.to_thread(_find) + for doc in results: + await fstore.doc_update("bot_tokens", doc.id, { + "in_use": False, + "assigned_node_id": None, + "assigned_at": None, + }) + return {"released": len(results)} + + @router.delete("/{token_id}", status_code=204) async def delete_token(token_id: str): existing = await fstore.doc_get("bot_tokens", token_id)