Files
server-26/drb-c2-core/app/internal/storage.py
T
Logan 030dd2d787 File Change
app/internal/storage.py	Replaced make_public() + public_url with a v2 signed URL (1-year expiry, no public bucket needed)
app/main.py	Releases all in-use tokens at startup — tokens from previous sessions are cleared automatically
app/routers/tokens.py	Added POST /tokens/flush to force-release orphaned tokens on demand
2026-04-11 21:16:14 -04:00

43 lines
1.6 KiB
Python

import asyncio
import datetime
from typing import Optional
from app.config import settings
from app.internal.logger import logger
async def upload_audio(
    data: bytes,
    filename: str,
    *,
    content_type: str = "audio/mpeg",
) -> Optional[str]:
    """Upload audio bytes to GCS and return a URL for the stored object.

    The object is written to ``calls/<filename>`` in ``settings.gcs_bucket``.
    The blocking google-cloud-storage calls run in a worker thread via
    ``asyncio.to_thread`` so the event loop is not blocked.

    Args:
        data: Raw audio bytes to store.
        filename: Object name, placed under the ``calls/`` prefix.
        content_type: MIME type recorded on the uploaded object. Defaults
            to ``"audio/mpeg"``, the value that was previously hard-coded.

    Returns:
        A v2 signed GET URL with a 365-day expiry when
        ``settings.gcp_credentials_path`` is set; otherwise the object's
        ``gs://`` URI. ``None`` when ``settings.gcs_bucket`` is not set or
        when the upload fails (the failure is logged, not raised).
    """
    if not settings.gcs_bucket:
        logger.info("GCS_BUCKET not configured — skipping audio upload.")
        return None

    object_name = f"calls/{filename}"
    gs_uri = f"gs://{settings.gcs_bucket}/{object_name}"

    def _upload() -> str:
        """Upload synchronously and return the signed URL or gs:// URI."""
        # Imported lazily so the google-cloud packages are only required
        # when an upload is actually attempted.
        from google.cloud import storage
        from google.oauth2 import service_account as sa

        if settings.gcp_credentials_path:
            client = storage.Client.from_service_account_json(
                settings.gcp_credentials_path
            )
            # Explicit service-account credentials are used to sign the URL.
            signing_creds = sa.Credentials.from_service_account_file(
                settings.gcp_credentials_path
            )
        else:
            # No key file configured: fall back to the client's default
            # credential discovery and skip URL signing.
            client = storage.Client()
            signing_creds = None

        blob = client.bucket(settings.gcs_bucket).blob(object_name)
        blob.upload_from_string(data, content_type=content_type)

        if signing_creds is None:
            # Fallback: return the gs:// URI (no public access)
            return gs_uri

        # NOTE(review): v2 signing is presumably chosen because v4 caps the
        # expiry well below one year — confirm against the GCS docs.
        return blob.generate_signed_url(
            version="v2",
            expiration=datetime.timedelta(days=365),
            method="GET",
            credentials=signing_creds,
        )

    try:
        url = await asyncio.to_thread(_upload)
    except Exception as e:
        # Best-effort by design: callers receive None instead of an error.
        logger.error(f"GCS upload failed: {e}")
        return None

    # Log the object location rather than `url`: a signed URL is a bearer
    # credential (valid for a year here) and must not be written to logs.
    logger.info(f"Audio uploaded: {gs_uri}")
    return url