Speech-to-Text Pipeline Intermediate
The ASR (Automatic Speech Recognition) stage converts raw audio into text. Getting this right is critical — every downstream error (wrong intent, bad response) usually traces back to a bad transcript. This lesson covers production ASR providers, streaming transcription, noise handling, speaker diarization, and custom vocabulary tuning with working code.
ASR Provider Comparison
| Provider | Model | Streaming | Latency | WER (Clean) | Cost per Hour | Best For |
|---|---|---|---|---|---|---|
| Deepgram | Nova-2 | Yes | ~200ms | ~8% | $0.0043 | Real-time voice apps, lowest latency |
| OpenAI | Whisper large-v3 | No (batch) | ~2-5s | ~5% | $0.006 | Best accuracy, 100+ languages |
| Google | Chirp 2 | Yes | ~300ms | ~7% | $0.016 | Multi-language, enterprise compliance |
| AWS | Transcribe | Yes | ~400ms | ~9% | $0.024 | AWS-native workloads |
| Azure | Speech-to-Text v3.2 | Yes | ~350ms | ~7% | $0.016 | Enterprise, custom models, Teams integration |
| Self-hosted Whisper | whisper-large-v3-turbo | Partial | ~500ms (on GPU) | ~5% | GPU cost only | Data privacy, no external API calls |
Streaming ASR with Deepgram (Production Code)
import asyncio
import json
from dataclasses import dataclass
from typing import AsyncIterator, Optional
from urllib.parse import quote

import websockets
@dataclass
class TranscriptResult:
    """One transcript event from the streaming ASR (interim or final)."""
    text: str  # Transcript text of the top alternative
    is_final: bool  # Mirrors Deepgram's is_final flag (interim results may be revised)
    confidence: float  # Confidence of the top alternative, 0.0-1.0
    start_time: float  # Result start offset (Deepgram "start" field, seconds)
    end_time: float  # start + duration, seconds
    words: list  # Word-level timestamps
    speaker: Optional[int] = None  # Speaker ID if diarization enabled
class DeepgramStreamingASR:
    """Production streaming ASR client using Deepgram Nova-2.

    Opens a WebSocket to Deepgram's /v1/listen endpoint, streams raw
    linear16 PCM audio, and yields interim/final transcripts as
    TranscriptResult objects.
    """

    WS_URL = "wss://api.deepgram.com/v1/listen"

    def __init__(self, api_key: str, sample_rate: int = 16000,
                 language: str = "en-US", model: str = "nova-2"):
        """
        Args:
            api_key: Deepgram API key (sent as a "Token" auth header).
            sample_rate: Sample rate of the PCM audio you will send, in Hz.
            language: BCP-47 language tag, e.g. "en-US".
            model: Deepgram model name.
        """
        self.api_key = api_key
        self.sample_rate = sample_rate
        self.language = language
        self.model = model
        self.ws = None  # Set by connect()

    def _build_url(self, diarize: bool = False,
                   custom_vocabulary: Optional[list] = None) -> str:
        """Build WebSocket URL with query parameters.

        All free-text values are URL-encoded: multi-word keywords such as
        "Lilly Tech" would otherwise inject a raw space into the query
        string and produce an invalid URL.
        """
        params = [
            f"model={quote(self.model)}",
            f"language={quote(self.language)}",
            f"sample_rate={self.sample_rate}",
            "encoding=linear16",
            "channels=1",
            "interim_results=true",   # Get partial transcripts
            "utterance_end_ms=1000",  # Silence = end of utterance
            "vad_events=true",        # Voice activity detection events
            "smart_format=true",      # Auto-punctuation and formatting
            "endpointing=300",        # 300ms silence = end of speech
        ]
        if diarize:
            params.append("diarize=true")
        if custom_vocabulary:
            # Boost recognition of specific terms (one keywords= per term).
            params.extend(f"keywords={quote(w)}" for w in custom_vocabulary)
        return f"{self.WS_URL}?{'&'.join(params)}"

    async def connect(self, diarize: bool = False,
                      custom_vocabulary: Optional[list] = None):
        """Establish WebSocket connection to Deepgram."""
        url = self._build_url(diarize, custom_vocabulary)
        self.ws = await websockets.connect(
            url,
            extra_headers={"Authorization": f"Token {self.api_key}"},
            ping_interval=20,
            ping_timeout=10
        )

    async def stream_and_transcribe(
        self, audio_source: AsyncIterator[bytes]
    ) -> AsyncIterator["TranscriptResult"]:
        """Send audio chunks and yield transcript results.

        This is the core streaming loop used in production voice pipelines.
        Audio chunks arrive every 100ms (3200 bytes at 16kHz 16-bit mono).

        Raises:
            RuntimeError: If connect() has not been called first.
        """
        if not self.ws:
            raise RuntimeError("Call connect() first")

        # Sender task: pushes audio chunks to Deepgram concurrently with
        # the receive loop below.
        async def send_audio():
            async for chunk in audio_source:
                await self.ws.send(chunk)
            # Signal end of audio
            await self.ws.send(json.dumps({"type": "CloseStream"}))

        sender = asyncio.create_task(send_audio())
        try:
            async for message in self.ws:
                data = json.loads(message)
                if data.get("type") == "Results":
                    channel = data["channel"]
                    alt = channel["alternatives"][0]
                    if alt["transcript"]:
                        words = [
                            {
                                "word": w["word"],
                                "start": w["start"],
                                "end": w["end"],
                                "confidence": w["confidence"],
                                "speaker": w.get("speaker")
                            }
                            for w in alt.get("words", [])
                        ]
                        yield TranscriptResult(
                            text=alt["transcript"],
                            is_final=data["is_final"],
                            confidence=alt["confidence"],
                            start_time=data["start"],
                            end_time=data["start"] + data["duration"],
                            words=words,
                            # Speaker of the first word, if diarization was on.
                            speaker=words[0].get("speaker") if words else None
                        )
                elif data.get("type") == "UtteranceEnd":
                    # Silence detected - good point to trigger NLU
                    pass
        finally:
            # Cancel the sender and wait for it to actually finish;
            # the original fire-and-forget cancel() left the task pending
            # and swallowed any send-side errors.
            sender.cancel()
            try:
                await sender
            except asyncio.CancelledError:
                pass
            await self.ws.close()
# --- Usage Example ---
async def voice_assistant_loop():
    """Main loop for a streaming voice assistant."""
    # Configure the streaming recognizer with domain keywords boosted.
    recognizer = DeepgramStreamingASR(
        api_key="YOUR_DEEPGRAM_KEY",
        sample_rate=16000,
        language="en-US"
    )
    await recognizer.connect(
        diarize=False,
        custom_vocabulary=["Lilly Tech", "ASR", "Deepgram"]
    )
    # audio_stream would come from microphone/WebRTC/phone
    audio_stream = get_audio_stream()  # Your audio source
    current_utterance = ""
    async for segment in recognizer.stream_and_transcribe(audio_stream):
        if segment.is_final:
            # Final transcript for this utterance
            current_utterance = segment.text
            print(f" [final] {segment.text} (confidence: {segment.confidence:.2f})")
            # Now send to NLU/Dialog pipeline
            response = await process_voice_turn(current_utterance)
            await speak_response(response)
        else:
            # Show partial transcript (like live captions)
            print(f" [partial] {segment.text}", end="\r")
Self-Hosted Whisper for Privacy-Sensitive Deployments
import torch
import numpy as np
from faster_whisper import WhisperModel
from typing import Optional
import time
class SelfHostedWhisperASR:
    """Self-hosted Whisper ASR for when audio cannot leave your infrastructure.

    Uses faster-whisper (CTranslate2) for 4x speedup over original Whisper.
    Runs on GPU or CPU. GPU recommended for real-time use.
    """

    def __init__(self, model_size: str = "large-v3",
                 device: str = "cuda", compute_type: str = "float16"):
        """
        Args:
            model_size: Whisper checkpoint name ("tiny" ... "large-v3").
            device: "cuda" or "cpu".
            compute_type: float16 on GPU, int8 on CPU.
        """
        self.model = WhisperModel(
            model_size,
            device=device,
            compute_type=compute_type
        )

    def transcribe(self, audio, language: str = "en",
                   beam_size: int = 5) -> dict:
        """Transcribe audio to text with word timestamps.

        Args:
            audio: NumPy float32 array (16kHz mono) or a path to an
                audio file (faster-whisper accepts both).
            language: ISO language code
            beam_size: Higher = more accurate but slower (1-10)

        Returns:
            Dict with full text, detected language, word timings,
            wall-clock latency and real-time factor.
        """
        t0 = time.monotonic()
        segments, info = self.model.transcribe(
            audio,
            language=language,
            beam_size=beam_size,
            word_timestamps=True,
            vad_filter=True,  # Skip silence
            vad_parameters={
                "min_silence_duration_ms": 500,
                "speech_pad_ms": 200
            }
        )
        words = []
        full_text = []
        # `segments` is lazy: iterating it performs the actual decoding,
        # so the latency measurement below covers the real work.
        for segment in segments:
            full_text.append(segment.text)
            for word in segment.words:
                words.append({
                    "word": word.word.strip(),
                    "start": word.start,
                    "end": word.end,
                    "probability": word.probability
                })
        latency_ms = (time.monotonic() - t0) * 1000
        # BUG FIX: use the decoder-reported duration rather than
        # len(audio) / 16000, which was wrong whenever `audio` was a file
        # path (len() of the path string) and assumed a fixed sample rate.
        audio_duration_s = float(info.duration)
        return {
            "text": " ".join(full_text).strip(),
            "language": info.language,
            "language_probability": info.language_probability,
            "words": words,
            "latency_ms": latency_ms,
            "audio_duration_s": audio_duration_s,
            # Real-time factor: processing time / audio time (lower = faster).
            # Guard against zero-length audio instead of raising.
            "rtf": latency_ms / (audio_duration_s * 1000) if audio_duration_s else 0.0
        }

    def transcribe_file(self, file_path: str, **kwargs) -> dict:
        """Convenience method to transcribe from a file path."""
        return self.transcribe(file_path, **kwargs)
# --- GPU sizing guide ---
# Model Size | VRAM | RTF (GPU) | RTF (CPU) | Accuracy (WER)
# tiny | 1 GB | 0.02x | 0.3x | ~15%
# base | 1 GB | 0.03x | 0.5x | ~12%
# small | 2 GB | 0.05x | 1.0x | ~10%
# medium | 5 GB | 0.1x | 2.5x | ~7%
# large-v3 | 10 GB | 0.15x | 5.0x | ~5%
#
# RTF = Real-Time Factor. 0.1x means 10 seconds of audio takes 1 second to process.
# For real-time use, you need RTF < 0.3x (streaming with buffering).
Noise Handling and Audio Preprocessing
import numpy as np
from scipy import signal
class AudioPreprocessor:
    """Production audio preprocessing for voice AI pipelines.

    Applied BEFORE sending audio to ASR to improve transcription quality.
    All methods expect mono float audio in [-1.0, 1.0].
    """

    def __init__(self, sample_rate: int = 16000):
        """
        Args:
            sample_rate: Sample rate of incoming audio in Hz.
        """
        self.sample_rate = sample_rate

    def normalize_volume(self, audio: np.ndarray,
                         target_db: float = -20.0) -> np.ndarray:
        """Normalize audio volume to target dB (RMS) level.

        Prevents too-quiet or too-loud audio from degrading ASR accuracy.
        Silent input is returned unchanged (avoids log10(0)/divide-by-zero).
        """
        rms = np.sqrt(np.mean(audio ** 2))
        if rms == 0:
            return audio
        current_db = 20 * np.log10(rms)
        gain = 10 ** ((target_db - current_db) / 20)
        normalized = audio * gain
        # Clip to prevent distortion
        return np.clip(normalized, -1.0, 1.0)

    def apply_highpass_filter(self, audio: np.ndarray,
                              cutoff_hz: int = 80) -> np.ndarray:
        """Remove low-frequency noise (HVAC hum, traffic rumble).

        80Hz highpass is standard for voice - human speech rarely goes
        below 85Hz. filtfilt applies the Butterworth filter forward and
        backward, so the result has zero phase shift.
        """
        nyquist = self.sample_rate / 2
        b, a = signal.butter(4, cutoff_hz / nyquist, btype='high')
        return signal.filtfilt(b, a, audio)

    def detect_speech_segments(self, audio: np.ndarray,
                               frame_ms: int = 30,
                               threshold_db: float = -40.0) -> list:
        """Simple energy-based Voice Activity Detection (VAD).

        Returns a list of (start_sample, end_sample) tuples for speech
        segments, each padded by 200ms on both sides (clamped to the
        buffer bounds).
        """
        frame_size = int(self.sample_rate * frame_ms / 1000)
        segments = []
        in_speech = False
        start = 0
        # BUG FIX: "+ 1" so the final full frame is examined. The original
        # exclusive bound `len(audio) - frame_size` skipped the last frame
        # and silently dropped speech at the end of the buffer.
        for i in range(0, len(audio) - frame_size + 1, frame_size):
            frame = audio[i:i + frame_size]
            rms = np.sqrt(np.mean(frame ** 2))
            db = 20 * np.log10(rms + 1e-10)  # epsilon avoids log10(0)
            if db > threshold_db and not in_speech:
                start = i
                in_speech = True
            elif db <= threshold_db and in_speech:
                # Add padding around speech segments
                pad = int(self.sample_rate * 0.2)  # 200ms padding
                segments.append((max(0, start - pad), min(len(audio), i + pad)))
                in_speech = False
        if in_speech:
            # Speech ran to the end of the buffer; close the open segment.
            segments.append((max(0, start - int(self.sample_rate * 0.2)), len(audio)))
        return segments

    def preprocess(self, audio: np.ndarray) -> np.ndarray:
        """Full preprocessing pipeline: highpass filter, then normalize."""
        audio = self.apply_highpass_filter(audio)
        audio = self.normalize_volume(audio)
        return audio
Speaker Diarization
Speaker diarization answers "who spoke when" — critical for multi-speaker scenarios like meetings, call centers, and interview transcription.
# Speaker diarization with Deepgram (simplest production approach)
async def transcribe_with_speakers(audio_file: str, api_key: str) -> list:
    """Transcribe audio with speaker labels using Deepgram."""
    from deepgram import Deepgram
    import aiofiles

    client = Deepgram(api_key)

    # Read the whole file without blocking the event loop.
    async with aiofiles.open(audio_file, "rb") as f:
        audio_data = await f.read()

    response = await client.transcription.prerecorded(
        {"buffer": audio_data, "mimetype": "audio/wav"},
        {
            "model": "nova-2",
            "diarize": True,
            "utterances": True,
            "smart_format": True,
            "punctuate": True
        }
    )

    # Flatten the diarized utterances into speaker-labelled turns.
    return [
        {
            "speaker": utt["speaker"],
            "text": utt["transcript"],
            "start": utt["start"],
            "end": utt["end"],
            "confidence": utt["confidence"]
        }
        for utt in response["results"]["utterances"]
    ]
# Output format:
# [
# {"speaker": 0, "text": "Hi, I'd like to check my account balance.", "start": 0.5, "end": 2.8},
# {"speaker": 1, "text": "Sure, can I have your account number?", "start": 3.1, "end": 4.9},
# {"speaker": 0, "text": "It's 4 5 6 7 8 9.", "start": 5.2, "end": 6.8},
# ]
Custom Vocabulary and Domain Adaptation
# Custom vocabulary boosts recognition of domain-specific terms
# This is critical for medical, legal, financial, and technical voice apps
class CustomVocabularyManager:
    """Manage custom vocabulary for ASR accuracy improvement."""

    def __init__(self):
        # Maps domain name -> {"terms": [...], "boost_weight": float}.
        self.vocabularies = {}

    def create_vocabulary(self, domain: str, terms: list,
                          boost_weight: float = 1.5):
        """Register a custom vocabulary for a specific domain.

        Args:
            domain: e.g., "medical", "financial", "tech_support"
            terms: List of terms that should be recognized accurately
            boost_weight: How much to boost these terms (1.0 = normal, 2.0 = strong)
        """
        entry = {"terms": terms, "boost_weight": boost_weight}
        self.vocabularies[domain] = entry

    def get_deepgram_keywords(self, domain: str) -> list:
        """Format vocabulary for Deepgram's keywords parameter."""
        entry = self.vocabularies.get(domain, {})
        boost = entry.get("boost_weight", 1.0)
        terms = entry.get("terms", [])
        return [f"{term}:{boost}" for term in terms]

    def get_google_phrases(self, domain: str) -> dict:
        """Format vocabulary for Google STT speech_contexts."""
        entry = self.vocabularies.get(domain, {})
        # Google uses a 1-20 boost scale, so scale our 1.x weights up.
        scaled_boost = entry.get("boost_weight", 1.5) * 10
        return {
            "phrases": entry.get("terms", []),
            "boost": scaled_boost
        }
# Domain-specific vocabulary examples
vocab_manager = CustomVocabularyManager()

# (terms, boost_weight) per domain, registered in one pass below.
_DOMAIN_VOCABS = {
    "healthcare": ([
        "metformin", "lisinopril", "atorvastatin", "hypertension",
        "hemoglobin A1C", "CBC", "MRI", "CT scan", "systolic",
        "diastolic", "milligrams", "prescription refill"
    ], 2.0),
    "financial_services": ([
        "FICO score", "APR", "amortization", "escrow",
        "Roth IRA", "401k", "CD rate", "wire transfer",
        "ACH payment", "overdraft protection", "FDIC"
    ], 1.8),
    "tech_support": ([
        "Kubernetes", "Docker", "API gateway", "load balancer",
        "SSL certificate", "DNS", "CIDR block", "VPC",
        "Redis cluster", "PostgreSQL", "microservice"
    ], 1.5),
}
for _domain, (_terms, _boost) in _DOMAIN_VOCABS.items():
    vocab_manager.create_vocabulary(_domain, _terms, boost_weight=_boost)
Production Tip: Always log ASR confidence scores alongside transcripts. Set up alerts when average confidence drops below 0.85 — it usually means either audio quality degraded (new phone system, different microphone) or users are using vocabulary your model hasn't seen. Both are fixable, but only if you detect them early.
Lilly Tech Systems