
Deep Learning DNN in Practice

Import packages:

import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
from tqdm.auto import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F

print(sys.version_info)
for module in mpl, np, pd, sklearn, torch:
    print(module.__name__, module.__version__)

device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print(device)

Load the data:

from torchvision import datasets
from torchvision.transforms import ToTensor

# fashion_mnist image classification dataset
train_ds = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor()
)
test_ds = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor()
)

# torchvision does not ship a separate train/validation split for this dataset,
# so the test split is used as a validation set here; you could also carve a
# validation set out of the training data yourself (see the sketch after this block)
# from dataset to DataLoader
train_loader = torch.utils.data.DataLoader(train_ds, batch_size=16, shuffle=True)
val_loader = torch.utils.data.DataLoader(test_ds, batch_size=16, shuffle=False)
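If you would rather split a real validation set off the training data instead of reusing the test split, a minimal sketch with torch.utils.data.random_split could look like this (the 55,000/5,000 split and the fixed seed are illustrative choices, not part of the original post):

from torch.utils.data import random_split

# split the 60,000 training images into 55,000 for training and 5,000 for validation
train_subset, val_subset = random_split(
    train_ds, [55000, 5000],
    generator=torch.Generator().manual_seed(42)  # fixed seed so the split is reproducible
)
train_loader = torch.utils.data.DataLoader(train_subset, batch_size=16, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_subset, batch_size=16, shuffle=False)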

Compute the normalization statistics:

from torchvision.transforms import Normalize

# iterate over train_ds and average each image's per-channel mean and std
def cal_mean_std(ds):
    mean = 0.
    std = 0.
    for img, _ in ds:  # each img has shape [1, 28, 28]
        mean += img.mean(dim=(1, 2))
        std += img.std(dim=(1, 2))
    mean /= len(ds)
    std /= len(ds)
    return mean, std

print(cal_mean_std(train_ds))
# 0.2860, 0.3205
transforms = nn.Sequential(
    Normalize([0.2860], [0.3205])  # mean and std computed from train_ds above
)
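As a quick optional check (a sketch, not in the original post), applying the transform to a batch should give data with mean close to 0 and standard deviation close to 1. The std will not be exactly 1, because the 0.3205 above is an average of per-image stds rather than the pooled std over all pixels.

# pull one batch and compare statistics before and after normalization
imgs, _ = next(iter(train_loader))
normed = transforms(imgs)
print(imgs.mean().item(), imgs.std().item())      # raw pixel statistics, roughly 0.286 / 0.3
print(normed.mean().item(), normed.std().item())  # close to 0 / 1 after Normalize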

Define the model:

The input layer is written in the Sequential container, and additional hidden layers are appended below with add_module; a deeper network can learn richer features.

init_weights changes the initial weight distribution, which is another knob you can tune.
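For reference, xavier_uniform_ draws each weight from U(-a, a) with a = gain * sqrt(6 / (fan_in + fan_out)); a small sketch (not part of the original post) that confirms the bound for a 100x100 layer:

import math

layer = nn.Linear(100, 100)
nn.init.xavier_uniform_(layer.weight)             # default gain = 1.0
bound = math.sqrt(6 / (100 + 100))                # about 0.173 for fan_in = fan_out = 100
print(layer.weight.abs().max().item() <= bound)   # True: every weight lies inside [-a, a]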

class NeuralNetwork(nn.Module):
    def __init__(self, layers_num=2):
        super().__init__()
        self.transforms = transforms  # preprocessing layer: normalization
        self.flatten = nn.Flatten()
        # first fully connected block
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 100),
            nn.ReLU(),
        )
        # add layers_num - 1 extra hidden layers
        for i in range(1, layers_num):
            self.linear_relu_stack.add_module(f"Linear_{i}", nn.Linear(100, 100))
            # each ReLU needs a unique name, otherwise add_module overwrites the previous one
            self.linear_relu_stack.add_module(f"relu_{i}", nn.ReLU())
        # output layer
        self.linear_relu_stack.add_module("Output Layer", nn.Linear(100, 10))
        # initialize the weights
        self.init_weights()

    def init_weights(self):
        """Initialize the weights W of every fully connected layer with the Xavier uniform distribution."""
        for m in self.modules():
            if isinstance(m, nn.Linear):  # only touch fully connected layers
                # https://pytorch.org/docs/stable/nn.init.html
                nn.init.xavier_uniform_(m.weight)  # Xavier uniform for the weights
                nn.init.zeros_(m.bias)             # zeros for the biases

    def forward(self, x):
        # x.shape: [batch_size, 1, 28, 28]
        x = self.transforms(x)  # normalize
        x = self.flatten(x)     # after flattening: [batch_size, 28 * 28]
        logits = self.linear_relu_stack(x)
        # logits.shape: [batch_size, 10]
        return logits
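A quick shape sanity check (added here as a sketch, assuming the cells above have been run) before counting parameters:

check_model = NeuralNetwork(layers_num=2)
dummy = torch.rand(4, 1, 28, 28)    # a fake batch of 4 grayscale 28x28 images
print(check_model(dummy).shape)     # expected: torch.Size([4, 10]), one logit per class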
total = 0
for idx, (key, value) in enumerate(NeuralNetwork(20).named_parameters()):
    # np.prod(value.shape) is the number of elements in this parameter tensor
    total += np.prod(value.shape)
total  # total number of model parameters
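As a cross-check, the count for NeuralNetwork(20) can be worked out by hand: the input layer holds 28*28*100 + 100 = 78,500 parameters, each of the 19 extra hidden layers holds 100*100 + 100 = 10,100, and the output layer holds 100*10 + 10 = 1,010, so the loop above should report 78,500 + 19*10,100 + 1,010 = 271,410.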

Training:

from sklearn.metrics import accuracy_score

@torch.no_grad()
def evaluating(model, dataloader, loss_fct):
    loss_list = []
    pred_list = []
    label_list = []
    for datas, labels in dataloader:
        datas = datas.to(device)
        labels = labels.to(device)
        # forward pass
        logits = model(datas)
        loss = loss_fct(logits, labels)  # validation loss
        loss_list.append(loss.item())
        preds = logits.argmax(axis=-1)   # validation predictions
        pred_list.extend(preds.cpu().numpy().tolist())
        label_list.extend(labels.cpu().numpy().tolist())
    acc = accuracy_score(label_list, pred_list)
    return np.mean(loss_list), acc
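Because evaluating only needs a model, a dataloader and a loss function, it can also be run before any training as a sanity check (a sketch, not in the original post); an untrained network should score near chance on the 10-class validation set:

# baseline with random weights: accuracy should be close to 0.10
baseline_model = NeuralNetwork(layers_num=2).to(device)
print(evaluating(baseline_model, val_loader, nn.CrossEntropyLoss()))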
from torch.utils.tensorboard import SummaryWriter

class TensorBoardCallback:
    def __init__(self, log_dir, flush_secs=10):
        """
        Args:
            log_dir (str): dir to write log.
            flush_secs (int, optional): write to disk every flush_secs seconds. Defaults to 10.
        """
        self.writer = SummaryWriter(log_dir=log_dir, flush_secs=flush_secs)

    def draw_model(self, model, input_shape):
        self.writer.add_graph(model, input_to_model=torch.randn(input_shape))

    def add_loss_scalars(self, step, loss, val_loss):
        self.writer.add_scalars(
            main_tag="training/loss",
            tag_scalar_dict={"loss": loss, "val_loss": val_loss},
            global_step=step,
        )

    def add_acc_scalars(self, step, acc, val_acc):
        self.writer.add_scalars(
            main_tag="training/accuracy",
            tag_scalar_dict={"accuracy": acc, "val_accuracy": val_acc},
            global_step=step,
        )

    def add_lr_scalars(self, step, learning_rate):
        self.writer.add_scalars(
            main_tag="training/learning_rate",
            tag_scalar_dict={"learning_rate": learning_rate},
            global_step=step,
        )

    def __call__(self, step, **kwargs):
        # add loss
        loss = kwargs.pop("loss", None)
        val_loss = kwargs.pop("val_loss", None)
        if loss is not None and val_loss is not None:
            self.add_loss_scalars(step, loss, val_loss)
        # add acc
        acc = kwargs.pop("acc", None)
        val_acc = kwargs.pop("val_acc", None)
        if acc is not None and val_acc is not None:
            self.add_acc_scalars(step, acc, val_acc)
        # add lr
        learning_rate = kwargs.pop("lr", None)
        if learning_rate is not None:
            self.add_lr_scalars(step, learning_rate)
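The callback only writes the scalars; to actually view the curves, start TensorBoard against the same log directory from a terminal with tensorboard --logdir runs and open the URL it prints.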
class SaveCheckpointsCallback:
    def __init__(self, save_dir, save_step=5000, save_best_only=True):
        """
        Save a checkpoint every save_step steps. Training scripts with pytorch
        usually evaluate the model and save checkpoints by step, as done here.

        Args:
            save_dir (str): dir to save checkpoints.
            save_step (int, optional): how often (in steps) to save a checkpoint. Defaults to 5000.
            save_best_only (bool, optional): If True, only keep the best model; otherwise save a checkpoint at every save_step.
        """
        self.save_dir = save_dir
        self.save_step = save_step
        self.save_best_only = save_best_only
        self.best_metrics = -1
        # mkdir
        if not os.path.exists(self.save_dir):
            os.mkdir(self.save_dir)

    def __call__(self, step, state_dict, metric=None):
        if step % self.save_step > 0:
            return
        if self.save_best_only:
            assert metric is not None
            if metric >= self.best_metrics:
                # save checkpoint
                torch.save(state_dict, os.path.join(self.save_dir, "best.ckpt"))
                # update best metrics
                self.best_metrics = metric
        else:
            torch.save(state_dict, os.path.join(self.save_dir, f"{step}.ckpt"))
class EarlyStopCallback:
    def __init__(self, patience=5, min_delta=0.01):
        """
        Args:
            patience (int, optional): Number of evaluations with no improvement after which training will be stopped. Defaults to 5.
            min_delta (float, optional): Minimum change in the monitored quantity to qualify as an improvement, i.e. an absolute change of less than min_delta counts as no improvement. Defaults to 0.01.
        """
        self.patience = patience
        self.min_delta = min_delta
        self.best_metric = -1
        self.counter = 0

    def __call__(self, metric):
        if metric >= self.best_metric + self.min_delta:
            # update best metric
            self.best_metric = metric
            # reset counter
            self.counter = 0
        else:
            self.counter += 1

    @property
    def early_stop(self):
        return self.counter >= self.patience
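A tiny illustration of the counter logic with made-up numbers: with patience=2 and min_delta=0.01, two consecutive evaluations that fail to improve the best metric by at least 0.01 set the stop flag.

es = EarlyStopCallback(patience=2, min_delta=0.01)
for metric in [0.70, 0.75, 0.752, 0.751]:   # the last two gains are smaller than min_delta
    es(metric)
    print(metric, es.early_stop)
# prints False, False, False, True: the counter reaches patience on the fourth call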
# training loop
def training(
    model,
    train_loader,
    val_loader,
    epoch,
    loss_fct,
    optimizer,
    tensorboard_callback=None,
    save_ckpt_callback=None,
    early_stop_callback=None,
    eval_step=500,
):
    record_dict = {
        "train": [],
        "val": []
    }

    global_step = 0
    model.train()
    with tqdm(total=epoch * len(train_loader)) as pbar:
        for epoch_id in range(epoch):
            # training
            for datas, labels in train_loader:
                datas = datas.to(device)
                labels = labels.to(device)
                # zero the gradients
                optimizer.zero_grad()
                # forward pass
                logits = model(datas)
                # compute the loss
                loss = loss_fct(logits, labels)
                # backpropagate
                loss.backward()
                # optimizer step, applying the update (and any learning-rate changes)
                optimizer.step()
                preds = logits.argmax(axis=-1)

                acc = accuracy_score(labels.cpu().numpy(), preds.cpu().numpy())
                loss = loss.cpu().item()
                # record
                record_dict["train"].append({
                    "loss": loss, "acc": acc, "step": global_step
                })

                # evaluating
                if global_step % eval_step == 0:
                    model.eval()
                    val_loss, val_acc = evaluating(model, val_loader, loss_fct)
                    record_dict["val"].append({
                        "loss": val_loss, "acc": val_acc, "step": global_step
                    })
                    model.train()

                    # 1. tensorboard visualization
                    if tensorboard_callback is not None:
                        tensorboard_callback(
                            global_step,
                            loss=loss, val_loss=val_loss,
                            acc=acc, val_acc=val_acc,
                            lr=optimizer.param_groups[0]["lr"],
                        )

                    # 2. save model checkpoint
                    if save_ckpt_callback is not None:
                        save_ckpt_callback(global_step, model.state_dict(), metric=val_acc)

                    # 3. early stop
                    if early_stop_callback is not None:
                        early_stop_callback(val_acc)
                        if early_stop_callback.early_stop:
                            print(f"Early stop at epoch {epoch_id} / global_step {global_step}")
                            return record_dict

                # update step
                global_step += 1
                pbar.update(1)
                pbar.set_postfix({"epoch": epoch_id})

    return record_dict


epoch = 100
model = NeuralNetwork(layers_num=10)
# 1. loss function: cross-entropy
loss_fct = nn.CrossEntropyLoss()
# 2. optimizer: SGD with momentum
# Optimizers specified in the torch.optim package
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# 1. tensorboard visualization
tensorboard_callback = TensorBoardCallback("runs")
tensorboard_callback.draw_model(model, [1, 28, 28])
# 2. save best
save_ckpt_callback = SaveCheckpointsCallback("checkpoints", save_best_only=True)
# 3. early stop
early_stop_callback = EarlyStopCallback(patience=10, min_delta=0.001)

model = model.to(device)

record = training(
    model,
    train_loader,
    val_loader,
    epoch,
    loss_fct,
    optimizer,
    tensorboard_callback=None,
    save_ckpt_callback=save_ckpt_callback,
    early_stop_callback=early_stop_callback,
    eval_step=len(train_loader),
)
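Once training finishes (or stops early), a natural follow-up, sketched here under the assumption that the best checkpoint was written to checkpoints/best.ckpt by the callback above, is to reload the best weights and re-run the evaluation helper:

# reload the best checkpoint and report its validation metrics
model.load_state_dict(torch.load(os.path.join("checkpoints", "best.ckpt"), map_location=device))
best_loss, best_acc = evaluating(model, val_loader, loss_fct)
print(f"best checkpoint: val_loss={best_loss:.4f}, val_acc={best_acc:.4f}")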

Plot the learning curves:

def plot_learning_curves(record_dict, sample_step=500):
    # build DataFrames
    train_df = pd.DataFrame(record_dict["train"]).set_index("step").iloc[::sample_step]
    val_df = pd.DataFrame(record_dict["val"]).set_index("step")

    # plot one panel per recorded metric (loss and acc)
    fig_num = len(train_df.columns)
    fig, axs = plt.subplots(1, fig_num, figsize=(6 * fig_num, 5))
    for idx, item in enumerate(train_df.columns):
        axs[idx].plot(train_df.index, train_df[item], label=f"train_{item}")
        axs[idx].plot(val_df.index, val_df[item], label=f"val_{item}")
        axs[idx].grid()
        axs[idx].legend()
        axs[idx].set_xlabel("step")

    plt.show()

plot_learning_curves(record, sample_step=10000)  # the x-axis is the training step

 
