实时AI语音助手架构:从STT到TTS的端到端低延迟方案
2026年,AI语音交互进入了真正的实时时代。GPT-4o Realtime API将端到端延迟压缩到300ms以内,开源方案如Fish Speech和Sesame CSM也达到了接近商用的水平。本文将手把手教你构建一个生产级的实时AI语音助手系统。
架构总览
技术栈选型
方案一:GPT-4o Realtime API(最快上手)
GPT-4o Realtime API是目前最简单的实时语音方案。它将STT、LLM、TTS三个环节合为一体,通过WebSocket直接传输音频流。
优势:端到端延迟最低(约300ms),无需管理多个服务。劣势:成本较高($0.06/min音频),闭源。
方案二:开源组合方案(灵活可控)
- STT:Whisper Large-v3-Turbo(基于 faster-whisper 的分块流式转写)
- LLM:Qwen3-8B / DeepSeek-R1-7B(流式生成)
- TTS:Fish Speech 2.0 / Sesame CSM(流式合成)
- VAD:Silero VAD v5
优势:成本低、可定制、可私有部署。劣势:架构复杂、需要更多工程工作。
方案一实现:GPT-4o Realtime API
后端服务
# realtime_voice_server.py
import asyncio
import json
import websockets
from fastapi import FastAPI, WebSocket
from fastapi.websockets import WebSocketDisconnect
import openai
# FastAPI application that hosts the /ws/voice proxy endpoint.
app = FastAPI()
# Base WebSocket URL of the OpenAI Realtime API; the model name is appended
# to it as a `?model=` query parameter when connecting.
OPENAI_WS_URL = "wss://api.openai.com/v1/realtime"
# Realtime model identifier placed in the connection URL.
MODEL = "gpt-4o-realtime-preview-2026-03"
@app.websocket("/ws/voice")
async def voice_proxy(client_ws: WebSocket):
    """Proxy a client WebSocket to the OpenAI Realtime API WebSocket.

    Accepts the client connection, opens an upstream connection, sends one
    ``session.update`` message to configure the session, then relays text
    frames in both directions until either side closes.

    Args:
        client_ws: The incoming client WebSocket connection.
    """
    import logging
    import os

    logger = logging.getLogger(__name__)

    await client_ws.accept()

    # openai.api_key may be unset even when the environment variable exists,
    # so fall back to OPENAI_API_KEY instead of sending "Bearer None".
    api_key = openai.api_key or os.environ.get("OPENAI_API_KEY")
    if not api_key:
        logger.error("No OpenAI API key configured; closing client connection")
        await client_ws.close(code=1011)
        return

    headers = {
        "Authorization": f"Bearer {api_key}",
        "OpenAI-Beta": "realtime=v2",
    }

    # NOTE(review): newer releases of the websockets client name this keyword
    # `additional_headers` -- confirm against the installed version.
    async with websockets.connect(
        f"{OPENAI_WS_URL}?model={MODEL}",
        extra_headers=headers,
    ) as openai_ws:
        # Configure the session before relaying any audio.
        await openai_ws.send(json.dumps({
            "type": "session.update",
            "session": {
                "modalities": ["text", "audio"],
                "voice": "alloy",
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "input_audio_transcription": {
                    "model": "whisper-v4-large",
                },
                "turn_detection": {
                    "type": "server_vad",
                    "threshold": 0.5,
                    "prefix_padding_ms": 300,
                    "silence_duration_ms": 500,
                },
                "instructions": (
                    "你是一个友好的AI语音助手。\n"
                    "请用简洁的中文回答用户问题。\n"
                    "回答要口语化,适合语音对话场景。"
                ),
            },
        }))

        async def forward_to_openai():
            """Relay client text frames upstream until the client disconnects."""
            try:
                while True:
                    data = await client_ws.receive_text()
                    await openai_ws.send(data)
            except WebSocketDisconnect:
                pass

        async def forward_to_client():
            """Relay upstream messages to the client until upstream closes."""
            try:
                async for message in openai_ws:
                    await client_ws.send_text(message)
            except WebSocketDisconnect:
                pass

        tasks = [
            asyncio.ensure_future(forward_to_openai()),
            asyncio.ensure_future(forward_to_client()),
        ]
        try:
            # Stop as soon as EITHER direction ends; waiting for both would
            # keep the handler (and the upstream socket) alive indefinitely
            # after one peer has gone away.
            done, _pending = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                if not task.cancelled() and task.exception() is not None:
                    logger.warning(
                        "voice proxy forwarder failed: %r", task.exception()
                    )
        finally:
            for task in tasks:
                task.cancel()
            # Collect cancellations so no "exception was never retrieved"
            # warnings are emitted.
            await asyncio.gather(*tasks, return_exceptions=True)
前端集成(WebSocket + Web Audio API)
// voice_client.js
/**
 * Browser client that streams microphone audio to a voice server over a
 * WebSocket and plays back the audio deltas it receives.
 *
 * Audio is exchanged as base64-encoded 16-bit PCM at 24 kHz, mono.
 */
class RealtimeVoiceClient {
  /**
   * @param {string} wsUrl WebSocket URL of the voice server.
   */
  constructor(wsUrl) {
    this.wsUrl = wsUrl;
    this.audioContext = null;
    this.mediaStream = null;
    this.ws = null;
    // AudioContext time at which the next received chunk should begin, so
    // consecutive chunks are queued back-to-back instead of overlapping.
    this.nextPlayTime = 0;
  }

  /**
   * Request microphone access, create the AudioContext, and open the
   * WebSocket. Capturing starts once the socket is open.
   */
  async start() {
    // Ask for microphone permission.
    this.mediaStream = await navigator.mediaDevices.getUserMedia({
      audio: {
        sampleRate: 24000,
        channelCount: 1,
        echoCancellation: true,
        noiseSuppression: true,
      }
    });
    // Create the AudioContext used for both capture and playback.
    this.audioContext = new AudioContext({ sampleRate: 24000 });
    // Connect the WebSocket.
    this.ws = new WebSocket(this.wsUrl);
    this.ws.onopen = () => {
      console.log('Connected to voice server');
      this.startCapturing();
    };
    this.ws.onmessage = (event) => {
      const msg = JSON.parse(event.data);
      this.handleServerMessage(msg);
    };
  }

  /**
   * Tap the microphone stream and send each captured buffer to the server
   * as an `input_audio_buffer.append` message.
   */
  startCapturing() {
    const source = this.audioContext.createMediaStreamSource(this.mediaStream);
    const processor = this.audioContext.createScriptProcessor(4096, 1, 1);
    processor.onaudioprocess = (e) => {
      // The processor keeps firing after the socket closes; sending on a
      // non-open socket throws, so drop those buffers.
      if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
        return;
      }
      const float32Data = e.inputBuffer.getChannelData(0);
      // Convert to PCM16.
      const pcm16 = this.float32ToPCM16(float32Data);
      // Base64-encode and send.
      const base64 = btoa(String.fromCharCode(...new Uint8Array(pcm16.buffer)));
      this.ws.send(JSON.stringify({
        type: 'input_audio_buffer.append',
        audio: base64,
      }));
    };
    source.connect(processor);
    processor.connect(this.audioContext.destination);
  }

  /**
   * Dispatch one parsed server message by its `type` field.
   * @param {object} msg Parsed JSON message from the server.
   */
  handleServerMessage(msg) {
    switch (msg.type) {
      case 'response.audio.delta':
        this.playAudioChunk(msg.delta);
        break;
      case 'response.audio_transcript.done':
        console.log('AI:', msg.transcript);
        break;
      case 'input_audio_buffer.speech_started':
        console.log('User started speaking');
        break;
      case 'input_audio_buffer.speech_stopped':
        console.log('User stopped speaking');
        break;
    }
  }

  /**
   * Decode a base64 PCM16 chunk and schedule it for playback directly after
   * the previously scheduled chunk.
   * @param {string} base64Audio Base64-encoded 16-bit PCM samples.
   */
  async playAudioChunk(base64Audio) {
    const binaryString = atob(base64Audio);
    const bytes = new Uint8Array(binaryString.length);
    for (let i = 0; i < binaryString.length; i++) {
      bytes[i] = binaryString.charCodeAt(i);
    }
    // Use only whole 2-byte samples; an odd byte length would make the
    // Int16Array constructor throw.
    const sampleCount = bytes.length >> 1;
    if (sampleCount === 0) {
      return;
    }
    const pcm16 = new Int16Array(bytes.buffer, 0, sampleCount);
    const float32 = new Float32Array(sampleCount);
    for (let i = 0; i < sampleCount; i++) {
      float32[i] = pcm16[i] / 32768.0;
    }
    const audioBuffer = this.audioContext.createBuffer(1, float32.length, 24000);
    audioBuffer.getChannelData(0).set(float32);
    const source = this.audioContext.createBufferSource();
    source.buffer = audioBuffer;
    source.connect(this.audioContext.destination);
    // Chunks arrive faster than real time; starting each one immediately
    // would play them on top of each other. Queue each chunk at the end of
    // the previous one, or right now if playback has already caught up.
    const startAt = Math.max(this.audioContext.currentTime, this.nextPlayTime);
    source.start(startAt);
    this.nextPlayTime = startAt + audioBuffer.duration;
  }

  /**
   * Convert float samples in [-1, 1] to 16-bit signed PCM, clamping
   * out-of-range values.
   * @param {Float32Array} float32Array Input samples.
   * @returns {Int16Array} Converted samples.
   */
  float32ToPCM16(float32Array) {
    const pcm16 = new Int16Array(float32Array.length);
    for (let i = 0; i < float32Array.length; i++) {
      const s = Math.max(-1, Math.min(1, float32Array[i]));
      // Negative and positive halves have different magnitudes in int16.
      pcm16[i] = s < 0 ? s * 0x8000 : s * 0x7FFF;
    }
    return pcm16;
  }
}
// Usage: create a client for the local voice endpoint and start it
// (start() requests microphone access and opens the WebSocket).
const client = new RealtimeVoiceClient('ws://localhost:8000/ws/voice');
client.start();
方案二实现:开源组合方案
Whisper Large-v3-Turbo 流式转写服务
# stt_service.py
import asyncio
import numpy as np
from faster_whisper import WhisperModel
from fastapi import FastAPI, WebSocket
import torch
# FastAPI application for the speech-to-text service.
app = FastAPI()
# Load the "large-v3-turbo" Whisper model once at import time, on CUDA with
# float16 compute; the instance is passed to each StreamingTranscriber.
model = WhisperModel(
"large-v3-turbo",
device="cuda",
compute_type="float16",
)
class StreamingTranscriber:
    """Buffer incoming audio samples and transcribe them chunk by chunk.

    Samples are appended to an internal NumPy buffer. Each call to
    ``transcribe_chunk`` consumes at most one chunk of
    ``chunk_duration`` seconds (at a fixed 16 kHz sample rate) from the
    front of that buffer.
    """

    def __init__(self, model, chunk_duration=1.0):
        self.sample_rate = 16000
        self.chunk_duration = chunk_duration  # seconds
        self.buffer = np.array([], dtype=np.float32)
        self.model = model

    def add_audio(self, audio_data: np.ndarray):
        """Append ``audio_data`` to the end of the pending-sample buffer."""
        self.buffer = np.concatenate((self.buffer, audio_data))

    def transcribe_chunk(self) -> str:
        """Transcribe one chunk from the front of the buffer.

        Returns:
            The stripped concatenation of all segment texts, or ``""`` when
            fewer than one chunk's worth of samples is buffered (in which
            case the buffer is left untouched).
        """
        needed = self.sample_rate * self.chunk_duration
        if len(self.buffer) < needed:
            return ""

        # Split the buffer: the head is transcribed now, the tail is kept.
        cut = int(needed)
        head, tail = self.buffer[:cut], self.buffer[cut:]
        self.buffer = tail

        vad_options = {
            "min_silence_duration_ms": 300,
            "speech_pad_ms": 200,
        }
        segments, _info = self.model.transcribe(
            head,
            language="zh",
            beam_size=3,
            vad_filter=True,
            vad_parameters=vad_options,
        )
        pieces = (segment.text for segment in segments)
        return "".join(pieces).strip()
@app.websocket("/ws/stt")
async def stt_websocket(websocket: WebSocket):
    """Stream PCM16 audio in over a WebSocket and send transcriptions back.

    Each binary message is interpreted as 16-bit PCM samples, normalised to
    float32 and appended to a per-connection ``StreamingTranscriber``. A
    ``transcription`` JSON message is sent whenever a chunk yields non-empty
    text; nothing is sent otherwise.

    Args:
        websocket: The incoming client WebSocket connection.
    """
    import logging

    from fastapi import WebSocketDisconnect

    await websocket.accept()
    transcriber = StreamingTranscriber(model)
    loop = asyncio.get_running_loop()
    try:
        while True:
            # Receive PCM16 audio data.
            data = await websocket.receive_bytes()
            audio = np.frombuffer(data, dtype=np.int16).astype(np.float32) / 32768.0
            transcriber.add_audio(audio)
            # Transcription is a blocking call; run it in the default executor
            # so other connections on this event loop are not stalled.
            text = await loop.run_in_executor(None, transcriber.transcribe_chunk)
            if text:
                await websocket.send_json({
                    "type": "transcription",
                    "text": text,
                    "is_final": False,
                })
    except WebSocketDisconnect:
        # Normal end of session: the client went away.
        pass
    except Exception:
        # Keep the original best-effort behaviour (do not crash the server),
        # but leave a trace instead of swallowing the error silently.
        logging.getLogger(__name__).exception("STT session failed")
Fish Speech流式TTS服务
# tts_service.py
import asyncio
from fish_speech_sdk import FishSpeechClient
from fastapi import FastAPI, WebSocket
import numpy as np
# FastAPI application for the text-to-speech service.
app = FastAPI()
# Shared TTS client created once at import time and used by every connection.
# NOTE(review): the accepted keyword arguments depend on fish_speech_sdk --
# confirm against the installed SDK version.
tts_client = FishSpeechClient(
model="fish-speech-2.0",
device="cuda",
sample_rate=44100,
)
@app.websocket("/ws/tts")
async def tts_websocket(websocket: WebSocket):
    """Serve streaming speech synthesis over a WebSocket.

    Protocol (JSON messages from the client):
      * ``{"type": "set_voice", "reference_audio": ...}`` stores a reference
        voice that is passed to all later synthesis calls on this connection.
      * ``{"type": "synthesize", "text": ...}`` streams the synthesized audio
        back as binary frames, followed by a ``{"type": "synthesis_done"}``
        JSON message.

    Messages with any other ``type`` are ignored.

    Args:
        websocket: The incoming client WebSocket connection.
    """
    import logging

    from fastapi import WebSocketDisconnect

    await websocket.accept()
    # Optional reference voice (used for voice cloning when set).
    ref_audio = None
    try:
        while True:
            data = await websocket.receive_json()
            msg_type = data.get("type")
            if msg_type == "set_voice":
                # Remember the reference voice for subsequent requests.
                ref_audio = data.get("reference_audio")
                continue
            if msg_type == "synthesize":
                text = data.get("text") or ""
                if text:
                    # Stream audio to the client as it is produced.
                    async for chunk in tts_client.synthesize_stream(
                        text=text,
                        reference_audio=ref_audio,
                        chunk_size=2048,
                    ):
                        await websocket.send_bytes(chunk.tobytes())
                # Always send the end marker, even for empty text, so a
                # client counting one marker per request never stalls.
                await websocket.send_json({"type": "synthesis_done"})
    except WebSocketDisconnect:
        # Normal end of session: the client went away.
        pass
    except Exception:
        # Best-effort service: log instead of silently swallowing failures.
        logging.getLogger(__name__).exception("TTS session failed")
编排层:统一管线
# orchestrator.py
import asyncio
import json
from fastapi import FastAPI, WebSocket
import websockets
import numpy as np
# FastAPI application for the orchestrator service.
app = FastAPI()
# WebSocket endpoints of the STT and TTS services that VoicePipeline connects to.
STT_URL = "ws://localhost:8001/ws/stt"
TTS_URL = "ws://localhost:8002/ws/tts"
class VoicePipeline:
    """Chain STT -> LLM -> TTS over WebSockets for one conversation.

    ``connect`` opens persistent sockets to the STT and TTS services;
    ``process_audio`` runs one turn; ``close`` releases the sockets.
    """

    # Text is flushed to TTS up to the last of these characters. Full-width
    # "!" and "?" are included because Chinese output normally uses them.
    _SENTENCE_ENDINGS = "。!?!?\n"

    def __init__(self, stt_timeout=5.0, tts_timeout=5.0):
        """Create an unconnected pipeline.

        Args:
            stt_timeout: Seconds to wait for an STT reply before treating
                the turn as silent.
            tts_timeout: Seconds to wait for each TTS message.
        """
        self.stt_ws = None
        self.tts_ws = None
        self.llm_stream = None
        self.stt_timeout = stt_timeout
        self.tts_timeout = tts_timeout

    @classmethod
    def _split_ready_text(cls, text):
        """Split ``text`` into (complete sentences, unfinished remainder)."""
        cut = max(text.rfind(mark) for mark in cls._SENTENCE_ENDINGS) + 1
        return text[:cut], text[cut:]

    async def _request_synthesis(self, text):
        """Send one ``synthesize`` request for ``text`` to the TTS service."""
        await self.tts_ws.send(json.dumps({
            "type": "synthesize",
            "text": text,
        }))

    async def connect(self):
        """Open the STT and TTS WebSocket connections."""
        self.stt_ws = await websockets.connect(STT_URL)
        try:
            self.tts_ws = await websockets.connect(TTS_URL)
        except BaseException:
            # Do not leak the STT socket when the TTS connection fails.
            await self.stt_ws.close()
            self.stt_ws = None
            raise

    async def process_audio(self, audio_bytes: bytes) -> bytes:
        """Process one audio input and return the synthesized reply audio.

        Args:
            audio_bytes: Raw audio forwarded unchanged to the STT service.

        Returns:
            The concatenated binary frames received from the TTS service, or
            ``b""`` when STT produced no text (or did not answer in time).

        Raises:
            asyncio.TimeoutError: If the TTS service stops responding while
                synthesis results are still outstanding.
        """
        # 1. STT: audio -> text. The STT service only replies when it has
        # non-empty text, so an unbounded recv() could block forever.
        await self.stt_ws.send(audio_bytes)
        try:
            raw_reply = await asyncio.wait_for(
                self.stt_ws.recv(), timeout=self.stt_timeout
            )
        except asyncio.TimeoutError:
            return b""
        user_text = json.loads(raw_reply).get("text", "")
        if not user_text:
            return b""
        print(f"用户说: {user_text}")

        # 2. LLM: stream the reply and hand finished sentences to TTS early
        # to cut time-to-first-audio.
        # NOTE(review): this speaks WebSocket to /v1/chat/completions;
        # OpenAI-compatible servers normally serve that path over HTTP/SSE --
        # confirm the LLM gateway actually accepts WebSocket connections.
        response_parts = []
        pending = ""
        requests_sent = 0
        async with websockets.connect("ws://localhost:8003/v1/chat/completions") as llm_ws:
            await llm_ws.send(json.dumps({
                "model": "qwen3-8b",
                "messages": [
                    {"role": "system", "content": "用简短的口语化中文回答。"},
                    {"role": "user", "content": user_text},
                ],
                "stream": True,
                "max_tokens": 200,
            }))
            async for message in llm_ws:
                chunk = json.loads(message)
                choices = chunk.get("choices")
                if not choices:
                    continue
                content = choices[0].get("delta", {}).get("content")
                if not content:
                    continue
                response_parts.append(content)
                # Flush only up to the last sentence boundary so TTS never
                # receives the beginning of the next sentence.
                ready, pending = self._split_ready_text(pending + content)
                if ready.strip():
                    await self._request_synthesis(ready)
                    requests_sent += 1

        # Flush whatever is left after the stream ends.
        if pending.strip():
            await self._request_synthesis(pending)
            requests_sent += 1
        print(f"AI回复: {''.join(response_parts)}")

        # 3. Collect TTS audio. One "synthesis_done" marker arrives per
        # request, so wait for all of them; stopping at the first would drop
        # later sentences and leave stale frames for the next turn.
        audio_chunks = []
        done_seen = 0
        while done_seen < requests_sent:
            result = await asyncio.wait_for(
                self.tts_ws.recv(), timeout=self.tts_timeout
            )
            if isinstance(result, (bytes, bytearray)):
                # Binary frames are audio; never try to parse them as JSON.
                audio_chunks.append(bytes(result))
                continue
            try:
                msg = json.loads(result)
            except json.JSONDecodeError:
                continue
            if isinstance(msg, dict) and msg.get("type") == "synthesis_done":
                done_seen += 1
        return b"".join(audio_chunks)

    async def close(self):
        """Close any open STT/TTS connections; safe to call repeatedly."""
        stt_ws, tts_ws = self.stt_ws, self.tts_ws
        self.stt_ws = None
        self.tts_ws = None
        try:
            if stt_ws is not None:
                await stt_ws.close()
        finally:
            # Close TTS even if closing STT raised.
            if tts_ws is not None:
                await tts_ws.close()
VAD(语音活动检测)详解
VAD是实时语音系统中最容易被忽视但至关重要的组件。它决定了系统何时开始收听、何时停止。
# vad_service.py
import torch
import numpy as np
class SileroVAD:
    """Frame-by-frame voice activity detector built on the Silero VAD model.

    Tracks a small state machine across calls to ``detect``: speech starts
    on the first frame whose probability exceeds ``threshold`` and ends
    after ``silence_limit`` consecutive non-speech frames.
    """

    def __init__(self, threshold=0.5, sample_rate=16000,
                 silence_duration=0.5, frame_size=512):
        """Load the model and initialise the detector state.

        Args:
            threshold: Speech probability above which a frame is speech.
            sample_rate: Sample rate passed to the model, in Hz.
            silence_duration: Seconds of continuous silence that end an
                utterance (default 0.5 s).
            frame_size: Number of samples per ``detect`` call, used to
                convert ``silence_duration`` into a frame count.
                NOTE(review): assumes callers feed fixed-size frames of
                exactly this many samples -- confirm at the call site.
        """
        self.model, _ = torch.hub.load(
            repo_or_dir='snakers4/silero-vad',
            model='silero_vad',
            force_reload=False,
        )
        self.threshold = threshold
        self.sample_rate = sample_rate
        self.is_speaking = False
        self.silence_counter = 0
        self.silence_limit = self._silence_frames(
            silence_duration, sample_rate, frame_size
        )

    @staticmethod
    def _silence_frames(silence_duration, sample_rate, frame_size):
        """Return how many whole frames fit into ``silence_duration`` seconds."""
        return int(silence_duration * sample_rate / frame_size)

    def detect(self, audio_chunk: np.ndarray) -> dict:
        """Classify one audio frame and update the speaking state.

        Args:
            audio_chunk: Audio samples for one frame.

        Returns:
            A dict with ``speech_probability`` (float), ``is_speech``,
            ``speech_start`` and ``speech_end`` (bools). ``speech_start`` is
            True only on the frame that begins an utterance; ``speech_end``
            only on the frame that completes the silence window.
        """
        audio_tensor = torch.from_numpy(audio_chunk).float()
        # Inference only: skip autograd bookkeeping.
        with torch.no_grad():
            speech_prob = self.model(audio_tensor, self.sample_rate).item()

        result = {
            "speech_probability": speech_prob,
            "is_speech": speech_prob > self.threshold,
            "speech_start": False,
            "speech_end": False,
        }

        if result["is_speech"]:
            if not self.is_speaking:
                result["speech_start"] = True
                self.is_speaking = True
            # Any speech frame restarts the silence window.
            self.silence_counter = 0
        elif self.is_speaking:
            self.silence_counter += 1
            if self.silence_counter >= self.silence_limit:
                result["speech_end"] = True
                self.is_speaking = False
                self.silence_counter = 0
        return result
性能优化建议
部署Docker Compose配置
# docker-compose.yml
version: "3.9"

services:
  # Entry point exposed to clients; reaches the other services by their
  # compose service names.
  orchestrator:
    build: ./orchestrator
    ports:
      - "8000:8000"
    depends_on:
      - stt
      - tts
      - llm
    environment:
      - STT_URL=ws://stt:8001/ws/stt
      - TTS_URL=ws://tts:8002/ws/tts
      # NOTE(review): the vLLM OpenAI-compatible server speaks HTTP; confirm
      # the orchestrator's client matches the scheme used here.
      - LLM_URL=ws://llm:8003/v1/chat/completions

  stt:
    build: ./stt
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    volumes:
      # Persist downloaded model weights across container restarts.
      - whisper-cache:/root/.cache

  tts:
    build: ./tts
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

  llm:
    image: vllm/vllm-openai:v0.8.5
    # Listen on 8003 to match LLM_URL above (the server defaults to 8000).
    # Streaming is requested per call via "stream": true, so no server flag
    # is needed for it.
    command: >
      --model Qwen/Qwen3-8B
      --port 8003
      --max-model-len 4096
      --gpu-memory-utilization 0.9
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

volumes:
  whisper-cache:
成本对比
- GPT-4o Realtime API:$0.06/min × 1000用户 × 10min/天 = $600/天
- 开源方案(自部署H100):约$2/GPU小时 × 24小时 = $48/天(单卡,支撑约100并发)
- 混合方案:STT+TTS自部署 + GPT-4o API = 约$200/天
总结
实时AI语音助手在2026年已经从技术demo走向了生产可用。GPT-4o Realtime API适合快速验证和小规模部署,而开源组合方案适合大规模、低成本的生产环境。关键优化点在于:VAD的准确性、流式管线的并行化、以及网络传输的优化。
通过本文提供的架构和代码,你可以在单张H100上搭建一个支撑100并发用户的实时语音助手系统,端到端延迟控制在500ms以内。
本文由51domino.com团队撰写,完整代码已开源在GitHub。部署过程中遇到问题欢迎联系我们。