首页 / 技术博客 / 实时AI语音助手架构:从STT到TTS的端到端低延迟方案
部署教程 2026-06-21

实时AI语音助手架构:从STT到TTS的端到端低延迟方案

完整教程:构建低延迟AI语音助手,覆盖GPT-4o Realtime API、Whisper V4、Fish Speech等最新技术,含WebRTC集成和VAD实现。

实时AI语音助手架构:从STT到TTS的端到端低延迟方案

2026年,AI语音交互进入了真正的实时时代。GPT-4o Realtime API将端到端延迟压缩到300ms以内,开源方案如Fish Speech和Sesame CSM也达到了接近商用的水平。本文将手把手教你构建一个生产级的实时AI语音助手系统。

架构总览

实时AI语音助手端到端架构

👤 用户麦克风

WebRTC

VAD 语音活动检测 Silero V5

PCM

STT Whisper V4 流式转写

文本

LLM 流式生成 GPT-4o / Qwen3

文本

TTS Fish Speech 2.0 流式合成

🔊 扬声器播放 → 用户继续对话

传输层:WebRTC + WebSocket

WebRTC P2P音频传输 NAT穿透 · 低延迟

WebSocket 双向流式通信 文本+音频混合

gRPC Stream 服务间通信 高效序列化

音频缓冲管理 Ring Buffer · Jitter Buffer 自适应码率

⏱️ 延迟预算分配(目标 < 500ms 端到端)

VAD: ~20ms Silero V5

STT: ~100ms Whisper V4流式

LLM: ~200ms 首token延迟

TTS: ~120ms 首chunk延迟

总计: ~440ms 目标<500ms ✓

技术栈选型

方案一:GPT-4o Realtime API(最快上手)

GPT-4o Realtime API是目前最简单的实时语音方案。它将STT、LLM、TTS三个环节合为一体,通过WebSocket直接传输音频流。

优势:端到端延迟最低(约300ms),无需管理多个服务 劣势:成本较高($0.06/min音频),闭源

方案二:开源组合方案(灵活可控)

  • STT:Whisper V4 Large-v3-Turbo(流式转写)
  • LLM:Qwen3-8B / DeepSeek-R1-7B(流式生成)
  • TTS:Fish Speech 2.0 / Sesame CSM(流式合成)
  • VAD:Silero VAD v5

优势:成本低、可定制、可私有部署 劣势:架构复杂、需要更多工程工作

方案一实现:GPT-4o Realtime API

后端服务

# realtime_voice_server.py
import asyncio
import json
import websockets
from fastapi import FastAPI, WebSocket
from fastapi.websockets import WebSocketDisconnect
import openai

app = FastAPI()

OPENAI_WS_URL = "wss://api.openai.com/v1/realtime"
MODEL = "gpt-4o-realtime-preview-2026-03"

@app.websocket("/ws/voice")
async def voice_proxy(client_ws: WebSocket):
    """代理客户端和OpenAI Realtime API之间的WebSocket连接"""
    await client_ws.accept()

    # 连接到OpenAI Realtime API
    headers = {
        "Authorization": f"Bearer {openai.api_key}",
        "OpenAI-Beta": "realtime=v2",
    }

    async with websockets.connect(
        f"{OPENAI_WS_URL}?model={MODEL}",
        extra_headers=headers,
    ) as openai_ws:
        # 配置会话
        await openai_ws.send(json.dumps({
            "type": "session.update",
            "session": {
                "modalities": ["text", "audio"],
                "voice": "alloy",
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "input_audio_transcription": {
                    "model": "whisper-v4-large",
                },
                "turn_detection": {
                    "type": "server_vad",
                    "threshold": 0.5,
                    "prefix_padding_ms": 300,
                    "silence_duration_ms": 500,
                },
                "instructions": """你是一个友好的AI语音助手。
                请用简洁的中文回答用户问题。
                回答要口语化,适合语音对话场景。""",
            },
        }))

        # 双向代理
        async def forward_to_openai():
            try:
                while True:
                    data = await client_ws.receive_text()
                    await openai_ws.send(data)
            except WebSocketDisconnect:
                pass

        async def forward_to_client():
            try:
                async for message in openai_ws:
                    await client_ws.send_text(message)
            except Exception:
                pass

        await asyncio.gather(
            forward_to_openai(),
            forward_to_client(),
        )

前端WebRTC集成

// voice_client.js
class RealtimeVoiceClient {
  constructor(wsUrl) {
    this.wsUrl = wsUrl;
    this.audioContext = null;
    this.mediaStream = null;
    this.ws = null;
  }

  async start() {
    // 获取麦克风权限
    this.mediaStream = await navigator.mediaDevices.getUserMedia({
      audio: {
        sampleRate: 24000,
        channelCount: 1,
        echoCancellation: true,
        noiseSuppression: true,
      }
    });

    // 创建AudioContext
    this.audioContext = new AudioContext({ sampleRate: 24000 });

    // 连接WebSocket
    this.ws = new WebSocket(this.wsUrl);

    this.ws.onopen = () => {
      console.log('Connected to voice server');
      this.startCapturing();
    };

    this.ws.onmessage = (event) => {
      const msg = JSON.parse(event.data);
      this.handleServerMessage(msg);
    };
  }

  startCapturing() {
    const source = this.audioContext.createMediaStreamSource(this.mediaStream);
    const processor = this.audioContext.createScriptProcessor(4096, 1, 1);

    processor.onaudioprocess = (e) => {
      const float32Data = e.inputBuffer.getChannelData(0);
      // 转换为PCM16
      const pcm16 = this.float32ToPCM16(float32Data);
      // Base64编码后发送
      const base64 = btoa(String.fromCharCode(...new Uint8Array(pcm16.buffer)));
      this.ws.send(JSON.stringify({
        type: 'input_audio_buffer.append',
        audio: base64,
      }));
    };

    source.connect(processor);
    processor.connect(this.audioContext.destination);
  }

  handleServerMessage(msg) {
    switch (msg.type) {
      case 'response.audio.delta':
        this.playAudioChunk(msg.delta);
        break;
      case 'response.audio_transcript.done':
        console.log('AI:', msg.transcript);
        break;
      case 'input_audio_buffer.speech_started':
        console.log('User started speaking');
        break;
      case 'input_audio_buffer.speech_stopped':
        console.log('User stopped speaking');
        break;
    }
  }

  async playAudioChunk(base64Audio) {
    const binaryString = atob(base64Audio);
    const bytes = new Uint8Array(binaryString.length);
    for (let i = 0; i < binaryString.length; i++) {
      bytes[i] = binaryString.charCodeAt(i);
    }

    const pcm16 = new Int16Array(bytes.buffer);
    const float32 = new Float32Array(pcm16.length);
    for (let i = 0; i < pcm16.length; i++) {
      float32[i] = pcm16[i] / 32768.0;
    }

    const audioBuffer = this.audioContext.createBuffer(1, float32.length, 24000);
    audioBuffer.getChannelData(0).set(float32);

    const source = this.audioContext.createBufferSource();
    source.buffer = audioBuffer;
    source.connect(this.audioContext.destination);
    source.start();
  }

  float32ToPCM16(float32Array) {
    const pcm16 = new Int16Array(float32Array.length);
    for (let i = 0; i < float32Array.length; i++) {
      const s = Math.max(-1, Math.min(1, float32Array[i]));
      pcm16[i] = s < 0 ? s * 0x8000 : s * 0x7FFF;
    }
    return pcm16;
  }
}

// 使用
const client = new RealtimeVoiceClient('ws://localhost:8000/ws/voice');
client.start();

方案二实现:开源组合方案

开源方案服务部署架构

🖥️ GPU服务器 (H100/A100)

Silero VAD v5 Port: 8001 CPU推理,~5ms 0.1 GPU VRAM

Whisper V4 Port: 8002 large-v3-turbo 1.5 GB VRAM

Qwen3-8B Port: 8003 (vLLM) 流式生成 8 GB VRAM

Fish Speech Port: 8004 v2.0 流式合成 2 GB VRAM

🎯 编排层 (FastAPI + WebSocket) 音频流调度 · 异步管线 · 错误恢复 · 会话管理 总VRAM需求: ~12 GB (单卡H100可部署)

Whisper V4流式转写服务

# stt_service.py
import asyncio
import numpy as np
from faster_whisper import WhisperModel
from fastapi import FastAPI, WebSocket
import torch

app = FastAPI()

# 加载Whisper V4 Turbo模型
model = WhisperModel(
    "large-v3-turbo",
    device="cuda",
    compute_type="float16",
)

class StreamingTranscriber:
    def __init__(self, model, chunk_duration=1.0):
        self.model = model
        self.chunk_duration = chunk_duration  # 秒
        self.buffer = np.array([], dtype=np.float32)
        self.sample_rate = 16000

    def add_audio(self, audio_data: np.ndarray):
        self.buffer = np.concatenate([self.buffer, audio_data])

    def transcribe_chunk(self) -> str:
        if len(self.buffer) < self.sample_rate * self.chunk_duration:
            return ""

        # 取出一个chunk进行转写
        chunk = self.buffer[:int(self.sample_rate * self.chunk_duration)]
        self.buffer = self.buffer[int(self.sample_rate * self.chunk_duration):]

        segments, _ = self.model.transcribe(
            chunk,
            language="zh",
            beam_size=3,
            vad_filter=True,
            vad_parameters=dict(
                min_silence_duration_ms=300,
                speech_pad_ms=200,
            ),
        )

        text = "".join([seg.text for seg in segments])
        return text.strip()

@app.websocket("/ws/stt")
async def stt_websocket(websocket: WebSocket):
    await websocket.accept()
    transcriber = StreamingTranscriber(model)

    try:
        while True:
            # 接收PCM16音频数据
            data = await websocket.receive_bytes()
            audio = np.frombuffer(data, dtype=np.int16).astype(np.float32) / 32768.0
            transcriber.add_audio(audio)

            text = transcriber.transcribe_chunk()
            if text:
                await websocket.send_json({
                    "type": "transcription",
                    "text": text,
                    "is_final": False,
                })
    except Exception:
        pass

Fish Speech流式TTS服务

# tts_service.py
import asyncio
from fish_speech_sdk import FishSpeechClient
from fastapi import FastAPI, WebSocket
import numpy as np

app = FastAPI()

tts_client = FishSpeechClient(
    model="fish-speech-2.0",
    device="cuda",
    sample_rate=44100,
)

@app.websocket("/ws/tts")
async def tts_websocket(websocket: WebSocket):
    await websocket.accept()

    # 设置参考音色(可选,支持声音克隆)
    ref_audio = None

    try:
        while True:
            data = await websocket.receive_json()

            if data["type"] == "set_voice":
                # 设置音色参考
                ref_audio = data["reference_audio"]
                continue

            if data["type"] == "synthesize":
                text = data["text"]

                # 流式合成,每个chunk约50ms
                async for chunk in tts_client.synthesize_stream(
                    text=text,
                    reference_audio=ref_audio,
                    chunk_size=2048,
                ):
                    await websocket.send_bytes(chunk.tobytes())

                # 发送结束标记
                await websocket.send_json({"type": "synthesis_done"})

    except Exception:
        pass

编排层:统一管线

# orchestrator.py
import asyncio
import json
from fastapi import FastAPI, WebSocket
import websockets
import numpy as np

app = FastAPI()

STT_URL = "ws://localhost:8001/ws/stt"
TTS_URL = "ws://localhost:8002/ws/tts"

class VoicePipeline:
    def __init__(self):
        self.stt_ws = None
        self.tts_ws = None
        self.llm_stream = None

    async def connect(self):
        self.stt_ws = await websockets.connect(STT_URL)
        self.tts_ws = await websockets.connect(TTS_URL)

    async def process_audio(self, audio_bytes: bytes) -> bytes:
        """处理一段音频输入,返回合成的语音"""

        # 1. STT: 音频转文本
        await self.stt_ws.send(audio_bytes)
        stt_result = json.loads(await self.stt_ws.recv())
        user_text = stt_result["text"]

        if not user_text:
            return b""

        print(f"用户说: {user_text}")

        # 2. LLM: 流式生成回复
        response_text = ""
        async with websockets.connect("ws://localhost:8003/v1/chat/completions") as llm_ws:
            await llm_ws.send(json.dumps({
                "model": "qwen3-8b",
                "messages": [
                    {"role": "system", "content": "用简短的口语化中文回答。"},
                    {"role": "user", "content": user_text},
                ],
                "stream": True,
                "max_tokens": 200,
            }))

            # 流式接收LLM输出
            sentence_buffer = ""
            async for message in llm_ws:
                chunk = json.loads(message)
                if chunk.get("choices"):
                    delta = chunk["choices"][0].get("delta", {})
                    if "content" in delta:
                        response_text += delta["content"]
                        sentence_buffer += delta["content"]

                        # 按句子送入TTS(减少首包延迟)
                        if any(p in sentence_buffer for p in "。!?\n"):
                            await self.tts_ws.send(json.dumps({
                                "type": "synthesize",
                                "text": sentence_buffer,
                            }))
                            sentence_buffer = ""

            # 处理剩余文本
            if sentence_buffer:
                await self.tts_ws.send(json.dumps({
                    "type": "synthesize",
                    "text": sentence_buffer,
                }))

        print(f"AI回复: {response_text}")

        # 3. 收集TTS音频
        audio_chunks = []
        while True:
            result = await asyncio.wait_for(self.tts_ws.recv(), timeout=5.0)
            try:
                msg = json.loads(result)
                if msg.get("type") == "synthesis_done":
                    break
            except json.JSONDecodeError:
                audio_chunks.append(result)

        return b"".join(audio_chunks)

    async def close(self):
        if self.stt_ws:
            await self.stt_ws.close()
        if self.tts_ws:
            await self.tts_ws.close()

VAD(语音活动检测)详解

VAD是实时语音系统中最容易被忽视但至关重要的组件。它决定了系统何时开始收听、何时停止。

# vad_service.py
import torch
import numpy as np

class SileroVAD:
    def __init__(self, threshold=0.5, sample_rate=16000):
        self.model, _ = torch.hub.load(
            repo_or_dir='snakers4/silero-vad',
            model='silero_vad',
            force_reload=False,
        )
        self.threshold = threshold
        self.sample_rate = sample_rate
        self.is_speaking = False
        self.silence_counter = 0
        self.silence_limit = int(0.5 * sample_rate / 512)  # 500ms沉默

    def detect(self, audio_chunk: np.ndarray) -> dict:
        """检测语音活动"""
        audio_tensor = torch.from_numpy(audio_chunk).float()

        # 获取语音概率
        speech_prob = self.model(audio_tensor, self.sample_rate).item()

        result = {
            "speech_probability": speech_prob,
            "is_speech": speech_prob > self.threshold,
            "speech_start": False,
            "speech_end": False,
        }

        if result["is_speech"]:
            if not self.is_speaking:
                result["speech_start"] = True
                self.is_speaking = True
            self.silence_counter = 0
        else:
            if self.is_speaking:
                self.silence_counter += 1
                if self.silence_counter >= self.silence_limit:
                    result["speech_end"] = True
                    self.is_speaking = False
                    self.silence_counter = 0

        return result

性能优化建议

低延迟优化清单

🔧 技术优化

使用PCM16 16kHz格式(避免编解码开销)

启用Whisper的VAD预过滤(减少无效推理)

LLM流式输出+TTS流式输入(流水线并行)

按句子切分送入TTS(而非等完整回复)

音频Ring Buffer避免内存拷贝

GPU显存预分配,避免动态分配延迟

使用CUDA Graph减少kernel启动开销

STT/TTS使用TensorRT加速推理

🌐 网络优化

WebRTC代替WebSocket(P2P直连减少中转)

就近部署(边缘节点<50ms RTT)

Opus音频编码(低比特率高质量)

Jitter Buffer自适应(平衡延迟和抖动)

连接预热(首次请求保持WebSocket长连接)

TCP_NODELAY + SO_KEEPALIVE(减少小包延迟)

QUIC协议支持(0-RTT重连)

音频流压缩(Opus @ 24kbps足够)

部署Docker Compose配置

# docker-compose.yml
version: "3.9"

services:
  orchestrator:
    build: ./orchestrator
    ports:
      - "8000:8000"
    depends_on:
      - stt
      - tts
      - llm
    environment:
      - STT_URL=ws://stt:8001/ws/stt
      - TTS_URL=ws://tts:8002/ws/tts
      - LLM_URL=ws://llm:8003/v1/chat/completions

  stt:
    build: ./stt
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    volumes:
      - whisper-cache:/root/.cache

  tts:
    build: ./tts
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

  llm:
    image: vllm/vllm-openai:v0.8.5
    command: >
      --model Qwen/Qwen3-8B
      --max-model-len 4096
      --gpu-memory-utilization 0.9
      --enable-streaming
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

volumes:
  whisper-cache:

成本对比

  • GPT-4o Realtime API:$0.06/min × 1000用户 × 10min/天 = $600/天
  • 开源方案(自部署H100):约$2/GPU小时,单卡支撑~100并发 = $48/天
  • 混合方案:STT+TTS自部署 + GPT-4o API = 约$200/天

总结

实时AI语音助手在2026年已经从技术demo走向了生产可用。GPT-4o Realtime API适合快速验证和小规模部署,而开源组合方案适合大规模、低成本的生产环境。关键优化点在于:VAD的准确性、流式管线的并行化、以及网络传输的优化。

通过本文提供的架构和代码,你可以在单张H100上搭建一个支撑100并发用户的实时语音助手系统,端到端延迟控制在500ms以内。


本文由51domino.com团队撰写,完整代码已开源在GitHub。部署过程中遇到问题欢迎联系我们。

订阅更新

获取最新的AI本地化技术文章和教程