Files
server-26/drb-c2-core/app/internal/transcription.py
T
2026-04-26 11:01:32 -04:00

151 lines
5.5 KiB
Python

"""
Speech-to-text transcription for recorded calls using OpenAI Whisper.
Audio is downloaded from GCS then sent to the Whisper API. Falls back to
returning None on any failure so the intelligence pipeline can still run.
"""
import asyncio
import tempfile
import os
from typing import Optional
from app.internal.logger import logger
from app.internal import firestore as fstore
# Whisper treats `prompt` as preceding transcript text, not instructions.
# Writing it as actual radio speech primes the vocabulary toward P25 codes
# and phrasing before the model hears the audio.
# NOTE: _sync_transcribe appends this AFTER the talkgroup/vocabulary
# prefixes, so learned per-system terms take precedence in the prompt.
_WHISPER_PROMPT = (
"10-4. 10-23. 10-20. 10-97. 10-8. 10-7. 10-34. 10-50. 10-52. "
"Post 4, I'm out. Post 3. En route. On scene. In route. "
"Copy. Negative. Stand by. Be advised. Go ahead. "
"Units responding. Dispatch. Talkgroup. "
"Engine. Ladder. Medic. Rescue. Car. Unit. "
"MVA. MVC. Structure fire. Working fire."
)
async def transcribe_call(
    call_id: str,
    gcs_uri: str,
    talkgroup_name: Optional[str] = None,
    system_id: Optional[str] = None,
) -> tuple[Optional[str], list[dict]]:
    """
    Transcribe the recording at *gcs_uri* and persist the result to Firestore.

    Returns:
        (transcript, segments) — segments is a list of {start, end, text}
        dicts, one per detected transmission. Empty list if transcription
        failed or produced nothing.
    """
    # Only gs:// URIs are transcribable; bail out on anything else.
    if not gcs_uri or not gcs_uri.startswith("gs://"):
        return None, []

    # Per-system learned vocabulary feeds the Whisper prompt (empty if none yet).
    vocabulary: list[str] = []
    if system_id:
        from app.internal.vocabulary_learner import get_vocabulary
        vocab_data = await get_vocabulary(system_id)
        vocabulary = vocab_data.get("vocabulary") or []

    # GCS download + OpenAI call are blocking; keep them off the event loop.
    try:
        transcript, segments = await asyncio.to_thread(
            _sync_transcribe, gcs_uri, talkgroup_name, vocabulary
        )
    except Exception as e:
        logger.warning(f"Transcription failed for call {call_id}: {e}")
        return None, []

    # Best-effort persistence: a Firestore failure must not lose the result,
    # so the transcript is returned to the caller either way.
    if transcript:
        updates: dict = {"transcript": transcript}
        if segments:
            updates["segments"] = segments
        try:
            await fstore.doc_set("calls", call_id, updates)
            logger.info(
                f"Transcript saved for call {call_id} "
                f"({len(transcript)} chars, {len(segments)} segment(s))"
            )
        except Exception as e:
            logger.warning(f"Could not save transcript for {call_id}: {e}")

    return transcript, segments
def _sync_transcribe(
    gcs_uri: str,
    talkgroup_name: Optional[str] = None,
    vocabulary: Optional[list[str]] = None,
) -> tuple[Optional[str], list[dict]]:
    """
    Download audio from GCS and transcribe it with OpenAI Whisper.

    Runs synchronously (called via asyncio.to_thread from transcribe_call).

    Args:
        gcs_uri: Fully-qualified gs://bucket/path URI of the recording.
        talkgroup_name: Optional talkgroup label prepended to the prompt.
        vocabulary: Learned per-system vocabulary terms for the prompt.

    Returns:
        (text, segments) — text is None when the API key is missing or the
        transcript is empty; segments is a list of {start, end, text} dicts.
    """
    from google.cloud import storage as gcs
    from google.oauth2 import service_account
    from openai import OpenAI
    from app.config import settings

    if not settings.openai_api_key:
        logger.warning("OPENAI_API_KEY not set — transcription disabled.")
        # FIX: was `return None`, which broke the declared tuple return type
        # and crashed the caller's `transcript, segments = ...` unpacking.
        return None, []

    # Split "gs://bucket/path/to/blob" into bucket and blob path.
    without_scheme = gcs_uri[len("gs://"):]
    bucket_name, blob_path = without_scheme.split("/", 1)

    if settings.gcp_credentials_path:
        creds = service_account.Credentials.from_service_account_file(
            settings.gcp_credentials_path,
            scopes=["https://www.googleapis.com/auth/cloud-platform"],
        )
        gcs_client = gcs.Client(credentials=creds)
    else:
        # Fall back to ambient credentials (Application Default Credentials).
        gcs_client = gcs.Client()
    bucket = gcs_client.bucket(bucket_name)
    blob = bucket.blob(blob_path)

    # Preserve the original extension so Whisper can sniff the container;
    # default to .mp3 when the blob path has none.
    suffix = os.path.splitext(blob_path)[1] or ".mp3"
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
        tmp_path = tmp.name
    try:
        blob.download_to_filename(tmp_path)

        # Prompt order: talkgroup label, then learned vocabulary, then the
        # static P25 radio-speech primer.
        from app.internal.vocabulary_learner import build_whisper_vocab_prompt
        vocab_prefix = build_whisper_vocab_prompt(vocabulary or [])
        tg_prefix = f"Talkgroup: {talkgroup_name}. " if talkgroup_name else ""
        prompt = tg_prefix + vocab_prefix + _WHISPER_PROMPT

        openai_client = OpenAI(api_key=settings.openai_api_key)
        with open(tmp_path, "rb") as f:
            response = openai_client.audio.transcriptions.create(
                model="whisper-1",
                file=f,
                language="en",
                prompt=prompt,
                response_format="verbose_json",  # needed for per-segment data
                temperature=0,
            )
        text = response.text.strip() or None

        # Filter hallucinated segments. Two sources of hallucination in P25 recordings:
        #
        # 1. Trailing silence / static — Whisper fills silence past real content with
        #    sequential radio codes (10-4, 10-5...). Clamped by audio duration.
        #
        # 2. Leading silence — OP25 recordings typically have a short silence at the
        #    start before the first PTT press. Whisper sometimes hallucinates filler
        #    words or codes over this silence. Detected via no_speech_prob > 0.8
        #    (Whisper's own confidence that a segment contains no real speech).
        audio_duration: float = getattr(response, "duration", None) or float("inf")
        segments = [
            {"start": round(s.start, 2), "end": round(s.end, 2), "text": s.text.strip()}
            for s in (response.segments or [])
            if s.text.strip()
            and s.start < audio_duration
            and getattr(s, "no_speech_prob", 0.0) < 0.8
        ]
        return text, segments
    finally:
        # Always remove the temp file, even if download/transcription failed.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass