Voice Dialog Management — Intermediate
Voice dialog is fundamentally different from text chat. There is no screen to show options, no "typing..." indicator, and no way for the user to scroll back. Users cannot skim, undo, or copy-paste. Every interaction must be designed for the ear, not the eye. This lesson covers the critical voice-specific patterns: turn-taking, barge-in, silence handling, confirmation strategies, and graceful error recovery.
Turn-Taking Architecture
In human conversation, turn-taking happens naturally. In voice AI, you must explicitly manage when the system listens, when it speaks, and how to handle overlaps. Getting this wrong creates the most frustrating user experiences.
from enum import Enum
from dataclasses import dataclass, field
import asyncio
import time
from typing import Optional, Callable
class TurnState(Enum):
    """Lifecycle states for a single conversational turn."""
    LISTENING = "listening"            # capturing user speech
    PROCESSING = "processing"          # generating a response
    SPEAKING = "speaking"              # playing TTS audio
    WAITING_FOR_INPUT = "waiting"      # idle, ready for the user to speak
    BARGE_IN = "barge_in"              # user interrupted system playback
@dataclass
class TurnConfig:
    """Tunable timing and behavior knobs for turn-taking."""
    silence_timeout_ms: int = 1500        # wait this long for speech to start
    endpointing_ms: int = 500             # this much silence ends an utterance
    max_listen_duration_ms: int = 30000   # hard cap on one turn's recording
    barge_in_enabled: bool = True         # let the user talk over the system
    barge_in_threshold_ms: int = 200      # min speech length counted as barge-in
    processing_timeout_ms: int = 10000    # pipeline processing budget per turn
    filler_audio_enabled: bool = True     # play a filler sound while thinking
    max_no_input_retries: int = 2         # reprompts before escalation
class VoiceTurnManager:
    """Manages conversational turn-taking for voice AI systems.

    This is the state machine at the heart of every voice assistant.
    It coordinates when to listen, process, speak, and handle interruptions.
    """
    def __init__(self, config: Optional[TurnConfig] = None):
        # A default config is used when the caller supplies none.
        self.config = config or TurnConfig()
        self.state = TurnState.WAITING_FOR_INPUT
        self.no_input_count = 0  # consecutive silent turns, drives escalation
        self.turn_count = 0

    async def run_turn(self, audio_input, pipeline, audio_output) -> dict:
        """Execute one complete conversation turn.

        Args:
            audio_input: source exposing get_utterance() and detect_speech()
                coroutines.
            pipeline: NLU/dialog object; process(transcript) returns a
                response with .text and .audio attributes.
            audio_output: sink exposing play(), play_filler(), stop().

        Returns:
            Turn metadata dict for logging and analytics.

        Raises:
            asyncio.TimeoutError: if pipeline processing exceeds
                processing_timeout_ms (propagated, as in the original).
        """
        turn_start = time.monotonic()
        self.turn_count += 1
        turn_data = {"turn": self.turn_count, "events": []}

        # Phase 1: Listen for user speech
        self.state = TurnState.LISTENING
        turn_data["events"].append(("listening", time.monotonic() - turn_start))
        transcript = await self._listen_with_timeout(audio_input)
        if not transcript:
            # No input detected
            self.no_input_count += 1
            # BUGFIX: return to the idle state on the no-input paths; the
            # original left self.state stuck at LISTENING here.
            self.state = TurnState.WAITING_FOR_INPUT
            if self.no_input_count >= self.config.max_no_input_retries:
                turn_data["outcome"] = "no_input_escalate"
            else:
                turn_data["outcome"] = "no_input_retry"
            return turn_data
        self.no_input_count = 0  # Reset on successful input

        # Phase 2: Process (ASR already done, now NLU + Dialog)
        self.state = TurnState.PROCESSING
        turn_data["events"].append(("processing", time.monotonic() - turn_start))
        turn_data["user_transcript"] = transcript

        # Play filler audio while processing ("Let me check on that...")
        filler_task = None
        if self.config.filler_audio_enabled:
            filler_task = asyncio.create_task(audio_output.play_filler())
        try:
            response = await asyncio.wait_for(
                pipeline.process(transcript),
                timeout=self.config.processing_timeout_ms / 1000
            )
        finally:
            # BUGFIX: the original cancelled the filler only on success, so a
            # processing timeout or exception leaked a still-running task.
            if filler_task is not None:
                filler_task.cancel()

        # Phase 3: Speak response
        self.state = TurnState.SPEAKING
        turn_data["events"].append(("speaking", time.monotonic() - turn_start))
        turn_data["response_text"] = response.text
        interrupted = await self._speak_with_barge_in(
            response.audio, audio_input, audio_output
        )
        if interrupted:
            turn_data["events"].append(("barge_in", time.monotonic() - turn_start))
            self.state = TurnState.BARGE_IN

        # Phase 4: Back to waiting
        self.state = TurnState.WAITING_FOR_INPUT
        turn_data["total_ms"] = (time.monotonic() - turn_start) * 1000
        turn_data["outcome"] = "barge_in" if interrupted else "complete"
        return turn_data

    async def _listen_with_timeout(self, audio_input) -> Optional[str]:
        """Listen for user speech; None if nothing arrives before the timeout."""
        try:
            return await asyncio.wait_for(
                audio_input.get_utterance(
                    endpointing_ms=self.config.endpointing_ms
                ),
                timeout=self.config.silence_timeout_ms / 1000
            )
        except asyncio.TimeoutError:
            return None

    async def _speak_with_barge_in(self, audio, audio_input,
                                   audio_output) -> bool:
        """Play TTS audio while monitoring for user interruption.

        Returns:
            True if the user barged in, False if playback completed normally.
        """
        if not self.config.barge_in_enabled:
            await audio_output.play(audio)
            return False
        # Play audio and listen simultaneously
        play_task = asyncio.create_task(audio_output.play(audio))
        listen_task = asyncio.create_task(
            audio_input.detect_speech(
                min_duration_ms=self.config.barge_in_threshold_ms
            )
        )
        done, pending = await asyncio.wait(
            [play_task, listen_task],
            return_when=asyncio.FIRST_COMPLETED
        )
        if listen_task in done:
            # User interrupted - stop speaking immediately
            play_task.cancel()
            await audio_output.stop()
            return True
        else:
            # Finished speaking normally
            listen_task.cancel()
            return False
Silence Detection and Endpointing
Deciding when the user has finished speaking is one of the hardest problems in voice AI. Too aggressive and you cut them off mid-sentence. Too passive and there are awkward pauses.
import numpy as np
from collections import deque
class EndpointDetector:
    """Detect when a user has finished speaking.

    Uses a combination of energy-based VAD and timing heuristics.
    In production, combine with ASR-level endpointing for best results.
    """
    def __init__(self, sample_rate: int = 16000):
        self.sample_rate = sample_rate
        self.frame_ms = 30  # Analyze 30ms frames
        self.frame_size = int(sample_rate * self.frame_ms / 1000)
        # Adaptive thresholds
        self.noise_floor_db = -50.0
        self.speech_threshold_db = -35.0
        self.noise_buffer = deque(maxlen=100)  # Track ambient noise level
        # State
        self.silence_frames = 0
        self.speech_frames = 0
        self.is_speaking = False

    def process_frame(self, audio_frame: np.ndarray) -> dict:
        """Process one audio frame and return endpoint decision.

        Args:
            audio_frame: 1-D float array of PCM samples (only its energy
                is used here).

        Returns:
            {
                "is_speech": bool,
                "should_endpoint": bool,
                "silence_duration_ms": int,
                "speech_duration_ms": int,
                "noise_floor_db": float
            }
        """
        # +1e-10 avoids log10(0) on an all-zero frame.
        rms = np.sqrt(np.mean(audio_frame ** 2) + 1e-10)
        db = 20 * np.log10(rms)
        # Update noise floor (adaptive). NOTE(review): frames containing
        # speech onset (before is_speaking flips) also land in this buffer
        # and can bias the floor upward; the median keeps this tolerable.
        if not self.is_speaking:
            self.noise_buffer.append(db)
            if len(self.noise_buffer) > 10:
                self.noise_floor_db = np.percentile(list(self.noise_buffer), 50)
                self.speech_threshold_db = self.noise_floor_db + 15  # 15dB above noise
        is_speech = db > self.speech_threshold_db
        if is_speech:
            self.speech_frames += 1
            self.silence_frames = 0
            if self.speech_frames >= 3:  # ~90ms of speech to confirm
                self.is_speaking = True
        else:
            if self.is_speaking:
                self.silence_frames += 1
            else:
                # BUGFIX: without this reset, isolated noise spikes minutes
                # apart accumulated in speech_frames and could eventually
                # flip is_speaking on. Speech onset must be near-contiguous.
                self.speech_frames = 0
        silence_ms = self.silence_frames * self.frame_ms
        speech_ms = self.speech_frames * self.frame_ms
        # Endpoint decision with context-aware thresholds
        should_endpoint = False
        if self.is_speaking:
            if speech_ms < 1000:
                # Short utterance - wait longer (might be "um", "uh")
                should_endpoint = silence_ms >= 800
            elif speech_ms < 5000:
                # Normal utterance - standard timeout
                should_endpoint = silence_ms >= 500
            else:
                # Long utterance - they're probably done
                should_endpoint = silence_ms >= 300
        if should_endpoint:
            self._reset()
        return {
            "is_speech": is_speech,
            "should_endpoint": should_endpoint,
            "silence_duration_ms": silence_ms,
            "speech_duration_ms": speech_ms,
            "noise_floor_db": self.noise_floor_db
        }

    def _reset(self):
        """Clear per-utterance state once an endpoint has been emitted."""
        self.silence_frames = 0
        self.speech_frames = 0
        self.is_speaking = False
Confirmation Patterns for Voice
In voice, confirmation is critical because there is no visual feedback. Use different confirmation levels based on the action's impact and the system's confidence.
from enum import Enum
class ConfirmationLevel(Enum):
    """How strongly to confirm an action before executing it."""
    NONE = "none"            # high confidence, low risk: just do it
    IMPLICIT = "implicit"    # medium confidence: state what was done
    EXPLICIT = "explicit"    # low confidence or high risk: ask yes/no
    SPELL_OUT = "spell_out"  # critical data: read back character by character
class VoiceConfirmationEngine:
    """Choose and execute the right confirmation pattern for each action."""

    CONFIRMATION_RULES = {
        # (confidence_threshold, risk_level) -> confirmation_level
        ("high", "low"): ConfirmationLevel.NONE,
        ("high", "medium"): ConfirmationLevel.IMPLICIT,
        ("high", "high"): ConfirmationLevel.EXPLICIT,
        ("medium", "low"): ConfirmationLevel.IMPLICIT,
        ("medium", "medium"): ConfirmationLevel.EXPLICIT,
        ("medium", "high"): ConfirmationLevel.EXPLICIT,
        ("low", "low"): ConfirmationLevel.EXPLICIT,
        ("low", "medium"): ConfirmationLevel.EXPLICIT,
        ("low", "high"): ConfirmationLevel.SPELL_OUT,
    }

    def determine_confirmation(self, confidence: float, risk: str,
                               action: str, entities: dict) -> dict:
        """Determine what confirmation pattern to use.

        Args:
            confidence: ASR + NLU confidence (0.0 - 1.0)
            risk: "low", "medium", "high" - based on action type
            action: What the system is about to do
            entities: Extracted entities (account numbers, amounts, etc.)

        Returns:
            {"response": str, "needs_confirmation": bool}
        """
        conf_level = ("high" if confidence > 0.9
                      else "medium" if confidence > 0.7 else "low")
        # Unknown risk labels fall back to the safe EXPLICIT level.
        confirmation = self.CONFIRMATION_RULES.get(
            (conf_level, risk), ConfirmationLevel.EXPLICIT
        )
        # PERF: build only the selected response. The original constructed
        # all four templates (and ran every helper) on every call.
        if confirmation is ConfirmationLevel.NONE:
            return {
                "response": self._action_response(action, entities),
                "needs_confirmation": False
            }
        if confirmation is ConfirmationLevel.IMPLICIT:
            return {
                "response": f"OK, I've {self._past_tense(action)} "
                            f"{self._describe_entities(entities)}. "
                            f"Is there anything else?",
                "needs_confirmation": False
            }
        if confirmation is ConfirmationLevel.EXPLICIT:
            return {
                "response": f"Just to confirm, you'd like to {action} "
                            f"{self._describe_entities(entities)}. "
                            f"Is that correct?",
                "needs_confirmation": True
            }
        return {
            "response": f"I'll {action}. The details are: "
                        f"{self._spell_out_entities(entities)}. "
                        f"Should I go ahead?",
            "needs_confirmation": True
        }

    def _describe_entities(self, entities: dict) -> str:
        """Render entities as a spoken-friendly "key: value" list."""
        return ", ".join(
            f"{key.replace('_', ' ')}: {value}"
            for key, value in entities.items()
        )

    def _spell_out_entities(self, entities: dict) -> str:
        """Spell out critical values for accuracy (one character at a time)."""
        parts = []
        for key, value in entities.items():
            spelled = " ".join(str(value))
            parts.append(f"{key.replace('_', ' ')}: {spelled}")
        return ". ".join(parts)

    def _past_tense(self, action: str) -> str:
        # Naive regular-verb inflection; irregular verbs are not handled.
        if action.endswith("e"):
            return action + "d"
        return action + "ed"

    def _action_response(self, action: str, entities: dict) -> str:
        """Response for the no-confirmation case: just report the result."""
        return f"Done. {self._describe_entities(entities)}."
# --- Usage examples ---
engine = VoiceConfirmationEngine()

# Example 1: weather query -- low risk, high confidence.
# Expected level: ConfirmationLevel.NONE (just answer).
result = engine.determine_confirmation(
    confidence=0.95,
    risk="low",
    action="check weather",
    entities={"location": "San Francisco"},
)
# Response: "Done. location: San Francisco."

# Example 2: money transfer -- high risk, medium confidence.
# Expected level: ConfirmationLevel.EXPLICIT (ask a yes/no question first).
result = engine.determine_confirmation(
    confidence=0.82,
    risk="high",
    action="transfer $500",
    entities={"from_account": "checking", "to_account": "savings", "amount": "$500"},
)
# Response: "Just to confirm, you'd like to transfer $500
#            from checking to savings. Is that correct?"

# Example 3: account number entry -- high risk, low confidence.
# Expected level: ConfirmationLevel.SPELL_OUT (read back character by character).
result = engine.determine_confirmation(
    confidence=0.65,
    risk="high",
    action="look up account",
    entities={"account_number": "4567890123"},
)
# Response: "I'll look up account. The details are:
#            account number: 4 5 6 7 8 9 0 1 2 3. Should I go ahead?"
Error Recovery in Voice
Voice errors are more disruptive than text errors because users cannot re-read or scroll back. Design a graduated error recovery strategy.
class VoiceErrorRecovery:
    """Graduated error recovery for voice systems.

    Level 1: Simple retry with different phrasing
    Level 2: Offer alternatives or narrower options
    Level 3: Escalate to different modality or human
    """
    def __init__(self, max_retries: int = 3):
        self.max_retries = max_retries
        self.error_counts = {}  # "<session_id>:<error_type>" -> error count

    def handle_error(self, session_id: str, error_type: str,
                     context: dict) -> dict:
        """Return appropriate recovery action based on error history."""
        key = f"{session_id}:{error_type}"
        self.error_counts[key] = self.error_counts.get(key, 0) + 1
        count = self.error_counts[key]
        if error_type == "no_speech_detected":
            return self._handle_no_speech(count, context)
        elif error_type == "low_confidence":
            return self._handle_low_confidence(count, context)
        elif error_type == "unknown_intent":
            return self._handle_unknown_intent(count, context)
        elif error_type == "api_failure":
            return self._handle_api_failure(count, context)
        # BUGFIX: the original fell off the end (returning None) for any
        # unrecognized error type; fail safe with a generic recovery.
        return self._handle_generic(count, context)

    def _handle_generic(self, attempt: int, context: dict) -> dict:
        """Fallback recovery for error types without a dedicated handler."""
        if attempt < self.max_retries:
            return {
                "response": "Sorry, something went wrong. Let's try that again.",
                "action": "retry"
            }
        return {
            "response": "Let me connect you with someone who can help.",
            "action": "escalate"
        }

    def _handle_no_speech(self, attempt: int, context: dict) -> dict:
        """Recovery ladder for turns where no speech was detected."""
        if attempt == 1:
            return {
                "response": "I didn't hear anything. Could you try again?",
                "action": "retry",
                "adjust": {"silence_timeout_ms": 3000}  # Wait longer
            }
        elif attempt == 2:
            return {
                "response": "I'm still having trouble hearing you. "
                            "Please make sure your microphone is working "
                            "and try speaking a bit louder.",
                "action": "retry",
                "adjust": {"silence_timeout_ms": 5000}
            }
        else:
            return {
                "response": "I'm unable to hear you. "
                            "If you're on a phone, please press 0 to speak "
                            "with an agent. Otherwise, please try again later.",
                "action": "escalate_or_end"
            }

    def _handle_low_confidence(self, attempt: int, context: dict) -> dict:
        """Recovery ladder for low-confidence recognition results."""
        if attempt == 1:
            return {
                "response": f"I think you said '{context.get('transcript', '')}', "
                            f"but I'm not sure. Could you say that again?",
                "action": "retry"
            }
        elif attempt == 2:
            return {
                "response": "I'm having trouble understanding. "
                            "Could you try saying it in a different way? "
                            "For example, you can say things like "
                            "'check my balance' or 'make a payment'.",
                "action": "retry_with_hints"
            }
        else:
            return {
                "response": "Let me connect you with someone who can help.",
                "action": "escalate"
            }

    def _handle_unknown_intent(self, attempt: int, context: dict) -> dict:
        """Recovery ladder for utterances that match no supported intent."""
        if attempt == 1:
            return {
                "response": "I can help you with checking balances, "
                            "making payments, or updating your account. "
                            "Which would you like?",
                "action": "offer_menu"
            }
        elif attempt == 2:
            return {
                "response": "I'm sorry, I can only help with a few things. "
                            "Would you like to hear the full menu, "
                            "or speak with an agent?",
                "action": "menu_or_escalate"
            }
        else:
            return {
                "response": "Let me transfer you to an agent who can help.",
                "action": "escalate"
            }

    def _handle_api_failure(self, attempt: int, context: dict) -> dict:
        """Retry downstream API failures with a growing delay."""
        return {
            "response": "I'm experiencing a technical issue. "
                        "Please hold for a moment while I try again.",
            "action": "retry_with_delay",
            "delay_ms": min(1000 * attempt, 5000)  # Linear backoff, capped at 5s
        }

    def reset(self, session_id: str):
        """Reset error counts for a session (call after successful turn)."""
        # BUGFIX: match "<session_id>:" exactly; a bare startswith(session_id)
        # also wiped sessions whose ids merely share a prefix ("s" vs "s1").
        prefix = f"{session_id}:"
        keys_to_remove = [k for k in self.error_counts if k.startswith(prefix)]
        for k in keys_to_remove:
            del self.error_counts[k]
Lilly Tech Systems