当前位置: 首页 > news >正文

Pytorch自定义算子反向传播

文章目录

      • 自定义一个线性函数算子
      • 如何实现反向传播

有关 自定义算子的实现前面已经提到,可以参考。本文讲述自定义算子如何前向推理+反向传播进行模型训练。

自定义一个线性函数算子

线性函数 Y = X W T + B Y = XW^T + B Y=XWT+B 定义输入M 个X变量,输出N个Y变量的线性方程组。
X X X 为一个 1 x M 矩阵, W W W为 N x M 矩阵, B B B 为 1xN 矩阵,根据公式,输出 Y Y Y为1xN 矩阵。其中 W 和 B 为算子权重参数,保存在模型中。
在训练时刻,模型输入 X X X , 和监督值 Y Y Y,根据 算子forward()计算的 Y p Y^p Yp ,计算Loss = criterion( Y Y Y, Y p Y^p Yp ),然后根据backward()链式求导反向传播计算梯度值。最后根据梯度更新W 和 B 参数。

class LinearF(torch.autograd.Function):@staticmethoddef symbolic(g, input, weight, bias):return g.op("MYLINEAR", input, weight, bias)@staticmethoddef forward(ctx, input:Tensor, weight: Tensor, bias: Tensor) -> Tensor:output = input @ weight.T + bias[None, ...]ctx.save_for_backward(input, weight)return output@staticmethoddef backward(ctx, grad_output:Tensor)->Tuple[Tensor, Tensor, Tensor]:# grad_output -- [B, N] = d(Loss) / d(Y)input, weight = ctx.saved_tensorsgrad_input = grad_output @ weightgrad_weight = grad_output.T @ inputgrad_bias = grad_output.sum(0)# print("grad_input: ", grad_input)# print("grad_weight: ", grad_weight)# print("grad_bias: ", grad_bias)return grad_input, grad_weight, grad_bias

如何实现反向传播

在这里插入图片描述
前向推理比较简单,就根据公式来既可以。反向传播backward() 怎么写呢?
反向传播有两个输入参数,第一个为ctx,第二个grad_output,grad_output就是对forward() 输出output 的求导,如果是最后的节点,那就是loss对输出的求导,否则就是下一层对输出求导,输出grad_input, grad_weight, grad_bias则分别对应了forward的输入input、weight、bias的梯度。这很容易理解,因为是在做链式求导,LinearFunction是这条链上的某个节点,输入输出的数量和含义刚好相反且对应。
根据公式:
Y = X W T + B Y = XW^T + B Y=XWT+B
Loss = criterion( Y t Y^t_{} Yt, Y Y_{} Y ), 假设我们选择判别函数为L2范数,Loss = ∑ j = 0 N 0.5 ∗ ( Y j t − Y j ) 2 \sum_{j=0}^N0.5 * (Y^t_{j}-Y_{j} )^2 j=0N0.5(YjtYj)2

grad_output(j) = d ( L o s s ) d ( Y j ) \frac{d(Loss) }{d(Y_{j})} d(Yj)d(Loss) = Y j t − Y j Y^t_{j} - Y_{j} YjtYj

其中 Y j t Y^t_{j} Yjt为监督值, Y j Y_{j} Yj为模型输出值。

根据链式求导法则, 对输入 X i X_{i} Xi 的求导为

grad_input[i] = ∑ j = 0 N d ( L o s s ) d ( Y j ) ∗ d ( Y j ) d ( X i ) \sum_{j=0}^N\frac{d(Loss) }{d(Y_{j})}*\frac{d(Y_{j}) }{d(X_{i})} j=0Nd(Yj)d(Loss)d(Xi)d(Yj)= ∑ j = 0 N g r a d _ o u t p u t [ j ] ∗ d ( Y j ) d ( X i ) \sum_{j=0}^N{grad\_output}[j] *\frac{d(Y_{j}) }{d(X_{i})} j=0Ngrad_output[j]d(Xi)d(Yj)

d ( Y j ) d ( X i ) \frac{d(Y_{j}) }{d(X_{i})} d(Xi)d(Yj) 即为 W i j T = W j i W^T_{ij} = W_{ji} WijT=Wji

其中i 对应X维度, j对应输出Y维度。

最后整理成矩阵形式:

g r a d _ i n p u t = g r a d _ o u t p u t ∗ W {grad\_input}={grad\_output} * W grad_input=grad_outputW

同理:
g r a d _ w e i g h t = g r a d _ o u t p u t T ∗ X {grad\_weight}={grad\_output}^T * X grad_weight=grad_outputTX

g r a d _ b i a s = ∑ q = 0 N g r a d _ o u t p u t {grad\_bias}=\sum_{q=0}^N{grad\_output} grad_bias=q=0Ngrad_output

最后根据公式形式得到backward()函数。

反向传播的梯度求解还是不容易的,一不小心可能算错了,所以务必在模型训练以前检查梯度计算的正确性。pytorch提供了torch.autograd.gradcheck方法来检验梯度计算的正确性。

其他参考文献:pytorch自定义算子实现详解及反向传播梯度推导

最后根据自定义算子,搭建模型,训练模型参数W,B。并导出onnx。参考代码如下:

import torch
from torch import Tensor
from typing import Tuple
import numpy as np
class LinearF(torch.autograd.Function):@staticmethoddef symbolic(g, input, weight, bias):return g.op("MYLINEAR", input, weight, bias)@staticmethoddef forward(ctx, input:Tensor, weight: Tensor, bias: Tensor) -> Tensor:output = input @ weight.T + bias[None, ...]ctx.save_for_backward(input, weight)return output@staticmethoddef backward(ctx, grad_output:Tensor)->Tuple[Tensor, Tensor, Tensor]:print("grad_output: ", grad_output)# grad_output -- [B, N] = d(Loss) / d(Y)input, weight = ctx.saved_tensorsgrad_input = grad_output @ weightgrad_weight = grad_output.T @ inputgrad_bias = grad_output.sum(0)return grad_input, grad_weight, grad_bias#对LinearFunction进行封装
class MyLinear(torch.nn.Module):def __init__(self, in_features: int, out_features: int, dtype:torch.dtype) -> None:super().__init__()self.in_features = in_featuresself.out_features = out_featuresself.weight = torch.nn.Parameter(torch.empty((out_features, in_features), dtype=dtype))self.bias = torch.nn.Parameter(torch.empty((out_features,), dtype=dtype))self.reset_parameters()# self.weight = torch.nn.Parameter(torch.Tensor([2.0, 3.0]))# self.bias = torch.nn.Parameter(torch.Tensor([4.0]))#y = 2 * x1 + 3 * x2 + 4def reset_parameters(self) -> None:torch.nn.init.uniform_(self.weight)torch.nn.init.uniform_(self.bias)def forward(self, input: Tensor) -> Tensor:# for name, pa in self.named_parameters():#     print(name, pa)return LinearF.apply(input, self.weight, self.bias)  # 在此处使用if __name__ == "__main__":device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')print(device.type)model = MyLinear(2, 1, dtype=torch.float64).to(device)# torch.Tensor 默认类型为float32,使用gpu时,输入数据类型与W权重类型一致,否则报错# torch.Tensor([3.0, 2.0].double() 转换为float64#input = torch.Tensor([3.0, 2.0], ).requires_grad_(True).unsqueeze(0).double()#input = input.to(device)#assert torch.autograd.gradcheck(model, input)import torch.optim as optim#定义优化策略和判别函数optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)criterion = torch.nn.MSELoss()for epoch in range(300):print("************** epoch: ", epoch , " ************************************* ")inputx = torch.Tensor(np.random.rand(2)).unsqueeze(0).double().to(device)lable = torch.Tensor(2 * inputx[:, 0] + 3 * inputx[:, 1] + 4).double().to(device)print("outlable", lable)optimizer.zero_grad()  # 梯度清零prob = model(inputx)print("prob", prob)loss = criterion(lable, prob)print("loss: ", loss)loss.backward()  #反向传播optimizer.step() #更新参数# 完成训练model.cpu().eval()input = torch.tensor([[3.0, 2.0]], dtype=torch.float64)output = model(input)torch.onnx.export(model,  # 这里的args,是指输入给model的参数,需要传递tuple,因此用括号(input,),"linear.onnx",  # 储存的文件路径verbose=True,  # 打印详细信息input_names=["x"],  #为输入和输出节点指定名称,方便后面查看或者操作output_names=["y"],opset_version=11,  #这里的opset,指,各类算子以何种方式导出,对应于symbolic_opset11dynamic_axes={"image": {0: "batch"},"output": {0: "batch"},},operator_export_type=torch.onnx.OperatorExportTypes.ONNX_ATEN_FALLBACK)
http://www.lryc.cn/news/489080.html

相关文章:

  • aws服务(二)机密数据存储
  • VMware Workstation 17.6.1
  • 高校企业数据挖掘平台推荐
  • Vue项目开发 formatData 函数有哪些常用的场景?
  • 【AI知识】两类最主流AI应用(文生图、ChatGPT)中的目标函数
  • 【单片机基础】定时器/计数器的工作原理
  • ModuleNotFoundError: No module named ‘distutils.msvccompiler‘ 报错的解决
  • HCIA笔记2--ARP+ICMP+VRP基础
  • SpringBoot与MongoDB深度整合及应用案例
  • Redis模拟延时队列 实现日程提醒
  • vue项目中富文本编辑器的实现
  • nginx 配置lua执行shell脚本
  • Keil+VSCode优化开发体验
  • vue2中引入cesium全步骤
  • 工程师 - 智能家居方案介绍
  • 中小企业人事管理:SpringBoot框架高级应用
  • 嵌入式Linux驱动开发日记
  • 迪杰特斯拉算法(Dijkstra‘s)
  • reids基础
  • 私有化部署视频平台EasyCVR宇视设备视频平台如何构建视频联网平台及升级视频转码业务?
  • SparkContext讲解
  • MODBUS TCP转CANOpen网关
  • 渗透测试---shell(4)脚本与用户交互以及if条件判断
  • 02_Spring_IoC实现
  • 使用Python3实现Gitee码云自动化发布
  • Ubuntu24.04下的docker问题
  • PAT (Basic Level) Practice (中文)1002 写出这个数
  • C07.L07.STL之映射.应用2.统计数字
  • 微信小程序组件详解:text 和 rich-text 组件的基本用法
  • 算法.图论-习题全集(Updating)