FreeSWITCH-to-AI real-time voice large model over WebSocket (streaming bidirectional audio): plugin technical design (mod_ppy_aduio_stream)
1. Solution Overview
Built on FreeSWITCH's real-time communication capabilities, the system bridges calls to AI large-model services over the WebSocket protocol, delivering low-latency, high-concurrency intelligent voice interaction. It supports bidirectional audio stream processing, real-time ASR/TTS conversion, and dynamic execution of business commands.
Contact the author if you need this for a project: https://cwn1.x3322.net:7777/down/0UgRahEtbPEa.so
Similar technology for reference: https://www.ddrj.com/callcenter/largemodel.html
2. Architecture Design

```mermaid
graph LR
    A[FreeSWITCH] -->|SIP/RTP| B(WebSocket gateway/SFU)
    B -->|bidirectional WebSocket| C(AI Gateway)
    C -->|HTTP/gRPC stream| D(LLM service)
    D -->|text/control commands| C
    C -->|TTS audio/commands| B
    B -->|RTP audio| A
```
3. Core Components

| Component | Technology | Core functions |
| --- | --- | --- |
| Media gateway | FreeSWITCH 1.10+ | SIP call handling, RTP audio streaming, DTMF event management |
| Protocol bridge | mod_websocket (ESL + custom module) | Audio to WebSocket binary stream (OPUS/PCM supported) |
| AI gateway | Node.js/Python (Tornado) | Bidirectional WS communication, ASR/TTS scheduling, session state machine |
| LLM interface | gRPC stream/HTTP/2 server | Streaming dialogue handling & command generation (~200 ms responses) |
| ASR/TTS engine | Alibaba Cloud/iFlytek/DeepSeek RTS | Real-time speech <=> text conversion (<300 ms latency) |
| Model inference | DeepSeek-V2/GLM-4 API | Streaming dialogue generation with SSML control commands |
4. Key Flows
4.1 Voice input stream (User → AI)
FreeSWITCH --(RTP)--> mod_websocket --(WS binary/OPUS)--> AI gateway --(ASR API)--> LLM
- Data envelope:

```json
{
  "call_id": "call-123456",
  "seq": 1024,
  "is_final": false,
  "timestamp": 1721541687000,
  "payload": "BASE64_OPUS"
}
```
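The envelope above can be produced and consumed with a small helper on the gateway side. A minimal sketch in Python, assuming encoded OPUS frames are already available as bytes; the field names follow the JSON envelope above, everything else is illustrative:

```python
import base64
import json
import time


def build_audio_frame(call_id, seq, opus_bytes, is_final=False):
    """Wrap one OPUS frame in the JSON envelope sent over the WebSocket."""
    return json.dumps({
        "call_id": call_id,
        "seq": seq,
        "is_final": is_final,
        "timestamp": int(time.time() * 1000),  # epoch milliseconds
        "payload": base64.b64encode(opus_bytes).decode("ascii"),
    })


def parse_audio_frame(message):
    """Return (seq, raw OPUS bytes) from an envelope received over the WS."""
    frame = json.loads(message)
    return frame["seq"], base64.b64decode(frame["payload"])
```

The base64 step keeps the audio transportable inside a text WebSocket message; for lower overhead the module can also send raw binary frames, as noted in the Lua script's `STREAM_MESSAGE_SENDJSON` comment.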
4.2 AI response stream (AI → User)
LLM --(SSML commands)--> AI gateway --(WS control messages)--> TTS service --(RTP)--> FreeSWITCH
- Interruption (barge-in) mechanism:
  - A DTMF `#` keypress triggers a `barge-in` event
  - TTS first-packet arrival time < 100 ms
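On the gateway, barge-in can be modeled as a tiny state machine: the moment a `#` DTMF or a speech-detected event arrives, playback stops and queued TTS chunks are discarded. A minimal sketch; the class and event shapes are illustrative, not part of the module's API:

```python
class TtsPlayback:
    """Minimal barge-in model: '#' DTMF or a barge-in event cancels TTS."""

    def __init__(self):
        self.playing = False
        self.queue = []  # pending TTS audio chunks awaiting playback

    def enqueue(self, chunk):
        """Queue one TTS audio chunk and mark playback active."""
        self.queue.append(chunk)
        self.playing = True

    def on_event(self, event):
        """Cancel playback on DTMF '#' or an explicit barge-in notification."""
        if event.get("dtmf") == "#" or event.get("type") == "barge-in":
            self.playing = False
            self.queue.clear()  # stop TTS immediately and flush the queue
```

Flushing rather than pausing matters here: stale TTS chunks played after the user has already spoken again make the dialogue feel unresponsive.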
4.3 控制指令示例
json
// ASR识别结果
{“event”:“asr_result”, “text”:“查余额”, “confidence”:0.95}
// TTS响应指令
{“event”:“ai_response”, “type”:“tts”, “audio”:“chunk_123.opus”}
// 业务转移指令
{“event”:“action”, “command”:“transfer:6001”}
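These control messages are naturally handled with a dispatch table keyed on the `event` field. A minimal Python sketch; the handler bodies are placeholders for real gateway logic:

```python
import json


def dispatch(message, handlers):
    """Route a control message to a handler keyed by its 'event' field."""
    msg = json.loads(message)
    handler = handlers.get(msg["event"])
    if handler is None:
        raise ValueError("unknown event: %s" % msg["event"])
    return handler(msg)


actions = []  # recorded side effects, for illustration
handlers = {
    "asr_result": lambda m: actions.append(("asr", m["text"])),
    "ai_response": lambda m: actions.append(("tts", m["audio"])),
    # "transfer:6001" splits into the command name and its argument
    "action": lambda m: actions.append(tuple(m["command"].split(":", 1))),
}
```

Raising on unknown events (instead of silently ignoring them) surfaces protocol-version mismatches between gateway and AI server early.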
5. Performance Optimization
- Audio chunking: 80 ms/frame (1280 samples @ 16 kHz)
- Double-buffered ASR: a preloaded silence/speech model speeds up first-token response
- Dynamic jitter buffer: automatic compensation when network latency exceeds 150 ms
- Session hot-swap: AI dialogue context is preserved while a call is on hold
- Circuit breaker: calls are transferred to a human agent when model responses exceed 2 s
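Frame sizing follows directly from sample rate times frame duration, which is worth sanity-checking when mixing the 16 kHz ASR path with the 8 kHz stream the Lua script requests:

```python
def samples_per_frame(sample_rate_hz, frame_ms):
    """Number of PCM samples in one audio frame of the given duration."""
    return sample_rate_hz * frame_ms // 1000


# 80 ms at 16 kHz -> 1280 samples per frame
# 80 ms at 8 kHz  -> 640 samples (the Lua script streams "mono 8k")
# 20 ms at 8 kHz  -> 160 samples (a typical telephony/RTP packet)
```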
6. Exception Handling

| Failure scenario | Mitigation |
| --- | --- |
| WebSocket disconnect | automatic reconnect within 10 s + 20 s audio buffer |
| Conflicting ASR results | timestamp-based sequence arbitration |
| Model response timeout | play a "thinking" prompt tone |
| DTMF interruption | stop TTS immediately and flush the playback queue |
| Codec mismatch | dynamic OPUS/PCM/G.711 switching |
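The disconnect row above (reconnect plus a bounded audio buffer) can be sketched as a ring buffer that retains roughly 20 s of audio while the link is down and replays it on reconnect. The 20 s and 80 ms figures come from this document; the class itself is illustrative:

```python
from collections import deque


class ReconnectBuffer:
    """Hold up to `max_seconds` of audio frames while the WebSocket is down."""

    def __init__(self, max_seconds=20.0, frame_ms=80):
        max_frames = int(max_seconds * 1000 / frame_ms)
        # bounded deque: once full, the oldest frames are dropped first
        self.frames = deque(maxlen=max_frames)

    def on_audio(self, frame, connected, send):
        """Forward a frame if connected, otherwise buffer it."""
        if connected:
            send(frame)
        else:
            self.frames.append(frame)

    def on_reconnect(self, send):
        """Replay buffered audio in arrival order after the link returns."""
        while self.frames:
            send(self.frames.popleft())
```

Dropping the oldest frames on overflow trades completeness for bounded memory and keeps the replayed audio closest to real time.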
7. Example FreeSWITCH Lua Script

```lua
local cjson = require "dkjson"
local pts = require "ppytools"

local ws_addr = "ws://127.0.0.1:20000"
ws_addr = "wss://127.0.0.1:12345"
--ws_addr = "wss://ai.xxx.com:12345"

local records_base = "/workspace/records"
local script_path = debug.getinfo(1, "S").source:sub(2)
local script_name = script_path:match("([^/\\]+)$") or "unknown"

local fs_api = freeswitch.API()

function fslog(msg, log_level)
    log_level = (log_level ~= nil) and log_level or "info" -- strict nil check
    freeswitch.consoleLog(log_level, "[" .. script_name .. "] " .. msg)
end

function main()
    local session_lega = session
    local session_lega_uuid = session_lega:get_uuid()
    fslog(string.format("[START][%s]\n", session_lega_uuid))
    session_lega:answer()

    local datetime_dir, records_dir = pts.create_compact_date_dir(records_base)
    local caller_id_number = session_lega:getVariable("caller_id_number")
    local destination_number = session_lega:getVariable("destination_number")
    fslog(string.format("session_lega_uuid: %s , caller_id_number: %s , destination_number: %s\n",
        session_lega_uuid, caller_id_number, destination_number))

    -- background call recording
    if records_dir ~= nil then
        -- enable stereo recording
        session_lega:setVariable("RECORD_STEREO", "true")
        local records_str = string.format("bgapi uuid_record %s start %s/%s.wav 1000 0 0",
            session_lega_uuid, records_dir, session_lega_uuid)
        fslog(records_str)
        fs_api:executeString(records_str)
        -- custom CDR variable
        session_lega:setVariable("record_file_uri_path",
            string.format("%s/%s.wav", datetime_dir, session_lega_uuid))
    end

    -- By default, user audio is sent to the AI server as binary.
    -- If this variable is set to "true", it is sent as JSON instead,
    -- in the same format the AI server uses toward FreeSWITCH.
    session_lega:setVariable("STREAM_MESSAGE_SENDJSON", "true")

    local con = freeswitch.EventConsumer()
    con:bind("CUSTOM", "mod_audio_stream::json")
    con:bind("CUSTOM", "mod_audio_stream::connect")
    con:bind("CUSTOM", "mod_audio_stream::disconnect")
    con:bind("CUSTOM", "mod_audio_stream::error")

    local start_time = os.date("%Y-%m-%d %H:%M:%S", os.time())
    local metadata_obj = {
        type = "init",
        sid = session_lega_uuid,
        phone_number = caller_id_number,
        timestamp = start_time
    }
    local metadata = cjson.encode(metadata_obj)
    fslog("metadata:" .. metadata)

    local result, err = fs_api:execute("uuid_audio_stream",
        string.format("%s start %s mono 8k %s", session_lega_uuid, ws_addr, metadata))
    if result then
        fslog(string.format("Function executed successfully: %s\n", result), "notice")
    else
        fslog(string.format("Error executing function: %s\n", err), "err")
    end

    while session_lega:ready() do
        local event = con:pop()
        if event then
            local event_uuid = event:getHeader("Unique-ID")
            if event_uuid == session_lega_uuid then
                local event_name = event:getHeader("Event-Name")
                local event_sub = event:getHeader("Event-Subclass")
                local body = event:getBody()
                fslog(string.format("JSON executing function, Event-Subclass: %s, body: %s\n",
                    event_sub, body))
                if event_sub == "mod_audio_stream::connect" then
                    --
                elseif event_sub == "mod_audio_stream::disconnect" then
                    break
                elseif event_sub == "mod_audio_stream::json" then
                    local data = cjson.decode(body)
                    if data.type == "sentence" and data.status == "start" then
                        local metadata_obj = {
                            type = "sentence_callback",
                            sentence_id = data.sentence_id,
                            status = "play",
                            timestamp = os.date("%Y-%m-%d %H:%M:%S", os.time())
                        }
                        local metadata = cjson.encode(metadata_obj)
                        fslog("[send_text]metadata:" .. metadata)
                        fs_api:execute("uuid_audio_stream",
                            string.format("%s send_text %s", session_lega_uuid, metadata))
                    end
                    if data.type == "streamText" then
                        if data.assistant then
                            fslog(data.assistant)
                        end
                    end
                    if data.toHuman then
                        break
                    elseif data.stop then
                        fslog("data stop", "err")
                    elseif data.clear then
                        fslog("data clear", "err")
                    end
                elseif event_sub == "mod_audio_stream::error" then
                    break
                else
                    --
                end
            end
        elseif session_lega then
            session_lega:sleep(20)
        else
            break
        end
    end

    --fs_api:execute("uuid_record", string.format("%s stop", session_lega_uuid))
    fslog(string.format("[END][%s]\n", session_lega_uuid))
end

main()
```
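On the AI-server side, the Lua script above sends an `init` message at call start and reacts to `sentence`/`start` messages by echoing a `sentence_callback`. A minimal sketch of that message exchange logic, transport omitted; the greeting reply and field values are illustrative assumptions, not the module's defined protocol:

```python
import json


def handle_from_freeswitch(message):
    """React to JSON messages the Lua script sends over the WebSocket.

    Returns a JSON string to send back, or None when no reply is needed.
    """
    msg = json.loads(message)
    if msg.get("type") == "init":
        # first message of a call: ask FreeSWITCH to play one TTS sentence
        return json.dumps({"type": "sentence", "status": "start", "sentence_id": 1})
    if msg.get("type") == "sentence_callback" and msg.get("status") == "play":
        # FreeSWITCH confirmed that playback of this sentence has started
        return None
    return None
```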