Import libraries and load the data
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import torch.nn as nn
import torch
import matplotlib.pyplot as plt
# Load the sales data, keep a single product (uuid), and select the time and rank columns
df = pd.read_csv('/opt/cyc/week_task/xiaoliangyuce/data_qingxi/0804/data_0803_300_new.csv')
df1 = df[df['uuid'] == 'B0F59QC63ZUS']
df2 = df1[['sale_list_time', 'sale_list_rank']]
Train/test split + normalization + sliding-window construction
# Split the dataset: everything except the last 20 points for training, the last 20 for testing
train_data, test_data = df2['sale_list_rank'][:-20], df2['sale_list_rank'][-20:]
# Normalize: fit the scaler on the training set only, then apply the same
# transform to the test set (calling fit_transform on the test set would
# rescale it with different statistics than the training data)
scaler = MinMaxScaler()
train_data_scale = scaler.fit_transform(train_data.values.reshape(-1, 1))
test_data_scale = scaler.transform(test_data.values.reshape(-1, 1))
# Build sliding windows
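A quick check (an optional addition, not in the original) confirms that the scaler's min/max statistics come from the training set alone, which is what makes the later inverse_transform of the predictions consistent:
# MinMaxScaler learned these bounds from the training data only
print(scaler.data_min_, scaler.data_max_)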
def slid_window_data(data, window_size):
    X, Y = [], []
    for i in range(len(data) - window_size):
        X.append(data[i:i + window_size])  # input window of length window_size
        Y.append(data[i + window_size])    # label: the next value, kept as shape (1,) so Y matches the model's (batch, 1) output
    return np.array(X), np.array(Y)
# The LSTM expects input of shape (num_samples, seq_len, num_features).
# Example: [1,2,3,4,5] with window_size=3 gives X = [[1,2,3],[2,3,4]] and
# Y = [[4],[5]], i.e. two samples, so X has shape [2, 3, 1].
X_train, Y_train = slid_window_data(data=train_data_scale, window_size=20)
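As a sanity check (an addition for illustration), the shapes should match the example above: X_train is (num_samples, 20, 1) and Y_train is (num_samples, 1):
print(X_train.shape, Y_train.shape)  # e.g. (N, 20, 1) and (N, 1)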
LSTM model
class LSTM_MODEL(nn.Module):
    def __init__(self, input_size=1, hidden_size=100, num_layers=1):
        super(LSTM_MODEL, self).__init__()
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        # x: (batch, seq_len, input_size)
        #   seq_len: sequence length; in NLP this is the sentence length, usually padded with pad_sequence
        #   batch: how many samples are fed to the network at once; in NLP, how many sentences per step
        #   input_size: feature dimension, matching the input_size defined above
        out, _ = self.lstm(x)   # out: (batch, seq_len, hidden_size)
        x = self.fc(out)        # (batch, seq_len, 1)
        x = x[:, -1, :]         # keep only the last time step: (batch, 1)
        return x
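A dummy forward pass (added purely as a shape check, not part of the original) confirms the model maps a batch of windows to one value per sample:
# A batch of 2 windows of length 20 with 1 feature should map to shape (2, 1)
dummy = torch.randn(2, 20, 1)
print(LSTM_MODEL()(dummy).shape)  # torch.Size([2, 1])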
Model training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
X_train = torch.tensor(X_train, dtype=torch.float32).to(device)
Y_train = torch.tensor(Y_train, dtype=torch.float32).to(device)
model = LSTM_MODEL().to(device)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
epochs = 100
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()  # clear gradients from the previous step so they don't accumulate
    output = model(X_train)
    loss = criterion(output, Y_train)
    loss.backward()
    optimizer.step()
    print(f"Epoch [{epoch+1}], LOSS: {loss.item():.4f}")
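To monitor convergence, a minimal variant of the same loop (an optional addition; run it in place of the loop above on a freshly initialized model) records the loss per epoch and plots the training curve with the already-imported matplotlib:
losses = []
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()
    output = model(X_train)
    loss = criterion(output, Y_train)
    loss.backward()
    optimizer.step()
    losses.append(loss.item())
plt.plot(losses)
plt.xlabel('epoch')
plt.ylabel('MSE loss')
plt.show()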
Predict future values
# Build the seed input for forecasting: the last fur_len scaled training values
fur_len = 20
train_data_normalized = torch.FloatTensor(train_data_scale).view(-1)
test_input = train_data_normalized[-fur_len:].tolist()
model.eval()
with torch.no_grad():
    # Recursive one-step forecasting: predict the next value, append it to the
    # sequence, and slide the window forward so each prediction feeds the next
    for i in range(fur_len):
        seq = torch.FloatTensor(test_input[-fur_len:])
        seq = seq.to(device).unsqueeze(0).unsqueeze(2)  # [1, time_step, 1]
        test_input.append(model(seq).item())
test_input[fur_len:]  # the fur_len newly predicted (still scaled) values
Visual comparison
# Invert the scaling so the predictions are back on the original rank scale
actual_predictions = scaler.inverse_transform(np.array(test_input[fur_len:]).reshape(-1, 1))
plt.plot(range(len(test_data)), test_data, 'ro-')
plt.plot(range(len(actual_predictions)), actual_predictions, 'bo-')
plt.legend(["true", "pred"])
plt.show()
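Beyond the visual comparison, a single error number summarizes forecast quality (an optional addition, assuming test_data and actual_predictions as defined above):
# Root-mean-square error between the 20 true test values and the 20 forecasts
rmse = np.sqrt(np.mean((test_data.values.reshape(-1, 1) - actual_predictions) ** 2))
print(f"RMSE: {rmse:.4f}")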