当前位置: 首页 > news >正文

pytorch基于ray和accelerate实现多GPU数据并行的模型加速训练

在pytorch的DDP原生代码使用的基础上,ray和accelerate两个库对于pytorch并行训练的代码使用做了更加友好的封装。

以下为极简的代码示例。

ray

ray.py

#coding=utf-8
import os
import sys
import time
import numpy as np
import torch
from torch import nn
import torch.utils.data as Data
import ray
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig
import onnxruntime# bellow code use AI model to simulate linear regression, formula is: y = x1 * w1 + x2 * w2 + b
# --- DDP RAY --- # # model structure
class LinearNet(nn.Module):def __init__(self, n_feature):super(LinearNet, self).__init__()self.linear = nn.Linear(n_feature, 1)def forward(self, x):y = self.linear(x)return y# whole train task
def train_task():print("--- train_task, pid: ", os.getpid())# device settingdevice = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")print("device:", device)device_ids = torch._utils._get_all_device_indices()print("device_ids:", device_ids)if len(device_ids) <= 0:print("invalid device_ids, exit")return# prepare datanum_inputs = 2num_examples = 1000true_w = [2, -3.5]true_b = 3.7features = torch.tensor(np.random.normal(0, 1, (num_examples, num_inputs)), dtype=torch.float)labels = true_w[0] * features[:, 0] + true_w[1] * features[:, 1] + true_b + torch.tensor(np.random.normal(0, 0.01, size=num_examples), dtype=torch.float)# load databatch_size = 10dataset = Data.TensorDataset(features, labels)data_iter = Data.DataLoader(dataset, batch_size, shuffle=True)for X, y in data_iter:print(X, y)breakdata_iter = ray.train.torch.prepare_data_loader(data_iter)# model define and initmodel = LinearNet(num_inputs)ddp_model = ray.train.torch.prepare_model(model)print(ddp_model)# cost functionloss = nn.MSELoss()# optimizeroptimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.03)# trainnum_epochs = 6for epoch in range(1, num_epochs + 1):batch_count = 0sum_loss = 0.0for X, y in data_iter:output = ddp_model(X)l = loss(output, y.view(-1, 1))optimizer.zero_grad()l.backward()optimizer.step()batch_count += 1sum_loss += l.item()print('epoch %d, avg_loss: %f' % (epoch, sum_loss / batch_count))# save modelprint("save model, pid: ", os.getpid())torch.save(ddp_model.module.state_dict(), "ddp_ray_model.pt")def ray_launch_task():num_workers = 2scaling_config = ScalingConfig(num_workers=num_workers, use_gpu=True)trainer = TorchTrainer(train_loop_per_worker=train_task, scaling_config=scaling_config)results = trainer.fit()def predict_task():print("--- predict_task")# prepare datanum_inputs = 2num_examples = 20true_w = [2, -3.5]true_b = 3.7features = torch.tensor(np.random.normal(0, 1, (num_examples, num_inputs)), dtype=torch.float)labels = true_w[0] * features[:, 0] + true_w[1] * features[:, 1] + true_b + torch.tensor(np.random.normal(0, 0.01, size=num_examples), dtype=torch.float)model = LinearNet(num_inputs)model.load_state_dict(torch.load("ddp_ray_model.pt"))model.eval()x, y = features[6], labels[6]pred_y = model(x)print("x:", x)print("y:", y)print("pred_y:", y)if __name__ == "__main__":print("==== task begin ====")print("python version:", sys.version)print("torch version:", torch.__version__)print("model name:", LinearNet.__name__)ray_launch_task()# predict_task()print("==== task end ====")

accelerate

acc.py

#coding=utf-8
import os
import sys
import time
import numpy as np
from accelerate import Accelerator
import torch
from torch import nn
import torch.utils.data as Data
import onnxruntime# bellow code use AI model to simulate linear regression, formula is: y = x1 * w1 + x2 * w2 + b
# --- accelerate --- # # model structure
class LinearNet(nn.Module):def __init__(self, n_feature):super(LinearNet, self).__init__()self.linear = nn.Linear(n_feature, 1)def forward(self, x):y = self.linear(x)return y# whole train task
def train_task():print("--- train_task, pid: ", os.getpid())# device settingdevice = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")print("device:", device)device_ids = torch._utils._get_all_device_indices()print("device_ids:", device_ids)if len(device_ids) <= 0:print("invalid device_ids, exit")return# prepare datanum_inputs = 2num_examples = 1000true_w = [2, -3.5]true_b = 3.7features = torch.tensor(np.random.normal(0, 1, (num_examples, num_inputs)), dtype=torch.float)labels = true_w[0] * features[:, 0] + true_w[1] * features[:, 1] + true_b + torch.tensor(np.random.normal(0, 0.01, size=num_examples), dtype=torch.float)# load databatch_size = 10dataset = Data.TensorDataset(features, labels)data_iter = Data.DataLoader(dataset, batch_size, shuffle=True)for X, y in data_iter:print(X, y)break# model define and initmodel = LinearNet(num_inputs)# cost functionloss = nn.MSELoss()# optimizeroptimizer = torch.optim.SGD(model.parameters(), lr=0.03)accelerator = Accelerator()model, optimizer, data_iter = accelerator.prepare(model, optimizer, data_iter) # automatically move model and data to gpu as config# trainnum_epochs = 3for epoch in range(1, num_epochs + 1):batch_count = 0sum_loss = 0.0for X, y in data_iter:output = model(X)l = loss(output, y.view(-1, 1))optimizer.zero_grad()accelerator.backward(l)optimizer.step()batch_count += 1sum_loss += l.item()print('epoch %d, avg_loss: %f' % (epoch, sum_loss / batch_count))# save modeltorch.save(model, "acc_model.pt")def predict_task():print("--- predict_task")# prepare datanum_inputs = 2num_examples = 20true_w = [2, -3.5]true_b = 3.7features = torch.tensor(np.random.normal(0, 1, (num_examples, num_inputs)), dtype=torch.float)labels = true_w[0] * features[:, 0] + true_w[1] * features[:, 1] + true_b + torch.tensor(np.random.normal(0, 0.01, size=num_examples), dtype=torch.float)model = torch.load("acc_model.pt")model.eval()x, y = features[6], labels[6]pred_y = model(x)print("x:", x)print("y:", y)print("pred_y:", y)if __name__ == "__main__":# launch method: use command line# for example# accelerate launch ACC.py print("python version:", sys.version)print("torch version:", torch.__version__)print("model name:", LinearNet.__name__)train_task()predict_task()print("==== task end ====")
http://www.lryc.cn/news/137800.html

相关文章:

  • [蓝帽杯 2022 初赛]domainhacker
  • 在 Pytorch 中使用 TensorBoard
  • Grafana Dashboard 备份方案
  • opencv-疲劳检测-眨眼检测
  • 2023-08-24力扣每日一题
  • 蚂蚁数科持续发力PaaS领域,SOFAStack布局全栈软件供应链安全产品
  • Java后端开发面试题——消息中间篇
  • C++ Windows API IsDebuggerPresent的作用
  • 【JVM 内存结构 | 程序计数器】
  • 华为云Stack的学习(一)
  • 人类反馈强化学习RLHF;微软应用商店推出AI摘要功能
  • day1:前端缓存问题
  • 学习网络编程No.4【socket编程实战】
  • HarmonyOS学习路之方舟开发框架—学习ArkTS语言(状态管理 四)
  • arcgis--坐标系
  • LFS学习系列 第5章. 编译交叉工具链(1)
  • 网络互联与互联网 - TCP 协议详解
  • 开源在线图片设计器,支持PSD解析、AI抠图等,基于Puppeteer生成图片
  • 在Linux系统上安装和配置Redis数据库,无需公网IP即可实现远程连接的详细解析
  • 跨平台图表:ChartDirector for .NET 7.1 Crack
  • 【unity数据持久化】XML数据管理器知识点
  • Linux——Shell常用运算符
  • C++(4)C++内存管理和命名空间
  • 一网打尽java注解-克隆-面向对象设计原则-设计模式
  • k8s-statefulset部署myql-Nodeport方式
  • MySQL双主架构、主从架构
  • 基于微信小程序的物流管理系统3txar
  • Maven 一键部署到 SSH 服务器
  • docker搭建owncloud,Harbor,构建镜像
  • RISC-V(1)——RISC-V是什么,有什么用