import asyncio
import datetime
import os
from typing import Optional

from app.config import settings
from app.internal.logger import logger

# Allowed audio extensions mapped to the Content-Type stored on the GCS object.
# Anything not listed here is treated as MP3.
_AUDIO_CONTENT_TYPES = {
    ".mp3": "audio/mpeg",
    ".wav": "audio/wav",
    ".ogg": "audio/ogg",
    ".m4a": "audio/mp4",
    ".aac": "audio/aac",
    ".flac": "audio/flac",
}
_DEFAULT_AUDIO_EXT = ".mp3"


def _safe_audio_filename(filename: str, call_id: str) -> str:
    """Return a safe GCS object name derived from the call_id.

    We ignore the client-supplied filename entirely and derive the name from
    the call_id (which we control) to prevent path traversal via crafted
    filenames. The original extension is preserved only if it's a known
    audio type; otherwise ``.mp3`` is used.

    Args:
        filename: Client-supplied file name; only its extension is inspected.
            May be empty.
        call_id: Identifier used verbatim as the object's base name.

    Returns:
        ``"<call_id><ext>"`` where ``ext`` is a lower-cased, allow-listed
        audio extension.
    """
    ext = os.path.splitext(filename)[-1].lower() if filename else ""
    if ext not in _AUDIO_CONTENT_TYPES:
        ext = _DEFAULT_AUDIO_EXT
    return f"{call_id}{ext}"


async def upload_audio(data: bytes, filename: str, call_id: str = "") -> Optional[str]:
    """Upload audio bytes to GCS and return a URL for the stored object.

    The blocking google-cloud-storage calls run in a worker thread via
    ``asyncio.to_thread`` so the event loop is not blocked.

    Args:
        data: Raw audio bytes to store.
        filename: Client-supplied file name; only its extension is used.
        call_id: Identifier the object name is derived from.
            NOTE(review): with the default ``""`` every upload is written to
            ``calls/<ext>`` and overwrites the previous one — callers should
            always pass a call_id.

    Returns:
        A signed GET URL when ``settings.gcp_credentials_path`` is set, the
        ``gs://`` URI of the uploaded object when it is not, or ``None`` if
        ``settings.gcs_bucket`` is unset or the upload fails (the failure is
        logged, not raised).
    """
    if not settings.gcs_bucket:
        logger.info("GCS_BUCKET not configured — skipping audio upload.")
        return None

    def _upload() -> str:
        # Imported lazily so the module can be loaded without the GCS SDK
        # when uploads are disabled.
        from google.cloud import storage
        from google.oauth2 import service_account as sa

        if settings.gcp_credentials_path:
            client = storage.Client.from_service_account_json(settings.gcp_credentials_path)
            signing_creds = sa.Credentials.from_service_account_file(settings.gcp_credentials_path)
        else:
            client = storage.Client()
            signing_creds = None

        safe_name = _safe_audio_filename(filename, call_id)
        object_name = f"calls/{safe_name}"
        # Store the Content-Type that matches the extension actually kept,
        # instead of labelling every upload as MP3.
        content_type = _AUDIO_CONTENT_TYPES[os.path.splitext(safe_name)[-1]]

        bucket = client.bucket(settings.gcs_bucket)
        blob = bucket.blob(object_name)
        blob.upload_from_string(data, content_type=content_type)

        if signing_creds:
            return blob.generate_signed_url(
                version="v2",
                expiration=datetime.timedelta(days=365),
                method="GET",
                credentials=signing_creds,
            )
        # Fallback: return the gs:// URI (no public access). This must use the
        # sanitized object name that was actually written, not the raw
        # client-supplied filename.
        return f"gs://{settings.gcs_bucket}/{object_name}"

    try:
        url = await asyncio.to_thread(_upload)
        # NOTE(review): when signing is enabled this writes a year-long
        # signed URL into the logs — consider logging the object name instead.
        logger.info(f"Audio uploaded: {url}")
        return url
    except Exception as e:
        # Deliberate best-effort: an upload failure must not fail the caller.
        logger.error(f"GCS upload failed: {e}")
        return None