当前位置: 首页 > news >正文

Transformer架构每层详解【代码实现】

Transformer背景就不过多介绍了,直接开始讲解每层架构

一.总体架构

 二.分层讲解

 1.Embedding层

首先先明白这个inputs是什么?

比如整个句子库只有两句(实际肯定成千上万条)

分别为:"i love my family","my family is very warm"

这里经过文本预处理后,我们可以获得一个词典,每个词不重复

比如:

i:0    love:1   my:2   family:3   is:4  very:5  warm:6 

此时整个词库大小为7,最长句子长度为5,也就是第二句话的长度,当不满足最大长度的句子,我们会使用pad来填充,比如在这里例子中我们会使用pad=7,此时词库大小变为8了

代码:

这里我们利用nn的Embedding层,重新封装一个Embeddings类,主要是为了使用math.sqrt对词嵌入后的向量进行缩放,为了和后面的位置编码相加时候,相差不要太大,不然会弱化位置编码,位置编码我们后面再说

class Embeddings(nn.Module):def __init__(self,vocab,Emb_dim):super(Embeddings,self).__init__()self.Emb = nn.Embedding(vocab,Emb_dim)self.Emb_dim = Emb_dimdef forward(self,x):return self.Emb(x)*math.sqrt(self.Emb_dim ) #和位置编码相加,为了不要太大过位置编码,进行缩放

测试:

#jupyter notebookEmb_dim = 128  #词嵌入后的维度
vocab = 8  #词库
pad=7
# "i love my family","my family is very warm"
#  i:0    love:1   my:2   family:3   is:4  very:5  warm:6 #第一句话只有4个字,我们使用pad填充
x = torch.tensor([[1, 2, 3, 4,pad], [2, 3, 4, 5,6]])   #输入
emb = Embeddings(vocab, Emb_dim) #实例化这个层
emb_result = emb(x) 
emb_result,emb_result.shape

结果:

总结:

其实,我们输入的shape是(2,5),输出的shape是(2,5,128)你可以理解为多了一个维度,这个维度用来表示每个字在第三维度空间中的向量值。2表示两个句子,5表示每个句子5个字,128表示一个字用128维向量表示

2.位置编码层 

在transformer架构中自注意力机制注重上下文,对整个输入序列同时并行处理的。

  • 不像 RNN 是一个词一个词按时间步处理(天然有序)

  • Transformer 把所有词的向量同时喂给注意力层,本质上是无序的

每个字顺序位置是不敏感的,所以我们通常使用一个位置编码与词嵌入后的向量进行相加。使向量中增加位置信息,使用sin,cos处理,值域是在[-1,1],所以Embedding我们要封装一个层,用于将词嵌入后范围尽量相近,避免相加后位置信息损失掉,比如10000+1,这个1为位置信息的话,基本可以忽略。

代码:

class PositionalEncoding(nn.Module):  #一个字对应一行向量def __init__(self,Emb_dim,dropout_rate,max_len=5000):super(PositionalEncoding,self).__init__()self.dropout = nn.Dropout(p=dropout_rate)pe = torch.zeros(max_len,Emb_dim) #初始化一个位置编码矩阵position = torch.arange(0,max_len).unsqueeze(1) #生成一个shape=(max_len,1)的矩阵#论文中的计算方法div_term = torch.exp(torch.arange(0,Emb_dim,2)*-(math.log(10000.0)/Emb_dim))pe[:,0::2]=torch.sin(position*div_term) #对偶数列用sin填充pe[:,1::2]=torch.cos(position*div_term) #对奇数列用cos填充pe = pe.unsqueeze(0)  #shape = (1,max_len,Emb_dim) 为了个词嵌入后的向量相加,维度要相同self.register_buffer('pe',pe)  #这个位置矩阵是固定的,注册一个def forward(self,x):x = x+self.pe[:,:x.size(1)]  #x.shape=(bs,seq_len,Emb_dim) 只需要加到对应列即可return self.dropout(x)

测试代码:


#jupyter notebookpe = PositionalEncoding(128, 0.1, 8)  
pe_result = pe(emb_result)  #使用Embedding层输出作为输入
pe_result,pe_result.shape

 总结:

        加入位置编码只是相加的过程,不会改变形状。

3.多头注意力层

1.注意力计算规则函数

#注意力计算规则
def attention(q,k,v,mask=None,dropout=None):dim = q.size(-1)  #最后一个维度,通常就是词嵌入维度数scores=torch.matmul(q,k.transpose(-2,-1))/math.sqrt(dim)if mask is not None:  #编码器没有mask,解码器有scores = scores.masked_fill(mask==0,-1e9)p_attn = F.softmax(scores,dim=-1)  #归一化if dropout is not None:p_attn = dropout(p_attn)return torch.matmul(p_attn,v),p_attn  #返回最后的向量结果和归一化后的score

 在 Transformer 编码器中,注意力机制的核心思想是:每个词通过与其他所有词建立关系,动态地汇聚信息,从而生成一个富含上下文的新表示向量

以输入句子 "I love my family" 为例,假设每个词通过词嵌入后被表示为一个 128 维的向量。我们希望计算 "my" 这个词在上下文中的表示,其本质是让 "my" 去“关注”与自己和其他词的关联程度。

我们构造一个注意力得分矩阵(如下表),其中每个元素表示词之间的相似度,通过对应的 Query(Q)和 Key(K)向量点积计算:

这里因为第五个词是pad填充的,无语义,所以直接置为-1e9

为什么不使用0?因为在使用softmax归一化时,e的0次方=1,而e的-1e9≈0

k₁k₂k₃k₄k₅
q₁q₁·k₁q₁·k₂q₁·k₃q₁·k₄-1e9
q₂q₂·k₁q₂·k₂q₂·k₃q₂·k₄-1e9
q₃q₃·k₁q₃·k₂q₃·k₃q₃·k₄-1e9
q₄q₄·k₁q₄·k₂q₄·k₃q₄·k₄-1e9
q₅q₅·k₁q₅·k₂q₅·k₃q₅·k₄-1e9

现在,我们专注于第 3 个词 "my",即 q₃。为了生成其新的上下文表示,模型将使用 q₃ 去与每个 k₁~k₅ 进行点积计算,得到五个注意力得分,反映了 "my" 与每个词的相关性程度。

假设经过 softmax 归一化后,得到的注意力权重为:

[0.0, 0.1, 0.6, 0.2, 0.1]
然后,这些权重会与对应的 Value 向量(v₁v₅)进行加权求和,得到新的上下文向量:

 Attention("my") = 0.0·v₁ + 0.1·v₂ + 0.6·v₃ + 0.2·v₄ + 0.1·v₅

 “注意力机制的本质,是让每个词根据其在上下文中的重要程度,从所有词的 Value 中提取信息,形成一个动态的、语义增强的新表示。”

2.mask函数

编码器端:只需要mask掉pad位置

def create_padding_mask(x, pad_token_id):# x: (batch_size, seq_len)mask = (x != pad_token_id).unsqueeze(1).unsqueeze(2).int()  # → (batch, 1, 1, seq_len)return mask

 

 attn表:

i love my  family

k₁(i)k₂(love)k₃(my)k₄(family)k₅(pad)
q₁(i)q₁·k₁q₁·k₂q₁·k₃q₁·k₄-1e9
q₂(love)q₂·k₁q₂·k₂q₂·k₃q₂·k₄-1e9
q₃(my)q₃·k₁q₃·k₂q₃·k₃q₃·k₄-1e9
q₄(family)q₄·k₁q₄·k₂q₄·k₃q₄·k₄-1e9
q₅(pad)q₅·k₁q₅·k₂q₅·k₃q₅·k₄-1e9

 my family is very warm

k₁(my)k₂(family)k₃(is)k₄(very)k₅(warm)
q₁(my)q₁·k₁q₁·k₂q₁·k₃q₁·k₄q₁·k₅
q₂(family)q₂·k₁q₂·k₂q₂·k₃q₂·k₄q₂·k₅
q₃(is)q₃·k₁q₃·k₂q₃·k₃q₃·k₄q₃·k₅
q₄(very)q₄·k₁q₄·k₂q₄·k₃q₄·k₄q₄·k₅
q₅(warm)q₅·k₁q₅·k₂q₅·k₃q₅·k₄q₅·k₅

用于解码器端

def subsequent_mask(size):attn_shape = (1, size, size)subsequent_mask = torch.triu(torch.ones(attn_shape), diagonal=1).type(torch.uint8)return (1 - subsequent_mask)  # bool mask

 

 假如在解码器端:

有一个计算表

k₁k₂k₃k₄k₅
q₁q₁·k₁-1e9-1e9-1e9-1e9
q₂q₂·k₁q₂·k₂-1e9-1e9-1e9
q₃q₃·k₁q₃·k₂q₃·k₃-1e9-1e9
q₄q₄·k₁q₄·k₂q₄·k₃q₄·k₄-1e9
q₅q₅·k₁q₅·k₂q₅·k₃q₅·k₄q₅·k₅

 测试代码:

#jupyter notebook#把加入了位置编码层的输出作为输入
attn, p_attn = attention(pe_result, pe_result, pe_result, mask=mask)
attn,p_attn

 3.实现多头注意力层类

 实则就是使用多个头去计算不同部分,最后拼接起来

class MultiHeadAttention(nn.Module):def __init__(self,head_num,embed_dim,dropout_rate=0.1):super(MultiHeadAttention,self).__init__()assert embed_dim % head_num ==0self.d_k = embed_dim //head  #每个头的维度self.head_num = head_numself.W_q = nn.Linear(embed_dim, embed_dim) #4个线形层,对应了四个参数矩阵,因为输入的x*W_Q矩阵,变为Q,其它同理self.W_k = nn.Linear(embed_dim, embed_dim)self.W_v = nn.Linear(embed_dim, embed_dim)self.W_o = nn.Linear(embed_dim, embed_dim)self.attn = Noneself.dropout = nn.Dropout(p=dropout_rate)def forward(self,q,k,v,mask=None):if mask is not None:mask = mask.unsqueeze(0)  #因为下面我们Q,K,V矩阵维度变化了,在attention中,为了匹配维度batch_size = q.size(0)Q = self.W_q(q)  #得到输入矩阵对应的Q,K,V矩阵,也就是每个字的q拼起来的矩阵 (batch, seq_len, embed_dim)K = self.W_k(k) #同QV = self.W_v(v) #view:在原来数据存储的顺序上,变为4维,transpose:会改变原来第二维和第三维的数据存储顺序Q = Q.view(batch_size, -1, self.head_num, self.d_k).transpose(1,2) #(batch_size, head, seq_len, d_k),为了多个头并行计算K = K.view(batch_size, -1, self.head_num, self.d_k).transpose(1,2)V = V.view(batch_size, -1, self.head_num, self.d_k).transpose(1,2)x,self.attn = attention(Q,K,V,mask=mask,dropout=self.dropout) #这里的Q,K,V其实就是多个q,k,v进行并行计算x = x.transpose(1,2).contiguous().view(batch_size,-1,self.head_num * self.d_k) #把多个头拼接起来result = self.W_o(x) #再经过一个线性变换return result

 测试代码:

mha = MultiHeadAttention(8, 128, 0.2)
#自注意力机制,输入的q,k,v相等
mha_result = mha(pe_result, pe_result, pe_result, mask)
mha_result,mha_result.shape

4.规范化层

 主要起一个防止过拟合的作用

代码中在分母加入一个特别小的值,防止出现分母为0的情况,并且增加了2个可训练的参数

a * X' +b

代码

#规范化层
class LayerNorm(nn.Module):def __init__(self,embedding_dim,eps=1e-6):super(LayerNorm,self).__init__()#a,b是模型可以更新的参数self.a = nn.Parameter(torch.ones(embedding_dim)) #缩放系数self.b = nn.Parameter(torch.zeros(embedding_dim)) #偏置值self.eps = epsdef forward(self,x):mean = x.mean(-1,keepdim=True)std = x.std(-1,keepdim=True)return self.a *(x-mean)/(std+self.eps)+self.b  #分母会加一个特别小的数防止分母为0

 测试代码

norm = LayerNorm(128)
norm_result = norm(mha_result)  #多头注意力层的输出作为输入
norm_result,norm_result.shape

5.前馈全连接层

这一层会经过两个线性变换,以及dropout变换

 代码:

#前馈全连接层ADD
class PositionwiseFeedForward(nn.Module):def __init__(self,embedding_dim,linaer_dim,dropout_rate=0.1):super(PositionwiseFeedForward,self).__init__()self.Linear1 = nn.Linear(embedding_dim,linaer_dim)self.Linear2 =nn.Linear(linaer_dim,embedding_dim)self.dropout=nn.Dropout(p=dropout_rate)def forward(self,x):x = F.relu(self.Linear1(x))x = self.dropout(x)x = self.Linear2(x)return x

测试代码

ff = PositionwiseFeedForward(128, 64)
ff_norm_result = norm(ff(norm_result)) #框架中还使用了一层norm层
ff_norm_result,ff_result.shape

 6.把前面子层封装成一个编码器类

1.实现子层连接的类

因为我们子层模块有5层,我们要实现一个把子层模块作为参数,从而实现连接的类

由于我们发现在多头注意力层和前馈全连接层后都接了一个norm层,所以我们直接将其封装在实现连接的类中。但是,这里是后接入的norm层,但是在实际中发现,先使用norm再接入子层,效果更好(玄学),所以我们代码中,先使用norm层进行变换

#实现子层连接的类
class SublayerConnection(nn.Module):def __init__(self,embedding_dim,dropout_rate=0.1):super(SublayerConnection,self).__init__()self.norm = LayerNorm(embedding_dim)self.dropout = nn.Dropout(p=dropout_rate)def forward(self,x,sublayer):x = self.norm(x)x = sublayer(x)  #sublayer其实就是另外一个子层的forward函数调用result = x+self.dropout(x) #残差连接return result

2.编码器层类的实现

代码

class EncoderLayer(nn.Module):def __init__(self,embedding_dim,self_attn,feed_forward,dropout_rate):super(EncoderLayer,self).__init__()self.embedding_dim = embedding_dimself.self_attn = self_attn  #注意力层self.feed_forward = feed_forward #前馈全连接层self.sublayer1 = SublayerConnection(embedding_dim,dropout_rate) #用于连接第一个子层self.sublayer2 = SublayerConnection(embedding_dim,dropout_rate) #用于连接第二个子层def forward(self,x,mask):#连接注意力层和norm层x = self.sublayer1(x,lambda x:self.self_attn(x,x,x,mask))#连接前馈全连接层和norm层x = self.sublayer2(x,self.feed_forward)return x

测试代码:

Emb_dim = 128
vocab = 8
pad=7
dropout_rate=0.2
x = torch.tensor([[1, 2, 3, 4,pad], [2, 3, 4, 5,6]])mask = create_padding_mask(5)emb = Embeddings(vocab, Emb_dim)
emb_result = emb(x) #emb层输出pe = PositionalEncoding(Emb_dim,dropout_rate)
pe_result = pe(emb_result) #加入位置编码输出mha = MultiHeadAttention(8,Emb_dim,dropout_rate)
ff = PositionwiseFeedForward(Emb_dim,64)
encoder =  EncoderLayer(Emb_dim,mha,ff,dropout_rate)#编码器输出
out = encoder(pe_result,mask)
print(out)

 3.将N个编码器封装成编码器

自定义一个拷贝函数
def clones(module, N):# module: 代表要克隆的目标网络层# N: 将module克隆几个return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])
class Encoder(nn.Module):def __init__(self,layer,N):super(Encoder,self).__init__()self.layers = clones(layer,N)self.norm = LayerNorm(layer.embedding_dim)def forward(self, x, mask=None):for layer in self.layers:x = layer(x, mask)return self.norm(x)

测试代码

Emb_dim = 128
vocab = 8
pad=7
dropout_rate=0.2x = torch.tensor([[1, 2, 3, 4,pad], [2, 3, 4, 5,6]])#编码器端mask,需要
mask = create_padding_mask(x, pad_token_id=pad)  # shape: (2, 1, 1, 5)emb = Embeddings(vocab, Emb_dim)
emb_result = emb(x)pe = PositionalEncoding(Emb_dim,dropout_rate)
pe_result = pe(emb_result)mha = MultiHeadAttention(8,Emb_dim,dropout_rate)
ff = PositionwiseFeedForward(Emb_dim,64)
norm = LayerNorm(Emb_dim)layper = EncoderLayer(Emb_dim,mha,ff,0.2)Encoder= Encoder(layper,5)
encoder_result = Encoder(pe_result,mask)
encoder_result

 

最终我们实现了一个编码器,也就是把输入的句子,每个词都根据上下文得到了一个128维向量。

 根据打印的结果我们可以看到,每一行就是每个词对应的向量,其中my和family虽然词一样,但是由于其位置和上下文不同,我们可以得到不同的向量

 7.封装解码器层

 类似编码器层,这里需要三个连接。注意,这里跨注意力层的,K,V是编码器端的输出,Q是解码器端的

代码:

class DecoderLayer(nn.Module):def __init__(self, d_model, self_attn, src_attn, feed_forward, dropout_rate):super(DecoderLayer, self).__init__()self.d_model = d_modelself.self_attn = self_attn  # 注意力层self.src_attn = src_attn  # 跨注意力self.feed_forward = feed_forward  # 前馈全连接层self.sublayers = clones(SublayerConnection(d_model, dropout_rate), 3)  # 用于连接第一个子层def forward(self, x, memory, source_mask, target_mask):# 来自编码器m = memoryx = self.sublayers[0](x, lambda x: self.self_attn(x, x, x, target_mask))x = self.sublayers[1](x, lambda x: self.src_attn(x, m, m, source_mask))x = self.sublayers[2](x, self.feed_forward)return x

 8.用N个解码器层封装解码器

代码:

class Decoder(nn.Module):def __init__(self,layer,N):super(Decoder,self).__init__()self.layers = clones(layer,N)self.norm = LayerNorm(layer.d_model)def forward(self, x,memory,source_mask,target_mask):for layer in self.layers:x = layer(x, memory,source_mask,target_mask)return self.norm(x)

 9.封装一个输出层

 代码:

class Generator(nn.Module):def __init__(self,d_model,vocab_size):super(Generator,self).__init__()self.Linear = nn.Linear(d_model,vocab_size)def forward(self,x):result = F.log_softmax(self.Linear(x),dim=-1)return result

10. 构建编码器-解码器结构类

 其实就是一整块transformer的架构

代码:

# 构建编码器-解码器结构类
class EncoderDecoder(nn.Module):def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):super(EncoderDecoder, self).__init__()self.encoder = encoderself.decoder = decoderself.src_embed = src_embedself.tgt_embed = tgt_embedself.generator = generatordef forward(self, source, target, source_mask, target_mask):encode_result = self.encode(source, source_mask)decode_result = self.decode(encode_result, source_mask, target, target_mask)return self.generator(decode_result)def encode(self, source, source_mask):embed_result = self.src_embed(source)encoder_result = self.encoder(embed_result, source_mask)return encoder_resultdef decode(self, memory, source_mask, target, target_mask):# memory: 代表经历编码器编码后的输出张量embed_result = self.tgt_embed(target)decoder_result = self.decoder(embed_result, memory, source_mask, target_mask)return decoder_result

11.组合成一个transformer模型

 代码:

def make_model(source_vocab, target_vocab, N=6, d_model=512, d_ff=2048, head=8, dropout_rate=0.1):c = copy.deepcopy  #要用到深拷贝某些层,避免参数共用了attn = MultiHeadAttention(head, d_model) #注意力层ff = PositionwiseFeedForward(d_model, d_ff, dropout_rate) #前馈全连接层position = PositionalEncoding(d_model, dropout_rate) #位置编码层encoderLayer = EncoderLayer(d_model, c(attn), c(ff), dropout_rate)decoderLayer = DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout_rate)encoder = Encoder(encoderLayer, N)decoder = Decoder(decoderLayer, N)#表示,先经过Embeddings层,再经过位置编码层src_embed = nn.Sequential(Embeddings(source_vocab, d_model), c(position))tgt_embed = nn.Sequential(Embeddings(target_vocab, d_model), c(position))#输出层gen = Generator(d_model, target_vocab)#组合成一个模型model = EncoderDecoder(encoder, decoder, src_embed, tgt_embed, gen)#把这个模型的参数(只要维度大于1的)初始化for p in model.parameters():if p.dim() > 1:nn.init.xavier_uniform_(p)return model

http://www.lryc.cn/news/573516.html

相关文章:

  • LangGraph--基础学习(工具调用)
  • 2025zbrush雕刻笔记
  • NW849NX721美光固态闪存NX745NX751
  • 微处理器原理与应用篇---计算机系统的结构、组织与实现
  • 给交叉工具链增加libelf.so
  • 操作系统内核态和用户态--2-系统调用是什么?
  • 嵌入式开发之嵌入式系统架构如何搭建?
  • 【软考高级系统架构论文】论面向服务架构设计及其应用
  • modelscope设置默认模型路径
  • python的校园兼职系统
  • Taro 跨端开发:从调试到发布的完整指南
  • 基于正点原子阿波罗F429开发板的LWIP应用(7)——MQTT
  • 华为OD机试-云短信平台优惠活动-完全背包(JAVA 2024E卷)
  • TodoList 案例(Vue3): 使用Composition API
  • 嵌入式开发之嵌入式系统硬件架构设计时,如何选择合适的微处理器/微控制器?
  • 腾讯云IM即时通讯:开启实时通信新时代
  • 一文详解归并分治算法
  • Python:.py文件如何变成双击可执行的windows程序?(版本1)
  • 深入Java面试:从Spring Boot到微服务
  • Django数据库迁移
  • P1220 关路灯
  • Spring Boot + MyBatis + Vue:全栈开发的深度剖析与实践指南
  • IEEE5节点系统潮流仿真模型(simulink+matlab全功能模型)
  • maxcomputer 和 hologres中的EXTERNAL TABLE 和 FOREIGN TABLE
  • Qt/C++应用:防御性编程完全指南
  • C 语言结构体:从基础到内存对齐深度解析
  • 数据结构——函数填空题
  • Rust调用 DeepSeek API
  • Redis 的穿透、雪崩、击穿
  • SuGAR代码精简解读