当前位置: 首页 > news >正文

使用python接入腾讯云DeepSeek

本文主要从提供SSE方式接入DeepSeek,并通过fastapi websocket对外提供接入方法。
参考文档:
腾讯云大模型:https://cloud.tencent.com/document/product/1759/109380
fastAPI官网:https://fastapi.tiangolo.com/

WebSocketManager

提供WebsocketManager对websocket连接实例进行管理,代码片段如下:

from fastapi import WebSocketclass WsManager:"""websocket manager"""connectors: dict[str, WebSocket] = {}@staticmethoddef get_connector(connector_id: str) -> WebSocket | None:"""获取连接实例:param connector_id::return:"""connector = WsManager.connectors.get(connector_id)return connector@staticmethoddef add_connector(connector_id: str, connector: WebSocket):"""添加websocket客户端:param connector_id: 客户端ID:param connector: 客户端连接实例:return:"""WsManager.connectors[connector_id] = connector@staticmethoddef remove_connector(connector_id: str):"""移除websocket连接:param connector_id: 连接ID:return:"""try:del WsManager.connectors[connector_id]except Exception:pass@staticmethodasync def send_message(connector_id: str, message: str):connector = WsManager.connectors.get(connector_id)if connector is None:returntry:await connector.send_text(message)except Exception as e:print('消息发送失败')@staticmethodasync def send_message_json(connector_id: str, message: dict):connector = WsManager.connectors.get(connector_id)if connector is None:returntry:await connector.send_json(message)except Exception as e:print('消息发送失败')

接入DeepSeek

import uuid
import os
import requests
import jsonfrom src.core.WsManager import WsManager
from .session import get_sessionTCLOUD_URL = 'lke.tencentcloudapi.com'
TCLOUD_DeepSeek_SSE_URL = "https://wss.lke.cloud.tencent.com/v1/qbot/chat/sse"SECRET_ID = "XXXXXXXXXXXXXXXXXXXX"
SECRET_KEY = "XXXXXXXXXXXXXXXXXXXXX"
REGION = "ap-guangzhou"
TYPE = 5def get_session():"""生成一个 UUID:return session id 字符串"""new_uuid = uuid.uuid4()# 将 UUID 转换为字符串session_id = str(new_uuid)return session_iddef _resolve_message(message: str, prev_message: str):"""处理消息:param message::param prev_message::return:"""if prev_message == '':return Noneif prev_message.startswith("event:"):# 生成消息message = message.replace("data:", '').strip()try:message_json = json.loads(message)message_type = message_json.get('type')payload = message_json.get('payload')if message_type == 'token_stat':# token统计事件return Noneelif message_type == 'thought':# 思考事件return Noneelif message_type == 'error':# 错误事件return Noneelif message_type == 'reference':# 参考来源事件return Nonecontent = payload.get('content')record_id = payload.get('record_id')request_id = payload.get('request_id')session_id = payload.get('session_id')trace_id = payload.get('trace_id')message_id = payload.get('message_id')return {'type': message_type,'data': {'content': content,'record_id': record_id,'request_id': request_id,'session_id': session_id,'trace_id': trace_id,'message_id': message_id,}}except Exception as e:print(e)else:return Noneasync def get_q_a_result(visitor_id: str, question: str):"""获取问答结果:param visitor_id: 访客ID:param question: 问题内容:return: 回答内容"""session_id = get_session()req_data = {"content": f"{question}","bot_app_key": "XXXXXXXX",  # 可以获取方式见下图"visitor_biz_id": visitor_id,"session_id": session_id,"streaming_throttle": 100}res = requests.post(TCLOUD_DeepSeek_SSE_URL, data=json.dumps(req_data),stream=True, headers={"Accept": "text/event-stream"})prev_message: str = ''  # 上一条消息if res.status_code == 200:for line in res.iter_lines():if line:data = line.decode('utf-8')message = _resolve_message(data, prev_message)if message:await WsManager.send_message_json(visitor_id, message)prev_message = dataelse:print('Failed to get data. Status code: {response.status_code}')# 关闭websocket连接connector = WsManager.get_connector(visitor_id)if connector:await connector.close()WsManager.remove_connector(visitor_id)
  1. 进入控制台
    在这里插入图片描述2. 创建应用,并点击调用
    在这里插入图片描述

通过fastapi 以websocket方式对外提供接口

这个接口是,前台连接websocket后,后台直接根据连接参数,开始调用问答,问答结束后自动关闭websocket连接

import asynciofrom fastapi import APIRouter, WebSocket, WebSocketDisconnect, BackgroundTasks
from fastapi.responses import JSONResponsefrom src.core.WsManager import WsManager
from src.utils.tcloudLLM import get_example_recommend
from src.utils.session import get_visitor_idrouter = APIRouter(prefix="/api/tcloud", tags=['腾信云大模型接口'])@router.websocket("/ws/{compound_name}")
async def get_exercises_ws(compound_name: str, websocket: WebSocket, background_tasks: BackgroundTasks):await websocket.accept()visitor_id = get_visitor_id()WsManager.add_connector(visitor_id, websocket)# 本来使用BackgroundTask去做后台任务,结果不知道什么原因,调用不起来,然后换成asyncio处理asyncio.create_task(get_example_recommend(visitor_id, compound_name))  # 创建对应问答任务try:while True:data = await websocket.receive_text()await websocket.send_text(f"Message text was: {data}")except WebSocketDisconnect:WsManager.remove_connector(visitor_id)except Exception:WsManager.remove_connector(visitor_id)

如果需要进行多轮问答,提供为多次问答设置相关的 request_id 参数进行消息串联。

http://www.lryc.cn/news/542724.html

相关文章:

  • 【MySQL】服务正在启动或停止中,请稍候片刻后再试一次【解决方案】
  • 测试工程师玩转DeepSeek之Prompt
  • 【PyTorch】2024保姆级安装教程-Python-(CPU+GPU详细完整版)-
  • 精选案例展 | 智己汽车—全栈可观测驱动智能化运营与成本优化
  • MySQL 使用 `WHERE` 子句时 `COUNT(*)`、`COUNT(1)` 和 `COUNT(column)` 的区别解析
  • Linux运维——网络管理
  • STM32CUBEIDE FreeRTOS操作教程(十三):task api 任务访问函数
  • Jmeter+Jenkins接口压力测试持续集成
  • 深入浅出ES6:现代JavaScript的基石
  • 实现使用RBF(径向基函数)神经网络模拟二阶电机数学模型中的非线性干扰,以及使用WNN(小波神经网络)预测模型中的非线性函数来抵消迟滞影响的功能
  • 潜水泵,高效排水,守护城市与农田|深圳鼎跃
  • 易基因:RNA甲基化修饰和R-loop的交叉调控:从分子机制到临床意义|深度综述
  • 115 道 MySQL 面试题,从简单到深入!
  • 一周学会Flask3 Python Web开发-flask3上下文全局变量session,g和current_app
  • MFC学习笔记-1
  • Linux搜索查找类指令
  • 江协科技/江科大-51单片机入门教程——P[1-1] 课程简介P[1-2] 开发工具介绍及软件安装
  • 监听load和hashchange事件
  • 深度剖析Seata源码:解锁分布式事务处理的核心逻辑
  • 在 Ansys Mechanical 中解决干涉拟合
  • JMeter性能问题
  • 美国国防部(DoD)SysML v2迁移指南项目
  • JavaWeb-GenericServlet源码分析(适配器/模板方法)
  • 微机原理与汇编语言试题四
  • [java基础-JVM篇]1_JVM自动内存管理
  • 安宝特科技 | Vuzix Z100智能眼镜+AugmentOS:重新定义AI可穿戴设备的未来——从操作系统到硬件生态,如何掀起无感智能革命?
  • Unity FBXExport导出的FBX无法在Blender打开
  • UE5销毁Actor,移动Actor,简单的空气墙的制作
  • 【python】提取word\pdf格式内容到txt文件
  • 002简单MaterialApp主题和Scaffold脚手架