From 030dd2d787fcef60db1dfb9450d3dce6aff47d99 Mon Sep 17 00:00:00 2001 From: Logan Date: Sat, 11 Apr 2026 21:16:14 -0400 Subject: [PATCH] =?UTF-8?q?File=09Change=20app/internal/storage.py=09Repla?= =?UTF-8?q?ced=20make=5Fpublic()=20+=20public=5Furl=20with=20a=20v2=20sign?= =?UTF-8?q?ed=20URL=20(1-year=20expiry,=20no=20public=20bucket=20needed)?= =?UTF-8?q?=20app/main.py=09Releases=20all=20in-use=20tokens=20at=20startu?= =?UTF-8?q?p=20=E2=80=94=20tokens=20from=20previous=20sessions=20are=20cle?= =?UTF-8?q?ared=20automatically=20app/routers/tokens.py=09Added=20POST=20/?= =?UTF-8?q?tokens/flush=20to=20force-release=20orphaned=20tokens=20on=20de?= =?UTF-8?q?mand?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- drb-c2-core/app/internal/storage.py | 17 ++++++++++++++--- drb-c2-core/app/main.py | 19 +++++++++++++++++++ drb-c2-core/app/routers/tokens.py | 18 ++++++++++++++++++ 3 files changed, 51 insertions(+), 3 deletions(-) diff --git a/drb-c2-core/app/internal/storage.py b/drb-c2-core/app/internal/storage.py index a8a7a90..bafd5b2 100644 --- a/drb-c2-core/app/internal/storage.py +++ b/drb-c2-core/app/internal/storage.py @@ -1,26 +1,37 @@ import asyncio +import datetime from typing import Optional from app.config import settings from app.internal.logger import logger async def upload_audio(data: bytes, filename: str) -> Optional[str]: - """Upload audio bytes to GCS and return the public URL, or None if disabled.""" + """Upload audio bytes to GCS and return a signed URL, or None if disabled.""" if not settings.gcs_bucket: logger.info("GCS_BUCKET not configured — skipping audio upload.") return None def _upload() -> str: from google.cloud import storage + from google.oauth2 import service_account as sa if settings.gcp_credentials_path: client = storage.Client.from_service_account_json(settings.gcp_credentials_path) + signing_creds = sa.Credentials.from_service_account_file(settings.gcp_credentials_path) else: client = storage.Client() + signing_creds = None bucket = client.bucket(settings.gcs_bucket) blob = bucket.blob(f"calls/{filename}") blob.upload_from_string(data, content_type="audio/mpeg") - blob.make_public() - return blob.public_url + if signing_creds: + return blob.generate_signed_url( + version="v2", + expiration=datetime.timedelta(days=365), + method="GET", + credentials=signing_creds, + ) + # Fallback: return the gs:// URI (no public access) + return f"gs://{settings.gcs_bucket}/calls/{filename}" try: url = await asyncio.to_thread(_upload) diff --git a/drb-c2-core/app/main.py b/drb-c2-core/app/main.py index ff5403f..55d0489 100644 --- a/drb-c2-core/app/main.py +++ b/drb-c2-core/app/main.py @@ -8,11 +8,30 @@ from app.internal.node_sweeper import sweeper_loop from app.config import settings from app.internal.auth import require_firebase_token, require_service_or_firebase_token from app.routers import nodes, systems, calls, upload, tokens, incidents, alerts +from app.internal import firestore as fstore + + +async def _release_orphaned_tokens(): + """Release all in-use tokens on startup — voice connections don't survive server restarts.""" + def _find(): + from app.internal.firestore import db + return [d for d in db.collection("bot_tokens").where("in_use", "==", True).stream()] + + results = await asyncio.to_thread(_find) + for doc in results: + await fstore.doc_update("bot_tokens", doc.id, { + "in_use": False, + "assigned_node_id": None, + "assigned_at": None, + }) + if results: + logger.info(f"Released {len(results)} orphaned token(s) on startup.") @asynccontextmanager async def lifespan(app: FastAPI): logger.info("DRB C2 Core starting.") + await _release_orphaned_tokens() await mqtt_handler.connect() sweeper_task = asyncio.create_task(sweeper_loop()) diff --git a/drb-c2-core/app/routers/tokens.py b/drb-c2-core/app/routers/tokens.py index 8eb3ff1..4637f0c 100644 --- a/drb-c2-core/app/routers/tokens.py +++ b/drb-c2-core/app/routers/tokens.py @@ -42,6 +42,24 @@ async def add_token(body: TokenCreate): return {"token_id": token_id, "name": body.name} +@router.post("/flush", status_code=200) +async def flush_tokens(): + """Force-release all in-use tokens (admin utility — use when tokens get orphaned).""" + def _find(): + from app.internal.firestore import db + return [d for d in db.collection("bot_tokens").where("in_use", "==", True).stream()] + + import asyncio + results = await asyncio.to_thread(_find) + for doc in results: + await fstore.doc_update("bot_tokens", doc.id, { + "in_use": False, + "assigned_node_id": None, + "assigned_at": None, + }) + return {"released": len(results)} + + @router.delete("/{token_id}", status_code=204) async def delete_token(token_id: str): existing = await fstore.doc_get("bot_tokens", token_id)