决策规划内容整理
整理主要是对常见的决策规划算法进行一个整理,基本的地图环境使用了高精地图,grid map和ros的栅格地图三种形式。
对应的视频如下
2025研究生入学:无人驾驶车辆的决策规划算法
如有源码和文档的需求可以视频评论区留言
目录
- 高精地图-dijkstra
- grid map路径搜索
- 优化方法求解
- Frenet坐标系的分解思想.
- smach状态机
- 栅格地图规划方法
目前是一些最简单的算法的搭建,适合刚入门决策规划的新人。
高精地图-dijkstra
此内容截取了矿卡项目中的高精地图规划的内容出来,没有封装成功能包,直接通过python启动即可。
启动一个roscore
roscore
python map_plot.py # 解析高精地图并可视化
rviz # 并添加markerarray
使用dijkstra进行规划
python path_plan.py # 然后在rviz中添加对应的话题进行可视化
grid map路径搜索
文件结构
├── environment
│ ├── __init__.py
│ ├── __pycache__
│ │ ├── __init__.cpython-38.pyc
│ │ ├── env.cpython-38.pyc
│ │ ├── node.cpython-38.pyc
│ │ ├── point2d.cpython-38.pyc
│ │ └── pose2d.cpython-38.pyc
│ ├── env.py
│ ├── node.py
│ ├── point2d.py
│ └── pose2d.py
├── a_star.py
├── dijkstra.py
├── global_plan.py
├── graph_search.py
├── planner.py
└── plot.py
我们主要需要运行的是global_plan.py:
python global_plan.py
ps:如果路线没有出来可以拉动一下图片大小。
优化方法求解
进入frenet_planning/FrenetOptimalTrajectory
运行
python frenet_optimal_trajectory.py
Frenet坐标系的分解思想.
传统轨迹规划在全局坐标系(x,y)中直接规划,需要同时考虑位置、方向、速度等多维度约束,复杂度高。
Frenet 算法将运动分解到Frenet 坐标系中,简化问题:
● 纵向(s 方向):沿 “参考路径” 的前进方向,关注 “走多远” 和 “速度快慢”(类似沿公路的里程)。
● 横向(d 方向):垂直于参考路径的偏移方向,关注 “偏离参考路径多远”(类似公路上的左右偏移)。
step 1:生成参考路径(基准线)
首先需要一条 “参考路径” 作为规划的基准(类似公路的中心线)。
● 通过generate_target_course函数,用三次样条曲线(cubic spline) 拟合输入的路点(wx, wy),生成一条平滑的连续路径。
● 这条路径包含位置、航向角、曲率等信息,作为后续规划的基准。
step 2:生成大量候选轨迹
为了找到最优路径,先生成足够多的候选轨迹,通过calc_frenet_paths函数实现:
(1)横向轨迹规划(d 方向)
● 目标:从当前横向位置(c_d)规划到不同的 “目标横向位置”(di,在-MAX_ROAD_WIDTH到MAX_ROAD_WIDTH范围内采样,间隔D_ROAD_W)。
● 方法:使用五次多项式拟合横向轨迹。
五次多项式的优势是可以同时满足 “位置、速度、加速度” 的边界条件,保证轨迹平滑。
(2)纵向轨迹规划(s 方向)
● 目标:从当前纵向位置(s0)和速度(c_speed)规划到不同的 “目标速度”(tv,在当前速度和期望速度附近采样)。
● 方法:使用四次多项式拟合纵向轨迹。
四次多项式可满足 “位置、速度、加速度” 的边界条件,保证纵向运动的平滑性。
(3)组合横向和纵向轨迹
● 对每个横向目标位置(di)、每个预测时间(Ti,在MIN_T到MAX_T范围内采样)、每个纵向目标速度(tv),组合成一条完整的候选轨迹,记录其位置、速度、加速度等信息。
step 3:候选轨迹的可行性检查
生成的候选轨迹可能存在不符合物理约束或安全要求的情况,需要通过check_paths函数筛选:
● 物理约束:
○ 速度不超过MAX_SPEED;
○ 加速度绝对值不超过MAX_ACCEL;
○ 曲率不超过MAX_CURVATURE。
● 安全约束:
通过check_collision函数检查轨迹是否与障碍物碰撞。
step 4:选择最优轨迹
从可行的候选轨迹中,通过代价函数选出 “综合性能最好” 的轨迹:
● 代价函数(cf)综合了以下因素:
○ 横向代价(cd):横向急动度(d_ddd,衡量平滑性)、预测时间(Ti,衡量效率)、横向偏移量(d,衡量与参考路径的偏离);
○ 纵向代价(cv):纵向急动度(s_ddd,衡量平滑性)、预测时间(Ti)、与目标速度的偏差(ds,衡量效率);
○ 总代价cf = K_LATcd + K_LONcv(通过权重K_LAT和K_LON平衡横向和纵向性能)。
● 最终选择代价最小的轨迹作为最优解。
总结来说就是4步:
- 用样条曲线生成平滑参考路径;
- 分解横向 / 纵向运动,用多项式生成大量候选轨迹;
- 筛选出满足物理约束和无碰撞的可行轨迹;
- 用代价函数选择 “最平滑、最高效、最安全” 的最优轨迹。
smach状态机
基于规则的状态转移机制,通过smach和smach_ros库实现
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
简化SMACH状态机设计 - 单机器人任务执行系统
移除复杂传值,简化状态转移
"""import rospy
import smach
import smach_ros
import time
import random
from geometry_msgs.msg import Point
from collections import deque# ==================== 简化数据结构 ====================
class SimpleRobot:"""简化机器人数据结构"""def __init__(self):self.battery = 100.0self.position = Point(0, 0, 0)self.status = 'idle'self.task_count = 0self.max_tasks = 3# ==================== 全局变量 ====================
robot = SimpleRobot()# ==================== 系统初始化状态 ====================
class InitState(smach.State):"""系统初始化状态"""def __init__(self):smach.State.__init__(self, outcomes=['success', 'failed'])def execute(self, userdata):global robotrospy.loginfo('=== 系统初始化 ===')# 重置机器人状态robot.battery = 100.0robot.position = Point(0, 0, 0)robot.status = 'ready'robot.task_count = 0rospy.loginfo(f'机器人初始化完成 - 电量: {robot.battery}%, 位置: ({robot.position.x}, {robot.position.y})')time.sleep(1)return 'success'# ==================== 任务执行状态 ====================
class TaskState(smach.State):"""任务执行状态"""def __init__(self):smach.State.__init__(self, outcomes=['task_done', 'need_charge', 'all_done'])def execute(self, userdata):global robot# 检查是否完成所有任务if robot.task_count >= robot.max_tasks:rospy.loginfo('所有任务已完成!')return 'all_done'# 检查电量if robot.battery < 30:rospy.logwarn('电量不足,需要充电')return 'need_charge'# 执行任务robot.task_count += 1task_time = random.uniform(2, 4)rospy.loginfo(f'开始执行任务 {robot.task_count}/{robot.max_tasks}')for i in range(int(task_time)):if rospy.is_shutdown():return 'task_done'robot.battery -= random.uniform(5, 10)rospy.loginfo(f'任务进行中... 电量: {robot.battery:.1f}%')time.sleep(1)# 更新位置robot.position.x += random.uniform(-2, 2)robot.position.y += random.uniform(-2, 2)rospy.loginfo(f'任务 {robot.task_count} 完成!新位置: ({robot.position.x:.1f}, {robot.position.y:.1f})')return 'task_done'# ==================== 充电状态 ====================
class ChargeState(smach.State):"""充电状态"""def __init__(self):smach.State.__init__(self, outcomes=['charged'])def execute(self, userdata):global robotrospy.loginfo('=== 开始充电 ===')while robot.battery < 90 and not rospy.is_shutdown():robot.battery += 20if robot.battery > 100:robot.battery = 100rospy.loginfo(f'充电中... 电量: {robot.battery:.1f}%')time.sleep(1)rospy.loginfo('充电完成!')return 'charged'# ==================== 休息状态 ====================
class RestState(smach.State):"""休息状态"""def __init__(self):smach.State.__init__(self, outcomes=['rested'])def execute(self, userdata):global robotrospy.loginfo('=== 任务完成,机器人休息 ===')robot.status = 'resting'for i in range(3):if rospy.is_shutdown():breakrospy.loginfo(f'休息中... {i+1}/3')time.sleep(1)rospy.loginfo('休息完成!')return 'rested'# ==================== 结果汇总状态 ====================
class SummaryState(smach.State):"""结果汇总状态"""def __init__(self):smach.State.__init__(self, outcomes=['finished'])def execute(self, userdata):global robotrospy.loginfo('=== 任务执行总结 ===')rospy.loginfo(f'完成任务数: {robot.task_count}/{robot.max_tasks}')rospy.loginfo(f'最终电量: {robot.battery:.1f}%')rospy.loginfo(f'最终位置: ({robot.position.x:.1f}, {robot.position.y:.1f})')rospy.loginfo(f'机器人状态: {robot.status}')rospy.loginfo('=== 状态机执行完成 ===')return 'finished'# ==================== 创建状态机 ====================
def create_state_machine():"""创建简化状态机"""# 创建主状态机sm = smach.StateMachine(outcomes=['SUCCESS', 'FAILED'])with sm:# 系统初始化smach.StateMachine.add('INIT',InitState(),transitions={'success': 'TASK_LOOP','failed': 'FAILED'})# 创建任务循环子状态机task_loop = smach.StateMachine(outcomes=['loop_done', 'loop_failed'])with task_loop:smach.StateMachine.add('TASK_EXECUTION',TaskState(),transitions={'task_done': 'TASK_EXECUTION','need_charge': 'CHARGING','all_done': 'loop_done'})smach.StateMachine.add('CHARGING',ChargeState(),transitions={'charged': 'TASK_EXECUTION'})# 添加任务循环到主状态机smach.StateMachine.add('TASK_LOOP',task_loop,transitions={'loop_done': 'REST','loop_failed': 'FAILED'})# 休息状态smach.StateMachine.add('REST',RestState(),transitions={'rested': 'SUMMARY'})# 结果汇总smach.StateMachine.add('SUMMARY',SummaryState(),transitions={'finished': 'INIT'})return sm# ==================== 主函数 ====================
def main():"""主函数"""# 初始化ROS节点rospy.init_node('simple_state_machine')rospy.loginfo('启动简化状态机...')# 创建状态机sm = create_state_machine()# 创建状态机可视化服务器sis = smach_ros.IntrospectionServer('simple_sm', sm, '/SM_ROOT')sis.start()try:# 执行状态机outcome = sm.execute()rospy.loginfo(f'状态机执行完成,结果: {outcome}')except rospy.ROSInterruptException:rospy.loginfo('状态机被中断')except Exception as e:rospy.logerr(f'状态机执行出错: {str(e)}')finally:sis.stop()rospy.loginfo('状态机服务器已停止')if __name__ == '__main__':main()
栅格地图规划方法
安装依赖项
sudo apt install ros-noetic-costmap-2d ros-noetic-nav-core ros-noetic-navfn ros-noetic-base-local-planner ros-noetic-map-server ros-noetic-amcl ros-noetic-map-server ros-noetic-move-base ros-noetic-smach-viewer
将此内容放到同一个功能包中,然后编译:
catkin_make
地图使用的是2d 栅格地图格式
内置的算法包括astar,dijkstra和ghfs,都是在graph_planner的a_star.cpp实现的,大家如果想要做这个的,可以直接修改这里的a_star的实现,修改为其他算法,然后对比效果,不建议在现有框架下再添加一个规划方法(需要大量的ros,qt的c++开发经验)
/*** @brief Construct a new AStar object* @param nx pixel number in costmap x direction* @param ny pixel number in costmap y direction* @param resolution costmap resolution* @param dijkstra using diksktra implementation* @param gbfs using gbfs implementation*/
AStar::AStar(int nx, int ny, double resolution, bool dijkstra, bool gbfs) : GlobalPlanner(nx, ny, resolution)
{// can not using both dijkstra and GBFS at the same timeif (!(dijkstra && gbfs)){is_dijkstra_ = dijkstra;is_gbfs_ = gbfs;}else{is_dijkstra_ = false;is_gbfs_ = false;}factor_ = 0.25;
};
AStar由dijkstra和gbfs控制,也是使用这三种算法切换的方式,只需要再添加对应的flag开发其他的规划算法即可.