当前位置: 首页 > news >正文

【知足常乐ai笔记】机器人强化学习

除了传统的按空格分割和SQL关键字分段策略,以下是一些更智能的SQL分段方法,结合时序语义、语义解析、执行优化等场景需求,可显著提升复杂查询的处理效率和准确性:

⏱️ 1. 时序语义分段策略(适用于时序数据库场景)

• 按时间区间分段

 使用固定时间窗口或滑动窗口分组数据,例如 IoTDB 的语法:  GROUP BY ([startTime, endTime), interval, sliding_step)  参数 interval 定义窗口长度,sliding_step 定义滑动步长,适合周期性数据分析(如每5分钟统计一次温度平均值)。  

• 按数据变化值分段

 根据字段值的动态变化划分段,例如当数值变化超过阈值时开启新分段:  GROUP BY VARIATION(control_expression, delta)  若 delta=10,则相邻数据差值超过10时分段,适用于监控设备状态突变场景。  

• 按会话间隔分段

 以时间间隔作为分段依据,例如用户行为分析中的会话超时切割:  GROUP BY SESSION(time_interval)  连续数据的时间间隔超过 time_interval 则分段,适合用户活跃会话分析。

🔍 2. 语义解析分段策略(基于查询意图的分段)

• 模式链接(Schema Linking)

 将自然语言问题中的实体映射到数据库表结构:  ◦ 识别查询中的表名、字段名、条件值(如“销售额”映射到 sales.amount);  ◦ 结合TF-IDF或深度学习模型建立语义映射,减少歧义。  

• 分层解码生成

 分阶段生成SQL框架:  1. 首先生成SQL主干(如 SELECT...FROM...WHERE);  2. 填充具体字段、聚合函数、连接条件;  3. 适用于Text2SQL系统,提升嵌套查询的生成准确率。  

⚙️ 3. 执行优化导向的分段策略

• 分句解析与逻辑隔离

 将SQL按子句拆分为独立模块(SELECT、FROM、WHERE等),明确每部分作用:  -- 拆解示例SELECT e.name, d.dept_name        -- 目标字段FROM employee e                   -- 主表JOIN department d ON e.dept_id=d.id -- 连接逻辑WHERE e.salary > 5000            -- 过滤条件结合执行顺序(FROM → JOIN → WHERE → GROUP BY → SELECT)优化分段逻辑。  

• 子查询剥离与CTE重构

 将嵌套子查询转换为临时表或公共表达式(CTE):  WITH high_budget_dept AS (   -- 独立子查询段SELECT id FROM department WHERE budget > 100000)SELECT e.name FROM employee e WHERE e.dept_id IN (SELECT id FROM high_budget_dept)  降低复杂度,提升可读性和执行效率。  

📊 4. 结果集动态分段策略

• 上下文关键字片段提取

 在全文搜索结果中,按关键词位置提取上下文片段:  ◦ 定位关键词后,截取前后40个字符作为分段(如 [...部署目录中zyplayer-doc.jar...]);  ◦ 合并重叠分段,避免重复。  

• 基于打分的分段重排序

 根据关键词密度、位置相关性对分段打分,优先展示高权重段落:  SELECT (IF(text LIKE '%部署%', 2, 0) + IF(text LIKE '%系统%', 1, 0)) AS scoreFROM documentsORDER BY score DESC提升结果可读性。  

🔧 5. 动态参数化分段策略

• 外部参数驱动分段

 将分段规则参数化,支持动态传入分段点:  -- 传入分段点列表 [0, 100, 200]SELECT pseg(x, [0, 100, 200]) AS segment, AVG(x) FROM data_table GROUP BY segment结合二分法(pseg函数)提升大范围分段效率。  

• 条件表达式分段

 按布尔条件划分数据,例如:  GROUP BY CONDITION(expression)  将满足 expression 的连续数据归为一段,适用于异常检测场景。  

💎 总结:按场景选择分段策略

场景 推荐策略 优势

时序数据分析 时间区间/会话间隔分段 利用时序连续性,高效聚合

自然语言转SQL 模式链接+分层解码 精准映射语义,减少歧义

复杂查询优化 子查询剥离+分句解析 降低嵌套复杂度,提升可维护性

全文检索结果展示 关键字片段提取+动态打分 聚焦高相关内容,提升用户体验

动态参数聚合 二分法分段+外部参数驱动 支持实时更新分段规则,计算高效

通过以上策略,可显著提升复杂 SQL 的可读性、执行效率及结果精准度。实际应用中需结合数据库类型(如时序库IoTDB、OLAP引擎)和业务需求灵活组合。
从你给出的错误信息可知,在训练过程里,程序因GPU内存不足而崩溃。错误类型为torch.OutOfMemoryError,这表明PyTorch无法在GPU上分配更多内存了。尽管你的GPU总容量有44.52 GiB,但实际空闲内存仅有12.94 MiB,这说明内存管理方面存在问题。

可能的原因

  1. 模型规模过大:Qwen3这类大型语言模型需要大量GPU内存来存储参数。
  2. 批处理大小不合适:过大的批处理大小会使内存需求急剧增加。
  3. 内存碎片问题:频繁地分配和释放内存可能会导致内存碎片化,即便总内存充足,也难以分配大块连续内存。
  4. 梯度累积不当:如果使用了梯度累积,可能参数设置不合理。
  5. 未释放中间变量:有些张量在使用完后没有及时释放。

解决建议

1. 减小批处理大小

要降低每次训练迭代的内存消耗,可尝试把批处理大小减半,甚至减得更小。

2. 采用梯度累积

梯度累积能让你在不增加内存使用量的情况下,获得等效于更大批处理大小的训练效果。你可以在训练配置里添加如下代码:

# 在训练循环中
optimizer.zero_grad()
for i, batch in enumerate(data_loader):outputs = model(**batch)loss = outputs.lossloss = loss / accumulation_steps  # 梯度累积步数loss.backward()if (i + 1) % accumulation_steps == 0:optimizer.step()optimizer.zero_grad()
3. 运用混合精度训练

混合精度训练借助半精度浮点数(FP16)来减少内存占用,同时不会对模型精度造成显著影响。

from torch.cuda.amp import autocast, GradScalerscaler = GradScaler()# 在训练循环中
for batch in data_loader:with autocast(device_type='cuda'):outputs = model(**batch)loss = outputs.lossoptimizer.zero_grad()scaler.scale(loss).backward()scaler.step(optimizer)scaler.update()
4. 释放不必要的张量

你可以使用del语句删除不再需要的变量,并调用torch.cuda.empty_cache()来释放缓存的内存。

del intermediate_variable
torch.cuda.empty_cache()
5. 设置环境变量

通过设置PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True,能够减少内存碎片化问题。

export PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True
python 7t3-4-rl.py > 9.txt
6. 检查代码中的内存泄漏

要确保在训练循环中没有不断累积张量,特别是在使用自定义数据集或者数据处理流程时更要注意。

7. 尝试模型量化

对于大型模型,可以考虑使用8位或4位量化技术,例如LLM.int8()或者GPTQ。

实施建议

你可以先尝试减小批处理大小和启用混合精度训练,这两种方法实现起来较为简单,通常能有效缓解内存压力。要是问题依旧存在,再考虑梯度累积和环境变量设置等其他方法。要实现监控输出在垂直方向上压缩60%,可以通过减少输出行数、合并信息和调整终端高度来实现。以下是几种可行方案:

一、精简输出内容(减少行数)

通过过滤不必要的信息,将输出压缩到更少的行数:

watch -n 1 'tput cols 80; echo "CPU: $(top -bn1 | grep "Cpu(s)" | awk "{print \$2+\$4}")%"; echo "GPU: $(nvidia-smi --query-gpu=utilization.gpu --format=csv,noheader,nounits)%"; free -h | grep Mem | awk "{print \"内存: \$3/\$2\"}"; mpstat -P ALL | grep -E "CPU|all" | awk "{print \$3,\$4,\$5,\$6,\$13}"'

输出效果示例(压缩到约6行):

CPU: 5.2%
GPU: 24%
内存: 7.2G/15.5G
CPU    %usr   %nice    %sys   %idle
all    3.2    0.0      2.0    94.8
0      2.1    0.0      1.2    96.7

二、使用垂直分屏工具(如tmux)

将终端分成多个面板,每个面板显示不同监控信息:

  1. 安装 tmux

    sudo apt install tmux  # Ubuntu/Debian
    
  2. 创建垂直分屏配置文件 monitor.tmux

    # 创建新会话
    new-session -d -s monitor# 分割窗口为上下两部分(60%:40%)
    split-window -v -p 40# 上半部分再分割为左右两部分
    select-pane -t 0
    split-window -h -p 50# 设置各面板命令
    send-keys -t 0 'mpstat -P ALL 1' C-m  # CPU监控
    send-keys -t 1 'nvidia-smi -l 1' C-m  # GPU监控
    send-keys -t 2 'watch -n 1 "free -h; echo; df -h /"' C-m  # 内存/磁盘监控# 切换到第一个面板
    select-pane -t 0# 启用鼠标操作
    set-option -g mouse on# 附加会话
    attach-session -t monitor
    
  3. 运行配置:

    tmux source-file monitor.tmux
    

三、使用自定义脚本合并输出

创建一个脚本将所有监控信息整合到更少的行中:

#!/bin/bash
while true; do# 清空屏幕clear# 获取CPU信息(合并为一行)CPU_INFO=$(mpstat -P ALL 1 1 | grep -E "CPU|all" | awk '{printf "%-5s %5s %5s %5s %5s\n", $3, $4, $5, $6, $13}')# 获取GPU信息GPU_INFO=$(nvidia-smi --query-gpu=utilization.gpu,memory.used,memory.total --format=csv,noheader)# 获取内存信息MEM_INFO=$(free -h | grep Mem | awk '{print $3"/"$2}')# 输出所有信息(紧凑格式)echo "===== CPU (usr/nice/sys/idle) ====="echo "$CPU_INFO"echoecho "===== GPU (使用率/显存) ====="echo "$GPU_INFO" | awk -F', ' '{printf "使用率: %-5s  显存: %-8s / %-8s\n", $1, $2, $3}'echoecho "===== 内存 ====="echo "已用/总量: $MEM_INFO"# 等待1秒sleep 1
done

保存为 monitor.sh,添加执行权限并运行:

chmod +x monitor.sh
./monitor.sh

四、调整终端字体和行高(GUI终端)

如果是图形界面终端,可以通过以下方式压缩显示:

  1. 减小字体大小Ctrl + -(大多数终端)
  2. 减少行高
    • 在终端设置中调整「行间距」或「字体行高」参数
    • 例如在GNOME终端中:「首选项」→「配置文件」→「文本」→ 取消勾选「等宽行高」

五、使用 column 命令格式化输出

将多行输出转换为多列显示,减少垂直空间占用:

watch -n 1 'tput cols 120; 
(mpstat -P ALL | grep -E "CPU|all"; echo; 
nvidia-smi --query-gpu=name,utilization.gpu,memory.used,memory.total --format=csv; echo; 
free -h | grep Mem) | column -x -s " "'

根据你的具体需求,可以选择最适合的压缩方式。如果需要更精确的控制,建议使用自定义脚本方案(方案三),它能将所有信息整合到最少的行数中。在机器人走路(步态控制)的强化学习训练中,核心目标是让机器人通过与环境的交互(试错)学习稳定、高效的步行策略,应对连续动作空间(关节角度、力矩等)和高维状态空间(传感器数据、身体姿态、速度等)的挑战。以下是适用于该任务的主流强化学习算法,按其核心思想和适用场景分类介绍:

一、基于 Actor-Critic 框架的深度强化学习算法

Actor-Critic 框架结合了“策略优化”(Actor,直接输出动作)和“价值评估”(Critic,评估动作价值),适合连续动作空间的机器人步态控制,是目前应用最广泛的一类算法。

1. DDPG(深度确定性策略梯度,Deep Deterministic Policy Gradient)
  • 核心思想
    针对连续动作空间设计,是 DQN(深度 Q 网络)与策略梯度方法的结合。

    • Actor 网络:输出确定性动作(如关节力矩的具体值),而非概率分布,适合需要精确控制的机器人步态。
    • Critic 网络:基于 Actor 输出的动作和当前状态,估计 Q 值(动作价值),指导 Actor 优化策略。
    • 引入“经验回放池”(存储过往状态-动作-奖励-下一状态数据)和“目标网络”(缓慢更新以提高稳定性),解决样本相关性和训练震荡问题。
  • 在机器人走路中的优势
    能直接输出连续动作(如关节角度、电机扭矩),适合双足/四足机器人的关节控制;对中等复杂度的步态(如平地行走)有较好的初始学习效果。

  • 局限性:容易出现 Q 值过估计和策略震荡,在复杂地形(如斜坡、障碍物)上稳定性较差。

2. TD3(双延迟深度确定性策略梯度,Twin Delayed DDPG)
  • 核心思想:DDPG 的改进版本,针对其缺陷优化:

    • 引入“双 Critic 网络”:通过两个独立的 Critic 网络估计 Q 值,取最小值作为目标值,缓解 Q 值过估计问题。
    • 延迟更新 Actor 网络:Critic 网络更新多次后再更新 Actor 网络,减少策略震荡。
    • 目标策略平滑:在目标动作中加入少量噪声,避免 Actor 过度拟合当前 Critic 的估计。
  • 在机器人走路中的优势
    相比 DDPG 稳定性显著提升,能更好地应对机器人动力学的不确定性(如地面摩擦变化、关节磨损),适合需要鲁棒性的步态训练(如崎岖路面行走)。

3. SAC(软 Actor-Critic,Soft Actor-Critic)
  • 核心思想:基于“最大熵强化学习”,在传统奖励之外引入“熵奖励”(鼓励策略探索多样性),使机器人在学习稳定步态的同时,保留应对未知环境的灵活性。

    • Actor 网络:输出动作的概率分布(而非确定性动作),通过最大化“预期奖励 + 熵”优化策略。
    • Critic 网络:估计“软 Q 值”(包含熵的价值),指导 Actor 平衡“利用”(稳定走路)和“探索”(尝试新步态)。
  • 在机器人走路中的优势
    探索能力强,适合机器人在多样环境(如平地、草地、台阶)中学习自适应步态;鲁棒性高,即使传感器或执行器有噪声,仍能维持基本步行能力。

4. PPO(近端策略优化,Proximal Policy Optimization)
  • 核心思想:策略梯度方法的简化与改进,通过限制策略更新的幅度(“近端约束”),在保证训练稳定的同时提高样本效率。

    • 不依赖 Actor-Critic 的严格分离,而是直接优化策略目标函数,通过剪辑(Clip)机制避免策略突变(如关节动作突然过大导致机器人摔倒)。
    • 实现简单,训练过程稳定,对超参数不敏感,适合工程落地。
  • 在机器人走路中的优势
    目前机器人步态控制中应用最广泛的算法之一(如波士顿动力的部分机器人、开源四足机器人 Unitree Go1 的强化学习训练)。适合复杂步态(如小跑、跳跃)的学习,能快速收敛到稳定策略。

二、基于模型的强化学习算法

基于模型的算法通过学习环境模型(如机器人动力学模型),在虚拟环境中模拟交互以生成训练数据,减少真实机器人的物理试错成本(尤其适合昂贵或易损坏的机器人)。

1. MBPO(基于模型的策略优化,Model-Based Policy Optimization)
  • 核心思想

    • 先学习一个“概率动力学模型”(预测机器人在某动作下的下一状态和奖励),用真实数据训练模型。
    • 基于该模型在虚拟环境中“想象”未来轨迹(生成模拟数据),再用这些数据训练无模型强化学习算法(如 PPO、SAC)优化策略。
  • 在机器人走路中的优势
    样本效率极高(真实交互数据需求减少 10-100 倍),适合双足机器人等训练成本高的场景;能通过模型预测避免危险动作(如摔倒)。

2. PETS(基于概率集成的轨迹采样,Probabilistic Ensemble Trajectory Sampling)
  • 核心思想
    用多个神经网络组成“集成动力学模型”(捕捉模型不确定性),通过采样不同模型的预测轨迹,选择最稳健的动作(如步态调整)。

  • 在机器人走路中的优势
    对模型误差的容忍度高,即使动力学模型不够精确(如忽略地面微小不平),仍能学习稳定步态,适合室外非结构化环境(如草地、碎石路)。

三、策略梯度方法(无模型,直接优化策略)

1. TRPO(信任区域策略优化,Trust Region Policy Optimization)
  • 核心思想:通过“信任区域”约束策略更新幅度(确保新策略与旧策略的差异在可控范围内),避免因策略突变导致机器人摔倒或奖励骤降。

    • 用 KL 散度(衡量两个概率分布的差异)定义信任区域,保证策略优化的单调性(奖励非递减)。
  • 在机器人走路中的优势
    稳定性强,适合需要严格控制动作变化的场景(如双足机器人的步态切换,避免失衡)。但实现复杂,计算成本高,逐渐被 PPO 替代。

2. A2C(优势演员-评论家,Advantage Actor-Critic)
  • 核心思想:简化的 Actor-Critic 框架,用“优势函数”(当前动作价值 - 状态平均价值)替代 Q 值,减少估计偏差。

    • 并行训练多个智能体(或同一智能体的多个副本),同步更新策略,提高样本效率。
  • 在机器人走路中的优势
    训练速度快,适合多足机器人(如四足、六足)的并行步态优化(不同腿的动作协调)。

四、结合模仿学习的强化学习算法

机器人走路的初始阶段可能因“从零开始试错”导致频繁摔倒,结合模仿学习(先用专家数据引导)可加速训练:

1. DAgger(数据集聚合,Dataset Aggregation)
  • 核心思想:先通过“行为克隆”(模仿专家步态数据,如人类控制机器人走路的示范)学习基础步态,再用强化学习优化策略;同时持续让专家修正智能体的错误动作,迭代更新数据集。

  • 在机器人走路中的优势
    减少初始阶段的无效试错(如机器人频繁摔倒),加速收敛到可行步态,适合双足机器人等平衡难度高的场景。

总结:不同算法的适用场景

算法核心优势适合的机器人走路场景
DDPG连续动作直接输出简单平地步态,低复杂度机器人(如小型双足)
TD3抗过估计,稳定性好复杂地形(斜坡、小障碍),中等复杂度机器人
PPO实现简单,稳定性强多足机器人(四足、六足),大规模并行训练
SAC探索性强,鲁棒性高非结构化环境(室外、未知地形)
MBPO/PETS样本效率高,减少真实试错昂贵/易损坏机器人(如人形机器人),室外场景
DAgger+RL加速初始学习,减少摔倒双足机器人(平衡难度高),需要专家示范的场景

这些算法中,PPOTD3 因稳定性和实现难度平衡,是目前机器人步态训练的主流选择;SACMBPO 则在鲁棒性和样本效率上更具优势,适合复杂环境下的应用。游戏规则设计工具种类丰富,涵盖了从纸上原型工具到专业软件等多种类型,以下是一些常见的工具介绍:

  • 纸上原型工具:适合桌游、解谜游戏等规则设计初期快速验证想法,如白纸、彩纸、方格纸等可用来绘制游戏地图、卡片等;铅笔、彩色笔等绘图工具用于标注规则、绘制图案;还有剪刀、胶水等剪裁粘贴工具,以及骰子、棋子等用于模拟游戏中的随机元素和角色。
  • 流程图绘制工具:Lucidchart是一款常用的在线流程图绘制工具。它支持实时协作,团队成员可共同编辑,还能与Jira、Confluence等多种工具集成。可用于绘制游戏流程,清晰展示游戏规则中各个环节的逻辑关系,如任务流程、战斗流程等。Visio也是一款强大的流程图绘制软件,常用于绘制游戏世界地图和各个关卡的布局,能精确呈现游戏场景和规则区域划分。
  • 电子表格工具:Excel等电子表格工具可用于游戏数值规则设计。例如制作武器耐久度计算公式表、角色属性值表等,通过公式和函数来设定数值之间的关系,方便进行数值平衡调整。
  • 游戏原型设计工具:Tiny Game Design Tool主要用于游戏制作初期的原型设计。它能帮助开发者把创意快速记录下来,并按步骤分解和深化创意,形成可实行的游戏原型,涵盖情感、游戏机制、主题、角色、物体、障碍和关卡设计等方面。Figma是一款在线原型设计工具,具有强大的组件和样式功能,适合设计游戏界面原型,通过创建交互组件来模拟游戏中的操作流程,展示游戏规则在界面上的体现。
  • 游戏引擎插件:Puzzle Match Kit是Unity的插件,专为开发三消类游戏设计。它提供了完整的游戏框架,包括基本匹配规则、得分系统等,开发者可在此基础上定制游戏规则,还能通过其API自定义特殊规则,适合休闲类游戏规则设计。RPG角色扮演类卡牌生成器工具包是基于Unreal Engine的素材,包含创建roguelike卡牌生成器游戏所需的所有组件,通过数据表可轻松添加新卡牌等内容,还能通过组件定义行为,为卡牌游戏规则设计提供了灵活框架。
  • 开源游戏设计工具:Dota2RPGDesign是一个开源项目,利用Valve的Source 2引擎,通过可视化编辑器,用户可通过拖放方式创建地形、放置单位、设置事件和逻辑,还支持Lua脚本编写以满足复杂游戏逻辑需求,适合设计Dota 2风格的角色扮演游戏规则。除了传统的按空格分割和SQL关键字分段策略,以下是一些更智能的SQL分段方法,结合时序语义、语义解析、执行优化等场景需求,可显著提升复杂查询的处理效率和准确性:

⏱️ 1. 时序语义分段策略(适用于时序数据库场景)

• 按时间区间分段

 使用固定时间窗口或滑动窗口分组数据,例如 IoTDB 的语法:  GROUP BY ([startTime, endTime), interval, sliding_step)  参数 interval 定义窗口长度,sliding_step 定义滑动步长,适合周期性数据分析(如每5分钟统计一次温度平均值)。  

• 按数据变化值分段

 根据字段值的动态变化划分段,例如当数值变化超过阈值时开启新分段:  GROUP BY VARIATION(control_expression, delta)  若 delta=10,则相邻数据差值超过10时分段,适用于监控设备状态突变场景。  

• 按会话间隔分段

 以时间间隔作为分段依据,例如用户行为分析中的会话超时切割:  GROUP BY SESSION(time_interval)  连续数据的时间间隔超过 time_interval 则分段,适合用户活跃会话分析。

🔍 2. 语义解析分段策略(基于查询意图的分段)

• 模式链接(Schema Linking)

 将自然语言问题中的实体映射到数据库表结构:  ◦ 识别查询中的表名、字段名、条件值(如“销售额”映射到 sales.amount);  ◦ 结合TF-IDF或深度学习模型建立语义映射,减少歧义。  

• 分层解码生成

 分阶段生成SQL框架:  1. 首先生成SQL主干(如 SELECT...FROM...WHERE);  2. 填充具体字段、聚合函数、连接条件;  3. 适用于Text2SQL系统,提升嵌套查询的生成准确率。  

⚙️ 3. 执行优化导向的分段策略

• 分句解析与逻辑隔离

 将SQL按子句拆分为独立模块(SELECT、FROM、WHERE等),明确每部分作用:  -- 拆解示例SELECT e.name, d.dept_name        -- 目标字段FROM employee e                   -- 主表JOIN department d ON e.dept_id=d.id -- 连接逻辑WHERE e.salary > 5000            -- 过滤条件结合执行顺序(FROM → JOIN → WHERE → GROUP BY → SELECT)优化分段逻辑。  

• 子查询剥离与CTE重构

 将嵌套子查询转换为临时表或公共表达式(CTE):  WITH high_budget_dept AS (   -- 独立子查询段SELECT id FROM department WHERE budget > 100000)SELECT e.name FROM employee e WHERE e.dept_id IN (SELECT id FROM high_budget_dept)  降低复杂度,提升可读性和执行效率。  

📊 4. 结果集动态分段策略

• 上下文关键字片段提取

 在全文搜索结果中,按关键词位置提取上下文片段:  ◦ 定位关键词后,截取前后40个字符作为分段(如 [...部署目录中zyplayer-doc.jar...]);  ◦ 合并重叠分段,避免重复。  

• 基于打分的分段重排序

 根据关键词密度、位置相关性对分段打分,优先展示高权重段落:  SELECT (IF(text LIKE '%部署%', 2, 0) + IF(text LIKE '%系统%', 1, 0)) AS scoreFROM documentsORDER BY score DESC提升结果可读性。  

🔧 5. 动态参数化分段策略

• 外部参数驱动分段

 将分段规则参数化,支持动态传入分段点:  -- 传入分段点列表 [0, 100, 200]SELECT pseg(x, [0, 100, 200]) AS segment, AVG(x) FROM data_table GROUP BY segment结合二分法(pseg函数)提升大范围分段效率。  

• 条件表达式分段

 按布尔条件划分数据,例如:  GROUP BY CONDITION(expression)  将满足 expression 的连续数据归为一段,适用于异常检测场景。  

💎 总结:按场景选择分段策略

场景 推荐策略 优势

时序数据分析 时间区间/会话间隔分段 利用时序连续性,高效聚合

自然语言转SQL 模式链接+分层解码 精准映射语义,减少歧义

复杂查询优化 子查询剥离+分句解析 降低嵌套复杂度,提升可维护性

全文检索结果展示 关键字片段提取+动态打分 聚焦高相关内容,提升用户体验

动态参数聚合 二分法分段+外部参数驱动 支持实时更新分段规则,计算高效

通过以上策略,可显著提升复杂 SQL 的可读性、执行效率及结果精准度。实际应用中需结合数据库类型(如时序库IoTDB、OLAP引擎)和业务需求灵活组合。批处理训练和键值缓存(KV Cache)是提升大语言模型(LLM)推理性能的核心技术,其优化原理和性能对比如下:

一、批处理训练(Batching)的优化原理

  1. 计算资源利用率提升
    • 并行计算:GPU的矩阵运算单元(如Tensor Core)可同时处理多个输入样本。例如,单个矩阵乘法操作可覆盖整个批次的输入数据,显著提升计算密度(FLOP/s)。

    • 内存带宽瓶颈缓解:模型权重只需加载一次,即可服务整个批次的数据,减少重复的内存访问。当批量大小(Batch Size)满足 FLOPs / 内存带宽 的临界点时,计算从内存受限转为计算受限,实现更高吞吐量。

  2. 不同模型组件的优化效果
    • 密集层(Dense Layers):批处理对序列长度为1的自回归步骤(如生成下一个Token)优化显著,吞吐量提升可达10倍以上。

    • 自注意力层(Self-Attention):在短序列场景下(s≤100)有提升,但对长序列(s≥500)效果有限,因计算量随序列长度平方增长。

二、键值缓存(KV Cache)的优化原理

  1. 避免冗余计算
    • 在自回归解码中,每个新Token依赖于历史Token的键值对(K, V)。KV Cache缓存这些中间状态,避免每步重新计算,将复杂度从O(s²)降至O(s)。

    • 例如,Llama 2-7B模型的单序列KV缓存占用约2GB显存,但若不缓存,重新计算的开销远超内存访问成本。

  2. 内存管理优化
    • 分页缓存(PagedAttention):动态分配显存,避免为未使用的序列长度预留空间,减少内存碎片(如支持2048序列时实际利用率提高30%)。

    • 多查询注意力(MQA/GQA):通过共享键值头(如GQA将KV头减少至查询头的1/8),显存占用降低50%,支持更大批量。

三、性能对比数据

以下为实测性能提升数据(基于NVIDIA A100/A800 GPU):

优化技术 场景 吞吐量提升 时延降低 来源

动态批处理 LLaMA2-70B (8xA800) 4倍(vs VLLM) 4倍

KV缓存+GQA Mixtral 7Bx8 (MoE模型) 10倍 -

PagedAttention 长序列(s=4096) 2倍 30%

批处理+密集层 自回归生成(b=128) 8倍 几乎不变

关键结论:

• 批处理:在批量小于临界值(如128)时,吞吐量接近线性增长,时延几乎不变;超过临界值后受限于计算能力。

• KV缓存:显存占用减少50%~70%,长序列生成速度提升2-4倍。

• 联合优化:例如SiliconLLM引擎结合批处理、KV缓存和GQA,在70B模型上实现4-11倍吞吐量提升。

四、适用场景与限制

  1. 批处理的局限性
    • 长序列场景:序列长度超过1024时,批处理对注意力层优化有限(计算量仍为O(s²))。

    • 负载不均衡:MoE模型中专家分配不均可能降低吞吐量(需均匀路由策略)。

  2. KV缓存的权衡
    • 显存压力:缓存增长与批量大小和序列长度成正比,需配合量化(如FP16→INT8)进一步压缩。

    • 动态序列支持:PagedAttention可灵活处理变长序列,但需额外调度开销。

五、总结

批处理和KV缓存通过最大化硬件利用率与减少冗余计算,成为LLM推理优化的核心:
• 批处理:适用于计算密集型操作(如密集层),显著提升吞吐量,尤其在批量小于硬件临界值时。

• KV缓存:解决自回归生成中的重复计算问题,结合MQA/GQA和分页技术可进一步降低显存压力。

• 实际部署:结合两者(如动态批处理+分页KV缓存)可达成4-10倍端到端性能提升。

💡 建议:在实际部署中,需根据模型结构(如MoE)、序列长度分布调整批处理策略,并选用支持PagedAttention的推理引擎(如vLLM、SiliconLLM)以优化显存管理。以下是一些使用Python进行强化学习训练机器人的案例和资源,涵盖从基础教程到具体应用项目,帮助您快速了解和上手相关实践:

1. Python强化学习项目实战教程

  • 来源:Packt Publishing
  • 内容:该项目集合包含多个独立的强化学习任务,如Atari游戏代理构建、迷宫机器人等。每个项目从基础设置到高级策略都有详细说明,适合已经具备基础机器学习技能的用户。
  • 特点
    • 使用OpenAI Gym和PyTorch等库。
    • 提供代码示例,覆盖从环境设置到模型训练的全流程。
  • 链接:Python Reinforcement Learning Projects

2. 基于Q-learning的迷宫机器人

  • 来源:CSDN博客
  • 内容:通过Q-learning算法实现一个自动走迷宫的机器人。机器人需要在迷宫中避开陷阱,尽快到达终点。
  • 特点
    • 使用OpenAI Gym的FrozenLake环境。
    • 涉及状态-动作值函数(Q-table)的构建与更新。
    • 适合初学者理解强化学习中的“探索-利用”策略。
  • 链接:Python实现Q-learning迷宫机器人

3. 基于Gym Anytrading的强化学习机器人

  • 来源:SegmentFault
  • 内容:使用强化学习算法构建一个交易机器人,适用于金融交易场景。虽然不直接涉及机器人行走,但展示了强化学习在复杂环境中的泛化能力。
  • 特点
    • 使用Gym Anytrading环境和A2C(Advantage Actor-Critic)算法。
    • 提供数据加载、环境创建和策略优化的完整流程。
  • 链接:基于Gym Anytrading的强化学习机器人

4. 微型机器人在血管中的强化学习导航

  • 来源:Nature Machine Intelligence
  • 内容:苏黎世联邦理工学院(ETH Zurich)的研究团队使用基于模型的强化学习(MBRL)策略,训练微型机器人在血管环境中导航。
  • 特点
    • 使用Dreamer v.3算法和Pygame模拟器。
    • 在复杂环境中仅需较短时间即可达到90%的导航成功率。
    • 展示了强化学习在医疗机器人领域的潜力。
  • 链接:微型机器人强化学习导航

5. 华为Ark框架:让机器人编程更简单

  • 来源:腾讯新闻
  • 内容:华为诺亚实验室开发的Ark框架,是一个基于Python的机器人学习框架,旨在简化机器人编程。
  • 特点
    • 支持仿真与真实机器人之间的无缝切换。
    • 采用模块化设计,便于快速实现机器人控制算法。
    • 适合对机器人编程感兴趣但缺乏硬件经验的开发者。
  • 链接:Ark框架介绍

6. 人形机器人创新中心的仿真平台“格物”

  • 来源:第一财经
  • 内容:国家地方共建人形机器人创新中心发布的高性能仿真平台“格物”,集成了强化学习框架和多模态运动控制技术。
  • 特点
    • 一套代码覆盖多种机器人类型。
    • 支持复杂场景下的运动优化和策略训练。
  • 链接:格物平台介绍

如何选择适合的案例?

  • 初学者:建议从Q-learning迷宫机器人或Python强化学习项目实战教程入手,这些案例相对简单,适合熟悉强化学习基础。
  • 进阶用户:可以尝试微型机器人导航或基于Ark框架的项目,这些案例涉及更复杂的算法和实际应用场景。
  • 开发者:若希望快速搭建和部署机器人控制算法,推荐使用“格物”仿真平台或Ark框架。
    希望这些案例和资源能帮助您更好地理解Python在机器人强化学习中的应用!如果有更具体的需求,欢迎继续提问。

代码解读

这个代码是一个基于强化学习的条件文本生成模型的训练框架,主要包含以下几个部分:

1. 数据集加载与处理
  • PromptDataset :用于加载训练和验证的提示数据集(prompts)。在训练模式下,从指定的 JSON 文件中随机采样一定数量的 prompts;在非训练模式下,从对应的 JSONL 文件中加载不同情感倾向(如积极、消极、中性)的 prompts。
  • PromptCollator :对 prompts 进行批量编码,将其转换为模型可接受的输入格式,包括 input_ids 和 attention_mask,设定最大长度并进行截断或填充处理。
2. 模型与控制器
  • ConditionTrainer :核心训练类,负责整个训练流程的控制,包括模型的前向传播、损失计算、参数更新等。它包括以下几个关键组件:

    • policy 和 ref_policy :分别代表当前策略模型和参考策略模型(用于计算 KL 散度等)。
    • data_pool :用于存储和管理采样得到的数据。
    • 优化器和调度器 :对策略模型的参数进行优化,并根据训练进度调整学习率。
  • AdaptiveController 和 FixedController :用于动态或固定地调整 KL 散度的权重系数和熵的权重系数,以平衡生成文本的质量和多样性。

3. 采样与训练过程
  • sample 方法 :定期从当前策略模型中采样生成一批文本序列,并将其添加到数据池中。在训练初期(step 为 0 时),使用参考策略模型进行采样以初始化数据池。
  • step 方法 :执行一次完整的训练步骤,包括从数据池中获取批次数据、计算损失、反向传播更新模型参数等。其中,损失函数综合考虑了语言模型损失、KL 散度损失以及熵正则化项。
4. 评价与保存
  • eval 方法 :在验证集上对模型进行评价,计算生成文本的正确性(基于情感分类模型)、多样性(通过 distinctness 指标)等,并根据评价结果保存表现最好的模型。

换用 Qwen3-4B 模型可能遇到的问题及解决成功率

1. 模型架构差异问题
  • 问题 :Qwen3-4B 模型与原始代码中使用的模型(如基于 GPT2 的 Distill_Tuning)在架构上可能存在较大差异,例如参数数量、层结构、注意力机制等方面的不同,这可能导致代码中一些针对特定模型架构的实现无法直接适配,如模型的初始化、输入输出处理等。
  • 解决方法 :需要根据 Qwen3-4B 模型的官方文档和 API,对代码中的模型相关部分进行修改,以确保模型能够正确加载和运行。这可能涉及到调整模型的配置参数、修改前向传播过程中的张量操作等。解决的成功率取决于对 Qwen3-4B 模型架构的理解程度以及代码的灵活修改能力,若能准确把握其架构特点并进行相应调整,成功率较高,估计可达 80% - 90%。
2. 训练资源需求问题
  • 问题 :Qwen3-4B 模型具有较大的参数规模,训练时对硬件资源(如 GPU 内存、计算能力等)的要求会显著提高。如果训练设备的资源不足以支持 Qwen3-4B 模型的训练,可能会出现内存溢出、训练速度过慢等问题。
  • 解决方法 :可以尝试以下几种方式来缓解资源需求问题:
    • 使用混合精度训练(mixed - precision training),通过降低部分参数的精度来减少内存占用并加速计算,这在支持混合精度训练的深度学习框架(如 PyTorch 的 amp 模块)中相对容易实现,成功率较高,估计为 70% - 80%。
    • 对模型进行量化(quantization),将模型参数从浮点数表示转换为低位宽的整数表示,从而减少内存使用量,但可能会对模型的精度产生一定影响,需要在量化方法和精度损失之间进行权衡,成功率大约为 60% - 70%。
    • 增加训练设备的硬件资源,如使用更大的 GPU 或多 GPU 并行训练,这在资源允许的情况下是最直接有效的解决办法,但可能成本较高,其可行性取决于实际的硬件条件和预算,成功率在有足够资源支持的情况下可达到 100%。
3. 梯度计算与优化问题
  • 问题 :由于 Qwen3-4B 模型规模较大,在反向传播计算梯度时可能会出现梯度消失或梯度爆炸的问题,影响模型的训练稳定性和收敛速度。
  • 解决方法
    • 采用梯度裁剪(gradient clipping)技术,对梯度的范数进行限制,以防止梯度过大或过小对参数更新造成不良影响。在代码中已存在梯度裁剪的相关设置(self.params.clip_gradself.params.max_grad_norm),只需根据 Qwen3-4B 模型的实际情况调整合适的梯度裁剪阈值,成功率较高,估计为 80% - 90%。
    • 调整优化器的参数,如学习率、动量等,以更好地适应 Qwen3-4B 模型的训练需求。可能需要进行一系列的超参数搜索实验来确定最优的优化器参数组合,这需要一定的时间和计算资源,但成功率相对较高,可达 70% - 80%。
4. 生成文本质量与多样化问题
  • 问题 :替换为 Qwen3-4B 模型后,生成文本的质量和多样化程度可能会发生变化。由于模型规模和架构的不同,可能需要重新调整采样策略(如温度参数、top - p 等)以及训练过程中的正则化项(如 KL 散度系数、熵系数等),以获得满足要求的生成结果。
  • 解决方法
    • 对采样策略进行实验和调整,通过改变温度参数(temperature)来控制生成文本的随机性,较高的温度会产生更随机的文本,而较低的温度则会使生成结果更确定;同时,调整 top - p(核采样)的值来平衡生成文本的质量和多样性。这需要进行大量的实验来确定最佳的采样参数组合,解决的成功率受实验设计和试错过程的影响,估计为 60% - 70%。
    • 根据 Qwen3-4B 模型生成文本的特点,重新调整训练过程中的 KL 散度系数和熵系数等超参数,以引导模型在生成文本时更好地平衡与参考策略的相似性和自身的探索性。这可以通过观察训练过程中的损失变化和生成样本的质量来进行动态调整,成功率大约为 60% - 70%。

总体而言,在充分理解 Qwen3-4B 模型架构特点、合理配置训练资源以及对训练参数和采样策略进行精心调整的情况下,成功将其应用于该代码框架并获得良好训练效果的可能性较高,综合成功率估计可达 60% - 80%。但需要注意的是,这个过程可能需要投入较多的时间和计算资源来进行实验和优化。为了优化提高NL2SQL的准确率,我将对代码进行以下关键改进:

  1. 增强SQL验证器功能
  2. 改进奖励函数设计
  3. 优化批处理机制
  4. 增加模型容量
  5. 添加SQL执行验证
  6. 改进训练策略

下面是优化后的代码:

import torch
import os
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical
import numpy as np
from transformers import AutoModelForCausalLM, AutoTokenizer
from openai import OpenAI
import json
from collections import deque
import random
import matplotlib.pyplot as plt
import re
import time
import concurrent.futures
import threading
import sqlparse
from sqlparse.sql import IdentifierList, Identifier
from sqlparse.tokens import Keyword, DML
from functools import lru_cache
import pandas as pd
from sqlalchemy import create_engine, text
from tqdm import tqdm# 环境设置
os.environ["CUDA_VISIBLE_DEVICES"] = "6"
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"sp = """ /no_think
你是擅长使用SQL帮助用户进行数据分析的智能助手,尽你所能的根据下面的用户问题生成准确的SQL语句。这里是一些参考的用户问题和对应的SQL语句:
在2023年7月,中国对哪些国家商品编码为340600的出口额排名在前五位? 
SELECT partner_country_en_name, SUM(trade_amt) AS total_trade_amt FROM nicbucdm.dwd_en_pub_customs_trade_hscode_df WHERE year = '2023' AND month = '07' AND direction = 'Export' AND reporter_country_en_name = 'China' AND hscode = '340600' GROUP BY partner_country_en_name ORDER BY SUM(trade_amt) DESC LIMIT 5;请使用下面的格式:
用户问题:用户输入的问题,你需要将其转换成SQL语句
SQL: 你在这里将用户问题转成SQL语句用户问题:"""# ==================== 配置类 ====================
class Config:def __init__(self):# PPO参数self.gamma = 0.99self.eps_clip = 0.2self.K_epochs = 3self.lr_actor = 1e-5self.lr_critic = 1e-4self.batch_size = 32self.buffer_size = 1000self.entropy_coef = 0.01self.max_grad_norm = 0.5# 评估参数self.eval_samples = 200  # 增加评估样本数self.max_retry = 5       # 增加最大重试次数self.batch_generation = Trueself.parallel_evaluation = Trueself.max_workers = 8     # 增加并行工作线程数self.cache_size = 2000    # 增加缓存大小self.local_validation = Trueself.enable_execution = True  # 启用SQL执行验证# 数据库连接self.db_connection = "postgresql://user:password@localhost:5432/mydb"# 模型参数self.model_size = "7B"   # 使用更大的模型# ==================== SQL 验证器 ====================
class SQLValidator:@staticmethoddef is_valid_sql(sql):"""更严格的SQL验证"""if not sql or not sql.strip():return False# 检查基本SQL结构sql = sql.strip().upper()if not sql.startswith("SELECT"):return False# 检查是否包含必需的关键字required_keywords = ["SELECT", "FROM"]if not all(kw in sql for kw in required_keywords):return False# 尝试解析SQLtry:parsed = sqlparse.parse(sql)if not parsed:return False# 检查是否有语法错误for stmt in parsed:if stmt.get_type() != "SELECT":return Falsereturn Trueexcept:return False@staticmethoddef extract_keywords(sql):"""更全面的关键字提取"""parsed = sqlparse.parse(sql)[0]tokens = set()for token in parsed.tokens:if token.ttype in (Keyword, DML):tokens.add(token.value.upper())elif isinstance(token, Identifier):tokens.add(token.get_real_name().upper())elif isinstance(token, IdentifierList):for ident in token.get_identifiers():tokens.add(ident.get_real_name().upper())return tokens@staticmethoddef extract_tables_and_columns(sql):"""提取SQL中使用的表和列"""parsed = sqlparse.parse(sql)[0]tables = set()columns = set()for item in parsed.tokens:if isinstance(item, Identifier):name = item.get_real_name().upper()if '.' in name:table, col = name.split('.', 1)tables.add(table)columns.add(col)else:columns.add(name)elif isinstance(item, IdentifierList):for ident in item.get_identifiers():name = ident.get_real_name().upper()if '.' in name:table, col = name.split('.', 1)tables.add(table)columns.add(col)else:columns.add(name)return tables, columns# ==================== 数据库执行器 ====================
class SQLExecutor:def __init__(self, connection_string):self.engine = create_engine(connection_string)self.cache = {}self.cache_size = 1000self.lock = threading.Lock()@lru_cache(maxsize=1000)def execute_sql(self, sql):"""执行SQL并返回结果(带缓存)"""cache_key = hash(sql)if cache_key in self.cache:return self.cache[cache_key]try:with self.engine.connect() as conn:result = conn.execute(text(sql))rows = result.fetchall()columns = result.keys()df = pd.DataFrame(rows, columns=columns)# 缓存结果with self.lock:if len(self.cache) >= self.cache_size:self.cache.popitem(last=False)self.cache[cache_key] = dfreturn dfexcept Exception as e:print(f"SQL execution error: {e}")return Nonedef compare_results(self, sql1, sql2):"""比较两个SQL的执行结果"""df1 = self.execute_sql(sql1)df2 = self.execute_sql(sql2)if df1 is None or df2 is None:return 0.0try:# 比较数据框的相似性if df1.shape != df2.shape:return 0.2# 比较列名col_match = len(set(df1.columns) & set(df2.columns)) / max(len(df1.columns), 1)# 比较行内容(抽样比较)sample_size = min(10, len(df1), len(df2))if sample_size == 0:return col_matchdf1_sample = df1.sample(sample_size).sort_index()df2_sample = df2.sample(sample_size).sort_index()row_match = (df1_sample.values == df2_sample.values).mean()return (col_match + row_match) / 2except:return 0.0# ==================== 记忆回放缓冲区 ====================
class ReplayBuffer:def __init__(self, buffer_size):self.buffer = deque(maxlen=buffer_size)self.priority_buffer = deque(maxlen=buffer_size//2)  # 高优先级缓冲区def add(self, state, action, action_logprob, reward, done, priority=False):if priority:self.priority_buffer.append((state, action, action_logprob, reward, done))else:self.buffer.append((state, action, action_logprob, reward, done))def sample(self, batch_size):# 优先从优先级缓冲区采样priority_samples = min(len(self.priority_buffer), batch_size//2)normal_samples = batch_size - priority_samplessamples = []if priority_samples > 0:samples += random.sample(self.priority_buffer, priority_samples)if normal_samples > 0:samples += random.sample(self.buffer, min(len(self.buffer), normal_samples))return samplesdef clear(self):self.buffer.clear()self.priority_buffer.clear()def __len__(self):return len(self.buffer) + len(self.priority_buffer)# ==================== 评论家模型(Zhipu API) ====================
class CriticModel:def __init__(self, cache_size=1000):self.client = OpenAI(api_key="a45e2361a7c5ea66a45d93de02e3137c.5XxQIeXUVlOuLUZS",base_url="https://open.bigmodel.cn/api/paas/v4/")self.cache = {}self.cache_size = cache_sizeself.lock = threading.Lock()self.api_call_count = 0self.total_api_time = 0@lru_cache(maxsize=1000)def evaluate_sql(self, correct_sql, generated_sql):cache_key = f"{hash(correct_sql)}_{hash(generated_sql)}"if cache_key in self.cache:return self.cache[cache_key]# 先进行本地验证if not SQLValidator.is_valid_sql(generated_sql):return 0.0, "生成的SQL无效(空或不包含基本结构)"prompt = f"""请严格按以下规则评分:1. 对比正确SQL和生成SQL,找出所有差异点2. 每个语法错误扣1分,每个语义错误扣2分3. 基础分10分,扣除错误分后得到最终分4. 对于关键错误(如缺少WHERE条件、错误分组)扣3分5. 返回格式:错误分析+得分(0-10)正确SQL: {correct_sql}生成SQL: {generated_sql}"""try:start_time = time.time()completion = self.client.chat.completions.create(model="glm-4",  # 使用更强大的模型messages=[{"role": "user", "content": prompt}],temperature=0.1)response_time = time.time() - start_timewith self.lock:self.api_call_count += 1self.total_api_time += response_timeresult = completion.choices[0].message.content# 从结果中提取分数score = 0error_analysis = ""if "得分" in result:score_match = re.search(r'得分[::]\s*(\d+(?:\.\d+)?)', result)if score_match:score = float(score_match.group(1))else:numbers = re.findall(r'\d+(?:\.\d+)?', result)if numbers:score = float(numbers[-1])else:# 如果没有明确得分,尝试根据错误分析估计if "错误" in result or "问题" in result:score = max(0, 5 - result.count("错误") - result.count("问题"))else:score = 8.0  # 默认较高分# 错误分析是除了分数之外的内容error_analysis = result.replace(f"得分: {score}", "").strip()result = (score / 10.0, error_analysis)  # 归一化到0-1# 缓存结果with self.lock:if len(self.cache) >= self.cache_size:self.cache.popitem(last=False)self.cache[cache_key] = resultreturn resultexcept Exception as e:print(f"Critic error: {e}")return 0.5, "Critic evaluation failed."# ==================== 评估器 ====================
class Evaluator:def __init__(self, critic_model, sql_executor=None):self.critic = critic_modelself.sql_executor = sql_executorself.metrics = {'exact_match': [],'partial_match': [],'syntax_score': [],'execution_match': []}def evaluate(self, generated_sql, correct_sql):# 1. 精确匹配评估exact_match = int(generated_sql.strip() == correct_sql.strip())# 2. 部分匹配评估gen_keywords = SQLValidator.extract_keywords(generated_sql) if generated_sql else set()cor_keywords = SQLValidator.extract_keywords(correct_sql)partial_match = len(gen_keywords & cor_keywords) / len(cor_keywords) if cor_keywords else 0# 3. 语法评分if SQLValidator.is_valid_sql(generated_sql):syntax_score, _ = self.critic.evaluate_sql(correct_sql, generated_sql)else:syntax_score = 0.0# 4. 执行结果匹配(如果启用)execution_match = 0.0if self.sql_executor and syntax_score > 0.3:  # 只有语法基本正确时才执行execution_match = self.sql_executor.compare_results(correct_sql, generated_sql)# 记录指标self.metrics['exact_match'].append(exact_match)self.metrics['partial_match'].append(partial_match)self.metrics['syntax_score'].append(syntax_score)self.metrics['execution_match'].append(execution_match)return {'exact_match': exact_match,'partial_match': partial_match,'syntax_score': syntax_score,'execution_match': execution_match,'generated_sql': generated_sql,'correct_sql': correct_sql}def get_metrics(self):return {'exact_match': np.mean(self.metrics['exact_match']),'partial_match': np.mean(self.metrics['partial_match']),'syntax_score': np.mean(self.metrics['syntax_score']),'execution_match': np.mean(self.metrics['execution_match']),'sample_size': len(self.metrics['exact_match'])}def reset(self):for k in self.metrics:self.metrics[k] = []# ==================== PPO模型(基于Qwen) ====================
class PPOModel(nn.Module):def __init__(self, model_path, critic_model=None):super(PPOModel, self).__init__()# 使用更大的模型model_path = model_path.replace("4B", "7B") if "4B" in model_path else model_pathself.base_model = AutoModelForCausalLM.from_pretrained(model_path,device_map="auto",trust_remote_code=True,torch_dtype=torch.bfloat16,load_in_4bit=True  # 使用4位量化减少内存占用)self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)self.value_head = nn.Linear(self.base_model.config.hidden_size, 1)self.critic_model = critic_modelself.generation_cache = {}self.retry_stats = {"total": 0, "invalid": 0}self.error_analysis_cache = {}def forward(self, input_ids, attention_mask):outputs = self.base_model(input_ids=input_ids,attention_mask=attention_mask,output_hidden_states=True)last_hidden_state = outputs.hidden_states[-1]values = self.value_head(last_hidden_state.mean(dim=1))return outputs.logits, valuesdef generate_batch(self, input_texts, max_length=4096):"""批量生成SQL,提高效率"""inputs = self.tokenizer(input_texts, return_tensors="pt", padding=True, truncation=True,max_length=1024,return_token_type_ids=False).to(self.base_model.device)outputs = self.base_model.generate(**inputs,max_length=max_length,pad_token_id=self.tokenizer.eos_token_id,num_return_sequences=1,temperature=0.7,do_sample=True,top_p=0.9,top_k=50  # 增加top_k以获得更多样化的结果)generated_texts = []for i in range(len(input_texts)):output_seq = outputs[i]generated_text = self.tokenizer.decode(output_seq, skip_special_tokens=True)sql_part = str(generated_text).split("SQL:")[-1].strip()generated_texts.append(sql_part)return generated_textsdef generate(self, input_text, correct_sql=None, max_retry=5, max_length=4096):"""生成SQL,支持错误重试机制"""cache_key = hash(input_text)if cache_key in self.generation_cache:return self.generation_cache[cache_key]if correct_sql is None:return self._single_generate(input_text, max_length)current_prompt = input_textbest_sql = ""best_score = 0attempts = []for attempt in range(max_retry):generated_sql = self._single_generate(current_prompt, max_length)if not SQLValidator.is_valid_sql(generated_sql):error_analysis = "生成的SQL无效(空或不包含基本结构)"score = 0.0with threading.Lock():self.retry_stats["total"] += 1self.retry_stats["invalid"] += 1# 添加到提示词中再次生成current_prompt = input_text + f"\n\n--- 错误分析(尝试 {attempt+1})---\n{error_analysis}\n\n请根据以上分析修正SQL:"continueif self.critic_model:score, error_analysis = self.critic_model.evaluate_sql(correct_sql, generated_sql)else:score = 1.0 if generated_sql.strip() == correct_sql.strip() else 0.5error_analysis = ""with threading.Lock():self.retry_stats["total"] += 1attempts.append((generated_sql, score, error_analysis))if score > best_score:best_sql = generated_sqlbest_score = scoreif score >= 0.95 or attempt == max_retry - 1:self.generation_cache[cache_key] = best_sqlreturn best_sql# 缓存错误分析以重用error_cache_key = hash((input_text, generated_sql))if error_cache_key not in self.error_analysis_cache:self.error_analysis_cache[error_cache_key] = error_analysiselse:error_analysis = self.error_analysis_cache[error_cache_key]current_prompt = input_text + f"\n\n--- 错误分析(尝试 {attempt+1})---\n{error_analysis}\n\n请根据以上分析修正SQL:"self.generation_cache[cache_key] = best_sqlreturn best_sqldef _single_generate(self, input_text, max_length):inputs = self.tokenizer(input_text, return_tensors="pt").to(self.base_model.device)outputs = self.base_model.generate(**inputs,max_length=max_length,pad_token_id=self.tokenizer.eos_token_id,num_beams=3,  # 使用beam search提高质量early_stopping=True)generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)return str(generated_text).split("SQL:")[-1].strip()# ==================== PPO训练器 ====================
class PPOTrainer:def __init__(self, config, model_path, dataset_path):self.config = configself.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")self.critic_model = CriticModel(config.cache_size)self.sql_executor = SQLExecutor(config.db_connection) if config.enable_execution else Noneself.policy = PPOModel(model_path, self.critic_model).to(self.device)self.scaler = torch.cuda.amp.GradScaler()self.optimizer = optim.Adam([{'params': self.policy.base_model.parameters(), 'lr': config.lr_actor},{'params': self.policy.value_head.parameters(), 'lr': config.lr_critic}])# 学习率调度器self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(self.optimizer, mode='max',factor=0.5,patience=2,verbose=True)self.buffer = ReplayBuffer(config.buffer_size)self.dataset = self.load_dataset(dataset_path)self.mse_loss = nn.MSELoss()self.evaluator = Evaluator(self.critic_model, self.sql_executor)self.results = {'pre_train': None, 'post_train': None}self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=config.max_workers)self.training_stats = {"total_samples": 0,"valid_sql": 0,"api_calls_saved": 0}self.best_score = 0.0def load_dataset(self, path):with open(path, 'r', encoding='utf-8') as f:data = json.load(f)# 按问题复杂度排序(简单问题优先)return sorted(data, key=lambda x: len(x['output']))def compute_rewards(self, generated_sql, correct_sql):# 1. 本地验证SQLis_valid = SQLValidator.is_valid_sql(generated_sql)if not is_valid:return 0.0, "Invalid SQL"# 2. 部分匹配奖励gen_keywords = SQLValidator.extract_keywords(generated_sql)cor_keywords = SQLValidator.extract_keywords(correct_sql)coverage = len(gen_keywords & cor_keywords) / len(cor_keywords) if cor_keywords else 0# 3. 结构相似度gen_tables, gen_columns = SQLValidator.extract_tables_and_columns(generated_sql)cor_tables, cor_columns = SQLValidator.extract_tables_and_columns(correct_sql)table_match = len(gen_tables & cor_tables) / len(cor_tables) if cor_tables else 1.0column_match = len(gen_columns & cor_columns) / len(cor_columns) if cor_columns else 1.0structure_sim = (table_match + column_match) / 2# 4. 执行结果匹配execution_match = 0.0if self.sql_executor and structure_sim > 0.5:execution_match = self.sql_executor.compare_results(correct_sql, generated_sql)# 5. 综合奖励if coverage > 0.8 and structure_sim > 0.8:reward = 0.8 + 0.2 * execution_matchreturn reward, "High coverage and structure similarity"else:syntax_reward, _ = self.critic_model.evaluate_sql(correct_sql, generated_sql)reward = (syntax_reward * 0.7) + (execution_match * 0.3)return reward, "API evaluation"def evaluate_model(self, phase='pre_train'):self.evaluator.reset()eval_results = []eval_items = self.dataset[:self.config.eval_samples]# 使用tqdm显示进度条with tqdm(total=len(eval_items), desc=f"{phase} Evaluation") as pbar:if self.config.parallel_evaluation:futures = []for item in eval_items:state = sp + item['input']future = self.executor.submit(self.policy.generate,state,item['output'],self.config.max_retry)futures.append((item, future))for item, future in futures:generated_sql = future.result()eval_result = self.evaluator.evaluate(generated_sql, item['output'])eval_results.append(eval_result)pbar.update(1)else:for item in eval_items:state = sp + item['input']generated_sql = self.policy.generate(state, correct_sql=item['output'],max_retry=self.config.max_retry)eval_result = self.evaluator.evaluate(generated_sql, item['output'])eval_results.append(eval_result)pbar.update(1)metrics = self.evaluator.get_metrics()self.results[phase] = {'metrics': metrics,'samples': eval_results[:10]  # 保存更多样本用于分析}print(f"\n===== {phase.upper()} EVALUATION =====")print(f"Exact Match: {metrics['exact_match']*100:.2f}%")print(f"Partial Match: {metrics['partial_match']*100:.2f}%")print(f"Syntax Score: {metrics['syntax_score']*100:.2f}%")if self.config.enable_execution:print(f"Execution Match: {metrics['execution_match']*100:.2f}%")# 打印重试统计retry_stats = self.policy.get_retry_stats()api_stats = self.critic_model.get_api_stats()print(f"Retry Stats: Total={retry_stats['total']}, Invalid={retry_stats['invalid']}")print(f"API Stats: Calls={api_stats['api_calls']}, AvgTime={api_stats['avg_time']:.2f}s")return metricsdef update(self):if len(self.buffer) < self.config.batch_size:returnbatch = self.buffer.sample(self.config.batch_size)states, actions, old_logprobs, rewards, dones = zip(*batch)states = list(states)old_logprobs = torch.stack(old_logprobs).to(self.device).detach()rewards = torch.tensor(rewards, dtype=torch.float32).to(self.device)# 计算折扣奖励discounted_rewards = []running_reward = 0for reward, done in zip(reversed(rewards), reversed(dones)):if done:running_reward = 0running_reward = reward + self.config.gamma * running_rewarddiscounted_rewards.insert(0, running_reward)discounted_rewards = torch.tensor(discounted_rewards).to(self.device)discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (discounted_rewards.std() + 1e-7)# PPO更新total_loss = 0for _ in range(self.config.K_epochs):logprobs, values, entropies = [], [], []with torch.cuda.amp.autocast():for state in states:inputs = self.policy.tokenizer(state, return_tensors="pt").to(self.device)logits, value = self.policy(**inputs)probs = torch.softmax(logits[:, -1, :], dim=-1)dist = Categorical(probs)action = torch.tensor([actions[states.index(state)]]).to(self.device)logprob = dist.log_prob(action)entropy = dist.entropy()logprobs.append(logprob)values.append(value.squeeze())entropies.append(entropy)logprobs = torch.stack(logprobs).squeeze()values = torch.stack(values).squeeze()entropies = torch.stack(entropies).squeeze()ratios = torch.exp(logprobs - old_logprobs.detach())advantages = discounted_rewards - values.detach()surr1 = ratios * advantagessurr2 = torch.clamp(ratios, 1-self.config.eps_clip, 1+self.config.eps_clip) * advantagespolicy_loss = -torch.min(surr1, surr2).mean()value_loss = self.mse_loss(values, discounted_rewards)entropy_loss = -entropies.mean()loss = policy_loss + 0.5*value_loss + self.config.entropy_coef*entropy_losstotal_loss += loss.item()self.optimizer.zero_grad()self.scaler.scale(loss).backward()self.scaler.unscale_(self.optimizer)torch.nn.utils.clip_grad_norm_(self.policy.parameters(), self.config.max_grad_norm)self.scaler.step(self.optimizer)self.scaler.update()avg_loss = total_loss / self.config.K_epochsreturn avg_lossdef save_model(self, epoch, score):# 仅当模型改进时保存if score > self.best_score:self.best_score = scoretorch.save({'model_state_dict': self.policy.state_dict(),'optimizer_state_dict': self.optimizer.state_dict(),'epoch': epoch,'best_score': score}, f"best_ppo_sql_generator.pt")print(f"Saved best model with score: {score:.4f}")# 定期保存检查点if epoch % 2 == 0:torch.save({'model_state_dict': self.policy.state_dict(),'optimizer_state_dict': self.optimizer.state_dict(),'epoch': epoch,'score': score}, f"ppo_sql_generator_epoch{epoch}.pt")def train(self, epochs=5):self.evaluate_model('pre_train')for epoch in range(1, epochs+1):total_reward = 0batch_states = []batch_items = []# 使用tqdm显示训练进度with tqdm(total=len(self.dataset), desc=f"Epoch {epoch}/{epochs}") as pbar:for item in self.dataset:state = sp + item['input']if self.config.batch_generation:batch_states.append(state)batch_items.append(item)if len(batch_states) >= 8:  # 更大的批处理generated_sqls = self.policy.generate_batch(batch_states)for i, sql in enumerate(generated_sqls):item = batch_items[i]with threading.Lock():self.training_stats["total_samples"] += 1if SQLValidator.is_valid_sql(sql):self.training_stats["valid_sql"] += 1reward, reward_reason = self.compute_rewards(sql, item['output'])total_reward += reward# 根据奖励值设置优先级priority = reward < 0.5  # 低奖励样本优先action = random.randint(0, 100)action_logprob = torch.tensor([0.0])self.buffer.add(batch_states[i], action, action_logprob, reward, False, priority)pbar.set_postfix({"reward": f"{reward:.2f}","reason": reward_reason[:10]})batch_states = []batch_items = []pbar.update(8)else:action, action_logprob, _ = self.policy.act(state)generated_sql = self.policy.generate(state)with threading.Lock():self.training_stats["total_samples"] += 1if SQLValidator.is_valid_sql(generated_sql):self.training_stats["valid_sql"] += 1reward, reward_reason = self.compute_rewards(generated_sql, item['output'])total_reward += rewardpriority = reward < 0.5self.buffer.add(state, action, action_logprob, reward, False, priority)pbar.set_postfix({"reward": f"{reward:.2f}","reason": reward_reason[:10]})pbar.update(1)if len(self.buffer) >= self.config.batch_size:loss = self.update()pbar.set_postfix({"loss": f"{loss:.4f}"})# 处理剩余的批次数据if batch_states:generated_sqls = self.policy.generate_batch(batch_states)for i, sql in enumerate(generated_sqls):item = batch_items[i]with threading.Lock():self.training_stats["total_samples"] += 1if SQLValidator.is_valid_sql(sql):self.training_stats["valid_sql"] += 1reward, _ = self.compute_rewards(sql, item['output'])total_reward += rewardaction = random.randint(0, 100)action_logprob = torch.tensor([0.0])self.buffer.add(batch_states[i], action, action_logprob, reward, False)avg_reward = total_reward / len(self.dataset)print(f"Epoch {epoch} Avg Reward: {avg_reward:.4f}")# 打印训练统计valid_percent = self.training_stats["valid_sql"] / max(self.training_stats["total_samples"], 1) * 100print(f"Training Stats: Valid SQL: {valid_percent:.1f}%")# 定期评估if epoch % 1 == 0:  # 每个epoch都评估eval_start = time.time()metrics = self.evaluate_model(f'epoch_{epoch}')print(f"Evaluation time: {time.time() - eval_start:.2f}s")# 根据评估分数调整学习率self.scheduler.step(metrics['syntax_score'])# 保存模型self.save_model(epoch, metrics['syntax_score'])# 每2个epoch清除缓存以释放内存if epoch % 2 == 0:self.policy.generation_cache.clear()self.critic_model.cache.clear()# 最终评估self.evaluate_model('post_train')self.executor.shutdown()# ==================== 结果分析器 ====================
class ResultAnalyzer:def __init__(self, results):self.results = resultsdef compare_metrics(self):pre_metrics = self.results['pre_train']['metrics']post_metrics = self.results['post_train']['metrics']comparison = {'exact_match': post_metrics['exact_match'] - pre_metrics['exact_match'],'partial_match': post_metrics['partial_match'] - pre_metrics['partial_match'],'syntax_score': post_metrics['syntax_score'] - pre_metrics['syntax_score'],'execution_match': post_metrics.get('execution_match', 0) - pre_metrics.get('execution_match', 0)}return comparisondef analyze_error_patterns(self):pre_samples = self.results['pre_train']['samples']post_samples = self.results['post_train']['samples']error_categories = {'missing_condition': 0,'wrong_table': 0,'wrong_column': 0,'aggregation_error': 0,'syntax_error': 0,'other': 0}# 分析训练前错误for sample in pre_samples:if sample['syntax_score'] < 0.7:error_type = self.classify_error(sample)error_categories[error_type] += 1pre_errors = error_categories.copy()error_categories = {k: 0 for k in error_categories}# 分析训练后错误for sample in post_samples:if sample['syntax_score'] < 0.7:error_type = self.classify_error(sample)error_categories[error_type] += 1return {'pre_train': pre_errors,'post_train': error_categories}def classify_error(self, sample):gen_sql = sample['generated_sql']cor_sql = sample['correct_sql']# 检查缺失条件if 'WHERE' in cor_sql and 'WHERE' not in gen_sql:return 'missing_condition'# 检查错误的表gen_tables, _ = SQLValidator.extract_tables_and_columns(gen_sql)cor_tables, _ = SQLValidator.extract_tables_and_columns(cor_sql)if gen_tables != cor_tables:return 'wrong_table'# 检查错误的列_, gen_columns = SQLValidator.extract_tables_and_columns(gen_sql)_, cor_columns = SQLValidator.extract_tables_and_columns(cor_sql)if len(gen_columns - cor_columns) > 0 or len(cor_columns - gen_columns) > 0:return 'wrong_column'# 检查聚合错误if ('GROUP BY' in cor_sql or 'SUM(' in cor_sql or 'COUNT(' in cor_sql) and \('GROUP BY' not in gen_sql or 'SUM(' not in gen_sql or 'COUNT(' not in gen_sql):return 'aggregation_error'# 检查语法错误if not SQLValidator.is_valid_sql(gen_sql):return 'syntax_error'return 'other'def generate_report(self):comparison = self.compare_metrics()error_analysis = self.analyze_error_patterns()report = {'improvement': {'exact_match': f"{comparison['exact_match']*100:.2f}%",'partial_match': f"{comparison['partial_match']*100:.2f}%",'syntax_score': f"{comparison['syntax_score']*100:.2f}%",'execution_match': f"{comparison.get('execution_match', 0)*100:.2f}%"},'error_reduction': {}}for error_type in error_analysis['pre_train']:pre_count = error_analysis['pre_train'][error_type]post_count = error_analysis['post_train'][error_type]reduction = pre_count - post_countreduction_percent = (reduction / pre_count * 100) if pre_count > 0 else 0report['error_reduction'][error_type] = {'pre_count': pre_count,'post_count': post_count,'reduction': reduction,'reduction_percent': f"{reduction_percent:.1f}%"}return report# ==================== 主程序 ====================
if __name__ == '__main__':config = Config()model_path = "/data/models/hub/Qwen/Qwen3-7B"  # 使用7B模型dataset_path = "/data/wwwroot/roll_rl_grpo/mydata/runtomysql/33.json"trainer = PPOTrainer(config, model_path, dataset_path)start_time = time.time()trainer.train(epochs=5)  # 增加训练轮数total_time = time.time() - start_time# 保存最终结果with open("training_results.json", "w") as f:json.dump(trainer.results, f, indent=2)# 分析结果analyzer = ResultAnalyzer(trainer.results)report = analyzer.generate_report()print("\n=== FINAL TRAINING REPORT ===")print("Improvement:")for metric, value in report['improvement'].items():print(f"{metric.replace('_', ' ').title()}: {value}")print("\nError Reduction:")for error_type, stats in report['error_reduction'].items():print(f"{error_type.replace('_', ' ').title()}:")print(f"  Pre-train: {stats['pre_count']}, Post-train: {stats['post_count']}")print(f"  Reduction: {stats['reduction']} ({stats['reduction_percent']})")# 可视化结果labels = ['Exact Match', 'Partial Match', 'Syntax Score']if config.enable_execution:labels.append('Execution Match')pre_train = [trainer.results['pre_train']['metrics']['exact_match']*100,trainer.results['pre_train']['metrics']['partial_match']*100,trainer.results['pre_train']['metrics']['syntax_score']*100]post_train = [trainer.results['post_train']['metrics']['exact_match']*100,trainer.results['post_train']['metrics']['partial_match']*100,trainer.results['post_train']['metrics']['syntax_score']*100]if config.enable_execution:pre_train.append(trainer.results['pre_train']['metrics']['execution_match']*100)post_train.append(trainer.results['post_train']['metrics']['execution_match']*100)x = np.arange(len(labels))width = 0.35fig, ax = plt.subplots(figsize=(12, 7))rects1 = ax.bar(x - width/2, pre_train, width, label='Pre-Train')rects2 = ax.bar(x + width/2, post_train, width, label='Post-Train')ax.set_ylabel('Score (%)')ax.set_title('Training Results Comparison')ax.set_xticks(x)ax.set_xticklabels(labels)ax.legend()fig.tight_layout()plt.savefig('training_comparison.png')print("Saved visualization to training_comparison.png")# 打印最终统计api_stats = trainer.critic_model.get_api_stats()retry_stats = trainer.policy.get_retry_stats()training_stats = trainer.training_statsprint("\n=== FINAL STATISTICS ===")print(f"Total training time: {total_time/3600:.2f} hours")print(f"API calls: {api_stats['api_calls']}, Total API time: {api_stats['total_time']/3600:.2f} hours")print(f"Retry stats: Total={retry_stats['total']}, Invalid={retry_stats['invalid']}")print(f"Training stats: Total samples={training_stats['total_samples']}, Valid SQL={training_stats['valid_sql']}")# 示例预测sample_data = trainer.dataset[0]sample_prompt = sp + sample_data['input']generated_sql = trainer.policy.generate(sample_prompt, correct_sql=sample_data['output'],max_retry=config.max_retry)print("\n=== FINAL SAMPLE GENERATION ===")print(f"Prompt: {sample_data['input']}")print(f"Generated SQL: {generated_sql}")print(f"Correct SQL: {sample_data['output']}")

关键优化点:

  1. 增强SQL验证器

    • 添加更严格的SQL结构检查
    • 实现表和列提取功能
    • 改进关键字提取逻辑
  2. 改进奖励函数

    • 添加结构相似度计算(表和列匹配)
    • 整合SQL执行结果比较
    • 综合多种奖励因素(覆盖率、结构相似度、执行结果)
  3. 优化批处理机制

    • 增加批处理大小(从4到8)
    • 添加优先级缓冲区
    • 使用tqdm显示进度条
  4. 增加模型容量

    • 使用Qwen3-7B模型替代4B版本
    • 启用4位量化减少显存占用
    • 改进生成参数(使用beam search)
  5. 添加SQL执行验证

    • 实现SQL执行器类
    • 比较执行结果相似度
    • 将执行匹配度加入评估指标
  6. 改进训练策略

    • 增加训练轮数(从1到5)
    • 添加学习率调度器
    • 实现模型保存策略(仅保存最佳模型)
    • 按问题复杂度排序数据集
  7. 增强错误处理

    • 增加最大重试次数(从3到5)
    • 缓存错误分析结果
    • 添加错误分类系统
  8. 结果分析系统

    • 实现结果分析器类
    • 生成详细训练报告
    • 可视化错误减少情况

这些优化措施将显著提高NL2SQL的准确率,特别是通过SQL执行验证和增强的错误处理机制,可以确保生成的SQL不仅在语法上正确,在实际执行中也能产生预期结果。

http://www.lryc.cn/news/582479.html

相关文章:

  • TVS管工作原理是什么?主要的应用场景都有哪些?
  • MySQL数据库访问(C/C++)
  • 赛博威破解快消品渠道营销三重困局,助力企业实现“活动即战力”
  • 小米YU7预售现象深度解析:智能电动汽车的下一个范式革命
  • 内容页模板表格显示不全的问题处理
  • IP 能ping通,服务器是否开机?
  • 第8章:应用层协议HTTP、SDN软件定义网络、组播技术、QoS
  • 【快手】数据挖掘面试题0002:求某地铁站每日客流量,乘地铁经过、进出站人都包括在内
  • Tourism Management and Technology Economy,旅游管理与技术经济知网期刊
  • Oracle 存储过程、函数与触发器
  • 【OceanBase诊断调优】—— 执行计划显示分区 PARTITIONS[P0SP9] 如何查询是哪个分区?
  • 数据结构与算法:博弈类问题
  • 服务器经常出现蓝屏是什么原因导致的?如何排查和修复?
  • node.js中yarn、npm、cnpm详解
  • npm : 无法加载文件 D:\Node\npm.ps1,因为在此系统上禁止运行脚本。
  • 【QT】-隐式转换 explicit用法
  • React18+TypeScript状态管理最佳实践
  • 说说SpringBoot常用的注解?
  • 【Nginx】Nginx代理WebSocket
  • Ollama+OpenWebUI 0.42+0.3.35 最新版一键安装教程,解决手动更新失败问题
  • kafka如何让消息均匀的写入到每个partition
  • OpenWebUI(5)源码学习-后端socket通信模块
  • App Trace功能实战:一键拉起应用实践
  • 【保姆级图文详解】RAG 实战(Spring AI + 本地知识库)旅游知识库问答
  • 微软上线 Deep Research 预览版:o3+必应赋能研究自动化
  • OGRE 3D----6. 背景图片渲染实现详解
  • 【Unity3D】微信小游戏适配安全区域或胶囊控件(圆圈按钮)水平高度一致方案
  • element el-table渲染二维对象数组
  • 什么是 3D 文件?
  • 开源 python 应用 开发(三)python语法介绍