当前位置: 首页 > news >正文

【pytorch】常用强化学习算法实现(持续更新)

持续更新常用的强化学习算法,采用单python文件实现,简单易读

  • 2024.11.09 更新:PPO(GAE); SAC
  • 2024.11.12 更新:OptionCritic(PPOC)
"PPO"
import copy
import time
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as Fimport gymnasium as gym
import matplotlib.pyplot as pltfrom tqdm import trange
from torch.distributions import Normalclass Actor(nn.Module):def __init__(self, state_size, action_size):super().__init__()self.fc1 = nn.Linear(state_size, 256)self.fc2 = nn.Linear(256, 128)self.mu = nn.Linear(128, action_size)self.sigma = nn.Linear(128, action_size)def forward(self, x):x = F.relu(self.fc1(x))x = F.relu(self.fc2(x))mu = F.tanh(self.mu(x))sigma = F.softplus(self.sigma(x))return mu, sigmaclass Critic(nn.Module):def __init__(self, state_size):super().__init__()self.fc1 = nn.Linear(state_size, 256)self.fc2 = nn.Linear(256, 128)self.fc3 = nn.Linear(128, 1)def forward(self, x):x = F.relu(self.fc1(x))x = F.relu(self.fc2(x))return self.fc3(x)def ppo_training(trajectory, actor, critic, actor_optimizer, critic_optimizer,clip=0.2, k_epochs=10, gamma=0.99, lam=0.95, device='cpu', T=1e-2):states, actions, log_probs, rewards, next_states, dones = map(lambda x: torch.from_numpy(np.array(x)).to(device),zip(*trajectory))rewards = rewards.view(-1, 1)dones = dones.view(-1, 1).int()with torch.no_grad():next_values = critic(next_states.float())td_target = rewards + gamma * next_values * (1 - dones)td_value = critic(states.float())td_delta = td_target - td_valuetd_delta = td_delta.detach().cpu().numpy()adv = 0.0advantages = []for delta in td_delta[::-1]:adv = gamma * lam * adv + deltaadvantages.append(adv)advantages.reverse()advantages = torch.from_numpy(np.array(advantages)).float().to(device)advantages = (advantages - advantages.mean()) / advantages.std()for k in range(k_epochs):mu, sigma = actor(states.float())dist = Normal(mu, sigma)new_log_probs = dist.log_prob(actions)entropy = dist.entropy()ratio = torch.exp(new_log_probs - log_probs.detach())surr1 = ratio * advantagessurr2 = torch.clamp(ratio, 1.0 - clip, 1 + clip) * advantagesactor_loss = - torch.min(surr1, surr2).mean() - entropy.mean() * Tcritic_loss = F.mse_loss(critic(states.float()), td_target.float().detach())actor_optimizer.zero_grad()critic_optimizer.zero_grad()actor_loss.backward()actor_optimizer.step()critic_loss.backward()critic_optimizer.step()if __name__ == '__main__':device = torch.device("cpu")env = gym.make('Walker2d')episodes = 1000train_timesteps = 1024clip = 0.2k_epochs = 40gamma = 0.9lam = 0.95T = 1e-2lr = 1e-4actor = Actor(env.observation_space.shape[0], env.action_space.shape[0]).to(device)critic = Critic(env.observation_space.shape[0]).to(device)actor_optimizer = torch.optim.Adam(actor.parameters(), lr=lr)critic_optimizer = torch.optim.Adam(critic.parameters(), lr=lr)trajectory = []timestep = 0pbar = trange(1, episodes+1)score_list = []for e in pbar:state, _ = env.reset()scores = 0.0while True:timestep += 1s = torch.from_numpy(state).float().to(device)mu, sigma = actor(s)dist = Normal(mu, sigma)a = dist.sample()log_prob = dist.log_prob(a).detach().cpu().numpy()action = a.detach().cpu().numpy()next_state, reward, done, _, _ = env.step(action)scores += rewardtrajectory.append([state, action, log_prob, reward, next_state, done])if timestep % train_timesteps == 0:ppo_training(trajectory,actor,critic,actor_optimizer,critic_optimizer,clip,k_epochs,gamma,lam,device,T)trajectory = []state = copy.deepcopy(next_state)if done: breakscore_list.append(scores)pbar.set_description("Episode {}/{}: Score: {:.2f}, Timesteps: {}".format(e, episodes, scores, timestep))
"SAC"
from torch.distributions import Normal
from collections import deque
from tqdm import trangeimport torch
import torch.nn as nn
import torch.nn.functional as Fimport copy
import time
import random
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as pltclass ActorNetwork(nn.Module):def __init__(self, state_size, action_size):super().__init__()self.fc1 = nn.Linear(state_size, 256)self.fc2 = nn.Linear(256, 128)self.mu = nn.Linear(128, action_size)self.sigma = nn.Linear(128, action_size)def forward(self, x):x = F.relu(self.fc1(x))x = F.relu(self.fc2(x))mu = self.mu(x)sigma = F.softplus(self.sigma(x))return mu, sigmaclass QNetwork(nn.Module):def __init__(self, state_size, action_size):super().__init__()self.fc1 = nn.Linear(state_size + action_size, 256)self.fc2 = nn.Linear(256, 128)self.fc3 = nn.Linear(128, 1)def forward(self, s, a):x = torch.cat((s, a), dim=-1)x = F.relu(self.fc1(x))x = F.relu(self.fc2(x))return self.fc3(x)class ReplayBuffer:def __init__(self, capacity):self.memory = deque(maxlen=capacity)def __len__(self):return len(self.memory)def save_memory(self, state, action, reward, next_state, done):self.memory.append([state, action, reward, next_state, done])def sample(self, batch_size):sample_size = min(len(self), batch_size)experiences = random.sample(self.memory, sample_size)return experiencesdef soft_update(target, source, tau=0.05):for param, target_param in zip(source.parameters(), target.parameters()):target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)def choice_action(actor, state):mu, sigma = actor(state)normal_dist = Normal(torch.zeros_like(mu), torch.ones_like(sigma))epsilon = normal_dist.sample()action = torch.tanh(mu + sigma * epsilon)log_prob = normal_dist.log_prob(epsilon)log_prob -= torch.log(1 - action.pow(2) + 1e-6)log_prob = log_prob.sum(-1, keepdim=True)return action, log_probdef training(gamma, replay_buffer, models, log_alpha, target_entropy, optimizers, batch_size, tau):(actor,q1_net,target_q1_net,q2_net,target_q2_net) = models(actor_optimizer,q1_optimizer,q2_optimizer,alpha_optimizer) = optimizersbatch_data = replay_buffer.sample(batch_size)states, actions, rewards, next_states, dones = map(lambda x: torch.from_numpy(np.array(x)).float().to(device),zip(*batch_data))with torch.no_grad():alpha = torch.exp(log_alpha)with torch.no_grad():next_state_actions, next_state_log_probs = choice_action(actor, next_states)target_q1_next = target_q1_net(next_states, next_state_actions)target_q2_next = target_q2_net(next_states, next_state_actions)min_q_next_target = torch.min(target_q1_next, target_q2_next) - alpha * next_state_log_probstd_target_value = rewards.view(-1, 1) + (1 - dones.view(-1, 1)) * gamma * min_q_next_targetq1 = q1_net(states, actions)q2 = q2_net(states, actions)q1_loss = F.mse_loss(q1, td_target_value)q2_loss = F.mse_loss(q2, td_target_value)q1_optimizer.zero_grad()q2_optimizer.zero_grad()q1_loss.backward()q2_loss.backward()q1_optimizer.step()q2_optimizer.step()state_actions, state_log_probs = choice_action(actor, states)q = torch.min(q1_net(states, state_actions), q2_net(states, state_actions))actor_loss = torch.mean((alpha * state_log_probs) - q)actor_optimizer.zero_grad()actor_loss.backward()actor_optimizer.step()with torch.no_grad():_, log_prob = choice_action(actor, states)alpha_loss = torch.mean(- log_alpha.exp() * (log_prob + target_entropy))alpha_optimizer.zero_grad()alpha_loss.backward()alpha_optimizer.step()soft_update(target_q1_net, q1_net, tau)soft_update(target_q2_net, q2_net, tau)if __name__ == '__main__':device = torch.device("cpu")env = gym.make('Walker2d')episodes = 1000train_timesteps = 4policy_lr = 1e-4q_lr = 1e-4alpha_lr = 1e-2tau = 0.05buffer_capacity = int(1e6)batch_size = 64gamma = 0.9state_size = env.observation_space.shape[0]action_size = env.action_space.shape[0]target_entropy = - torch.prod(torch.tensor(env.observation_space.shape, device=device))actor = ActorNetwork(state_size, action_size).to(device)q1_net = QNetwork(state_size, action_size).to(device)target_q1_net = QNetwork(state_size, action_size).to(device)q2_net = QNetwork(state_size, action_size).to(device)target_q2_net = QNetwork(state_size, action_size).to(device)target_q1_net.load_state_dict(q1_net.state_dict())target_q2_net.load_state_dict(q2_net.state_dict())log_alpha = torch.tensor(0.0, requires_grad=True, device=device)actor_optimizer = torch.optim.Adam(actor.parameters(), lr=policy_lr)q1_optimizer = torch.optim.Adam(q1_net.parameters(), lr=q_lr)q2_optimizer = torch.optim.Adam(q2_net.parameters(), lr=q_lr)alpha_optimizer = torch.optim.Adam([log_alpha], lr=alpha_lr)replay_buffer = ReplayBuffer(buffer_capacity)pbar = trange(1, episodes+1)timestep = 0score_list = []for episode in pbar:state, _ = env.reset()scores = 0.0while True:timestep += 1if timestep % train_timesteps == 0:training(gamma,replay_buffer,(actor,q1_net,target_q1_net,q2_net,target_q2_net),log_alpha,target_entropy,(actor_optimizer,q1_optimizer,q2_optimizer,alpha_optimizer),batch_size,tau)action, _ = choice_action(actor, torch.from_numpy(state).float().to(device))action = action.detach().cpu().numpy()next_state, reward, done, _, _ = env.step(action)scores += rewardreplay_buffer.save_memory(state, action, reward, next_state, done)state = copy.deepcopy(next_state)if done: breakscore_list.append(scores)pbar.set_description("Episode {}/{}: Score: {:.2f}, Timesteps: {}, Log Alpha: {:.2f}".format(episode, episodes, scores, timestep, log_alpha.item()))
"OptionCritic(PPOC)"import torch
import torch.nn as nn
import torch.nn.functional as Ffrom torch.distributions import Bernoulli, Normal
from torch import optimfrom tqdm import trangeimport matplotlib.pyplot as plt
import gymnasium as gym
import numpy as np
import random
import copyclass QNetwork(nn.Module):def __init__(self, state_size, num_options):super().__init__()self.nn = nn.Sequential(nn.Linear(state_size, 256),nn.ReLU(),nn.Linear(256, 128),nn.ReLU(),nn.Linear(128, num_options))def forward(self, x):return self.nn(x)class ActorNetwork(nn.Module):def __init__(self, state_size, action_size):super().__init__()self.fc = nn.Sequential(nn.Linear(state_size, 256),nn.ReLU(),nn.Linear(256, 128),)self.mu = nn.Sequential(nn.ReLU(),nn.Linear(128, action_size),nn.Tanh())self.sigma = nn.Sequential(nn.ReLU(),nn.Linear(128, action_size),nn.Softplus())def forward(self, x):x = self.fc(x)return self.mu(x), self.sigma(x)class TerminationNetwork(nn.Module):def __init__(self, state_size, num_options):super().__init__()self.nn = nn.Sequential(nn.Linear(state_size, 256),nn.ReLU(),nn.Linear(256, 128),nn.ReLU(),nn.Linear(128, num_options),nn.Sigmoid())def forward(self, x):return self.nn(x)class OptionCritic(nn.Module):def __init__(self, state_size, action_size, num_options):super().__init__()self.upper_policy_q_net = QNetwork(state_size, num_options)self.termination_network = TerminationNetwork(state_size, num_options)self.options = nn.ModuleList([ActorNetwork(state_size, action_size)for _ in range(num_options)])self.num_options = num_optionsdef get_option_id(self, state, epsilon):if np.random.rand() > epsilon:return torch.argmax(self.upper_policy_q_net(state),dim=-1).detach().cpu().numpy().item()else:return random.sample(range(self.num_options), 1)[0]def is_option_terminated(self, state, option_id):option_termination_prob = self.termination_network(state)[option_id]option_termination = Bernoulli(option_termination_prob).sample()return bool(option_termination.item())def select_action(self, state, epsilon, option_id):if self.is_option_terminated(state, option_id):option_id = self.get_option_id(state, epsilon)else: option_id = option_idmu, sigma = self.options[option_id](state)normal_dist = Normal(mu, sigma)action = normal_dist.sample()log_prob = normal_dist.log_prob(action)action = action.detach().cpu().numpy()log_prob = log_prob.detach().cpu().numpy()return action, log_prob, option_iddef training(agent, optimizer, trajectory, gamma, k_epochs, clip, lam, T):states, actions, log_probs, option_id, rewards, next_states, dones = map(lambda x: torch.from_numpy(np.array(x)), zip(*trajectory))option_id = option_id.view(-1, 1)rewards = rewards.view(-1, 1)dones = dones.view(-1, 1).float()with torch.no_grad():option_terminated_prob = agent.termination_network(next_states.float()).gather(-1, option_id)next_q = agent.upper_policy_q_net(next_states.float())q_target = rewards + gamma * (1 - dones) * ((1 - option_terminated_prob) * next_q.gather(-1, option_id)+ option_terminated_prob * next_q.max(dim=-1, keepdim=True)[0])td_delta = q_target - agent.upper_policy_q_net(states.float()).gather(-1, option_id)td_delta = td_delta.detach().cpu().numpy()adv = 0.0advantages = []for delta in td_delta[::-1]:adv = gamma * lam * adv + deltaadvantages.append(adv)advantages.reverse()advantages = torch.from_numpy(np.array(advantages)).float()advantages = ((advantages - advantages.mean())/ (1e-6 + advantages.std()))for k in range(k_epochs):mus, sigmas = [], []for i in range(states.shape[0]):mu, sigma = agent.options[option_id[i]](states[i].float())mus.append(mu), sigmas.append(sigma)mu = torch.stack(mus, 0)sigma = torch.stack(sigmas, 0)normal_dist = Normal(mu, sigma)new_log_probs = normal_dist.log_prob(actions)entropy = normal_dist.entropy()ratio = torch.exp(new_log_probs - log_probs.detach())surr1 = ratio * advantagessurr2 = torch.clamp(ratio, 1.0 - clip, 1 + clip) * advantagespolicy_loss = - torch.min(surr1, surr2).mean() - entropy.mean() * Tcritic_loss = F.mse_loss(agent.upper_policy_q_net(states.float()).gather(-1, option_id),q_target.float())termination_loss = agent.termination_network(states.float()).gather(-1, option_id) * (agent.upper_policy_q_net(states.float()).gather(-1, option_id)- agent.upper_policy_q_net(states.float()).max(dim=-1, keepdim=True)[0]).detach()losses = policy_loss + critic_loss + termination_loss.mean()optimizer.zero_grad()losses.backward()optimizer.step()if __name__ == '__main__':env = gym.make('Walker2d')episodes = 1000train_timesteps = 1024clip = 0.2k_epochs = 10gamma = 0.9lam = 0.95T = 1e-2lr = 1e-4epsilon = 1.0epsilon_decay = 0.995mini_epsilon = 0.1state_size = env.observation_space.shape[0]action_size = env.action_space.shape[0]num_options = 4agent = OptionCritic(state_size, action_size, num_options)optimizer = optim.Adam(agent.parameters(), lr=lr)trajectory = []timestep = 0pbar = trange(1, episodes + 1)scores_list = []for e in pbar:state, _ = env.reset()scores = 0.0option_id = agent.get_option_id(torch.from_numpy(state).float(), epsilon)options = [option_id]while True:timestep += 1if timestep % train_timesteps == 0:training(agent, optimizer, trajectory, gamma, k_epochs, clip, lam, T)trajectory = []action, log_prob, option_id = agent.select_action(torch.from_numpy(state).float(), epsilon, option_id)options.append(option_id)next_state, reward, done, _, _ = env.step(action)scores += rewardtrajectory.append([state, action, log_prob, option_id, reward, next_state, done])state = copy.deepcopy(next_state)if done: breakscores_list.append(scores)epsilon = max(mini_epsilon, epsilon * epsilon_decay)pbar.set_description("Episode {}/{}: Score: {:.2f}, Timesteps: {}, Epsilon: {:.2f}".format(e, episodes, scores, timestep, epsilon))plt.plot(scores_list)plt.show()
http://www.lryc.cn/news/483520.html

相关文章:

  • DAY59||并查集理论基础 |寻找存在的路径
  • Mybatis执行自定义SQL并使用PageHelper进行分页
  • OpenCV DNN
  • 什么时候需要复写hashcode()和compartTo方法
  • PostgreSQL 日志文件备份
  • 2023年MathorCup数学建模B题城市轨道交通列车时刻表优化问题解题全过程文档加程序
  • 数字农业产业链整体建设方案
  • awk那些事儿:在awk中使用shell变量的两种方式
  • 大数据面试题--kafka夺命连环问(后10问)
  • 智能量化交易的多样化策略与风险控制:中阳模型的应用与发展
  • 小皮PHP连接数据库提示could not find driver
  • 2024.11.13(一维数组相关)
  • 豆包MarsCode算法题:数组元素之和最小化
  • Hbase Shell
  • 激活函数解析:神经网络背后的“驱动力”
  • 【开源免费】基于SpringBoot+Vue.JS水果购物网站(JAVA毕业设计)
  • 推荐一款多物理场模拟仿真软件:STAR-CCM+
  • React Hooks在现代前端开发中的应用
  • 重学SpringBoot3-整合Quartz定时任务
  • STM32单片机WIFI语音识别智能衣柜除湿消毒照明
  • spring中entity的作用
  • 2019年下半年试题二:论软件系统架构评估及其应用
  • 网络自动化04:python实现ACL匹配信息(主机与主机信息)
  • 字典树介绍以及C++实现
  • 【C++】用红黑树封装set和map
  • 【大数据测试HDFS + Flask详细教程与实例】
  • 高级java每日一道面试题-2024年10月31日-RabbitMQ篇-RabbitMQ中vhost的作用是什么?
  • 【日常记录-Java】代码配置Logback
  • HTTP常见的请求头有哪些?都有什么作用?在 Web 应用中使用这些请求头?
  • 电信数据清洗案例:利用MapReduce实现高效数据预处理