Files
server-26/drb-c2-core/app/internal/transcription.py
T
2026-04-12 22:36:21 -04:00

94 lines
3.1 KiB
Python

"""
Speech-to-text transcription for recorded calls using OpenAI Whisper.
Audio is downloaded from GCS then sent to the Whisper API. Falls back to
returning None on any failure so the intelligence pipeline can still run.
"""
import asyncio
import tempfile
import os
from typing import Optional
from app.internal.logger import logger
from app.internal import firestore as fstore
async def transcribe_call(call_id: str, gcs_uri: str) -> Optional[str]:
    """
    Transcribe a recorded call and persist the transcript on its Firestore doc.

    The blocking download/transcription work is pushed to a worker thread so
    the event loop is not held up. Every failure is logged and turned into a
    None result (or, for a failed save, the transcript is still returned).

    Args:
        call_id: Firestore document ID in the 'calls' collection.
        gcs_uri: GCS URI of the audio file, e.g. gs://bucket/calls/xyz.mp3

    Returns:
        The transcript string, or None if transcription failed / was skipped.
    """
    # Skip anything that is missing or is not a gs:// URI.
    has_gcs_uri = bool(gcs_uri) and gcs_uri.startswith("gs://")
    if not has_gcs_uri:
        return None

    try:
        transcript = await asyncio.to_thread(_sync_transcribe, gcs_uri)
    except Exception as e:
        logger.warning(f"Transcription failed for call {call_id}: {e}")
        return None

    # Nothing to store when the transcription produced no text.
    if not transcript:
        return transcript

    fields = {"transcript": transcript}
    try:
        await fstore.doc_set("calls", call_id, fields)
        logger.info(f"Transcript saved for call {call_id} ({len(transcript)} chars)")
    except Exception as e:
        # A failed save must not discard the transcript we already have.
        logger.warning(f"Could not save transcript for {call_id}: {e}")
    return transcript
def _sync_transcribe(gcs_uri: str) -> Optional[str]:
    """Download audio from GCS and transcribe with OpenAI Whisper.

    This function is blocking; transcribe_call runs it via asyncio.to_thread.

    Args:
        gcs_uri: GCS URI of the audio object, e.g. gs://bucket/calls/xyz.mp3

    Returns:
        The stripped transcript text, or None when no OpenAI API key is
        configured or the transcript comes back empty.

    Raises:
        ValueError: If gcs_uri is not of the form gs://<bucket>/<object>.
        Exception: Errors raised by the GCS download or the OpenAI request
            are not handled here and propagate to the caller.
    """
    # Imported lazily so the module can be loaded without these packages
    # being touched until a transcription is actually requested.
    from google.cloud import storage as gcs
    from google.oauth2 import service_account
    from openai import OpenAI
    from app.config import settings

    if not settings.openai_api_key:
        logger.warning("OPENAI_API_KEY not set — transcription disabled.")
        return None

    # Parse gs://bucket/path/to/file.mp3 and validate it before doing any
    # network or filesystem work, so a malformed URI fails fast with a
    # message that names the offending value.
    scheme = "gs://"
    bucket_name, _, blob_path = gcs_uri[len(scheme):].partition("/")
    if not gcs_uri.startswith(scheme) or not bucket_name or not blob_path:
        raise ValueError(
            f"Invalid GCS URI (expected gs://<bucket>/<object>): {gcs_uri!r}"
        )

    # Use explicit service-account credentials when a path is configured;
    # otherwise let the client resolve credentials on its own.
    if settings.gcp_credentials_path:
        creds = service_account.Credentials.from_service_account_file(
            settings.gcp_credentials_path,
            scopes=["https://www.googleapis.com/auth/cloud-platform"],
        )
        gcs_client = gcs.Client(credentials=creds)
    else:
        gcs_client = gcs.Client()

    bucket = gcs_client.bucket(bucket_name)
    blob = bucket.blob(blob_path)

    # Keep the object's extension on the temp file (default ".mp3").
    # NOTE(review): presumably so the upload carries a recognizable audio
    # file name — confirm against the transcription API's requirements.
    suffix = os.path.splitext(blob_path)[1] or ".mp3"

    # Only reserve a path here; delete=False because the file is reopened by
    # name below and removed explicitly in the finally block.
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
        tmp_path = tmp.name

    try:
        blob.download_to_filename(tmp_path)
        openai_client = OpenAI(api_key=settings.openai_api_key)
        with open(tmp_path, "rb") as f:
            response = openai_client.audio.transcriptions.create(
                model="whisper-1",
                file=f,
                language="en",
                prompt="Public safety radio communication. May include police codes, fire, EMS, talkgroup IDs, unit numbers, addresses.",
            )
        # Normalize whitespace-only output to None.
        return response.text.strip() or None
    finally:
        # Best-effort cleanup: a failure to delete the temp file must not
        # mask the transcription result or the original exception.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass