Transformer架构每层详解【代码实现】
Transformer背景就不过多介绍了,直接开始讲解每层架构
一.总体架构
二.分层讲解
1.Embedding层
首先先明白这个inputs是什么?
比如整个句子库只有两句(实际肯定成千上万条)
分别为:"i love my family","my family is very warm"
这里经过文本预处理后,我们可以获得一个词典,每个词不重复
比如:
i:0 love:1 my:2 family:3 is:4 very:5 warm:6
此时整个词库大小为7,最长句子长度为5,也就是第二句话的长度,当不满足最大长度的句子,我们会使用pad来填充,比如在这里例子中我们会使用pad=7,此时词库大小变为8了
代码:
这里我们利用nn的Embedding层,重新封装一个Embeddings类,主要是为了使用math.sqrt对词嵌入后的向量进行缩放,为了和后面的位置编码相加时候,相差不要太大,不然会弱化位置编码,位置编码我们后面再说
class Embeddings(nn.Module):def __init__(self,vocab,Emb_dim):super(Embeddings,self).__init__()self.Emb = nn.Embedding(vocab,Emb_dim)self.Emb_dim = Emb_dimdef forward(self,x):return self.Emb(x)*math.sqrt(self.Emb_dim ) #和位置编码相加,为了不要太大过位置编码,进行缩放
测试:
#jupyter notebookEmb_dim = 128 #词嵌入后的维度
vocab = 8 #词库
pad=7
# "i love my family","my family is very warm"
# i:0 love:1 my:2 family:3 is:4 very:5 warm:6 #第一句话只有4个字,我们使用pad填充
x = torch.tensor([[1, 2, 3, 4,pad], [2, 3, 4, 5,6]]) #输入
emb = Embeddings(vocab, Emb_dim) #实例化这个层
emb_result = emb(x)
emb_result,emb_result.shape
结果:
总结:
其实,我们输入的shape是(2,5),输出的shape是(2,5,128)你可以理解为多了一个维度,这个维度用来表示每个字在第三维度空间中的向量值。2表示两个句子,5表示每个句子5个字,128表示一个字用128维向量表示
2.位置编码层
在transformer架构中自注意力机制注重上下文,对整个输入序列同时并行处理的。
-
不像 RNN 是一个词一个词按时间步处理(天然有序)
-
Transformer 把所有词的向量同时喂给注意力层,本质上是无序的
每个字顺序位置是不敏感的,所以我们通常使用一个位置编码与词嵌入后的向量进行相加。使向量中增加位置信息,使用sin,cos处理,值域是在[-1,1],所以Embedding我们要封装一个层,用于将词嵌入后范围尽量相近,避免相加后位置信息损失掉,比如10000+1,这个1为位置信息的话,基本可以忽略。
代码:
class PositionalEncoding(nn.Module): #一个字对应一行向量def __init__(self,Emb_dim,dropout_rate,max_len=5000):super(PositionalEncoding,self).__init__()self.dropout = nn.Dropout(p=dropout_rate)pe = torch.zeros(max_len,Emb_dim) #初始化一个位置编码矩阵position = torch.arange(0,max_len).unsqueeze(1) #生成一个shape=(max_len,1)的矩阵#论文中的计算方法div_term = torch.exp(torch.arange(0,Emb_dim,2)*-(math.log(10000.0)/Emb_dim))pe[:,0::2]=torch.sin(position*div_term) #对偶数列用sin填充pe[:,1::2]=torch.cos(position*div_term) #对奇数列用cos填充pe = pe.unsqueeze(0) #shape = (1,max_len,Emb_dim) 为了个词嵌入后的向量相加,维度要相同self.register_buffer('pe',pe) #这个位置矩阵是固定的,注册一个def forward(self,x):x = x+self.pe[:,:x.size(1)] #x.shape=(bs,seq_len,Emb_dim) 只需要加到对应列即可return self.dropout(x)
测试代码:
#jupyter notebookpe = PositionalEncoding(128, 0.1, 8)
pe_result = pe(emb_result) #使用Embedding层输出作为输入
pe_result,pe_result.shape
总结:
加入位置编码只是相加的过程,不会改变形状。
3.多头注意力层
1.注意力计算规则函数
#注意力计算规则
def attention(q,k,v,mask=None,dropout=None):dim = q.size(-1) #最后一个维度,通常就是词嵌入维度数scores=torch.matmul(q,k.transpose(-2,-1))/math.sqrt(dim)if mask is not None: #编码器没有mask,解码器有scores = scores.masked_fill(mask==0,-1e9)p_attn = F.softmax(scores,dim=-1) #归一化if dropout is not None:p_attn = dropout(p_attn)return torch.matmul(p_attn,v),p_attn #返回最后的向量结果和归一化后的score
在 Transformer 编码器中,注意力机制的核心思想是:每个词通过与其他所有词建立关系,动态地汇聚信息,从而生成一个富含上下文的新表示向量。
以输入句子 "I love my family" 为例,假设每个词通过词嵌入后被表示为一个 128 维的向量。我们希望计算 "my" 这个词在上下文中的表示,其本质是让 "my" 去“关注”与自己和其他词的关联程度。
我们构造一个注意力得分矩阵(如下表),其中每个元素表示词之间的相似度,通过对应的 Query(Q)和 Key(K)向量点积计算:
这里因为第五个词是pad填充的,无语义,所以直接置为-1e9
为什么不使用0?因为在使用softmax归一化时,e的0次方=1,而e的-1e9≈0
k₁ | k₂ | k₃ | k₄ | k₅ | |
---|---|---|---|---|---|
q₁ | q₁·k₁ | q₁·k₂ | q₁·k₃ | q₁·k₄ | -1e9 |
q₂ | q₂·k₁ | q₂·k₂ | q₂·k₃ | q₂·k₄ | -1e9 |
q₃ | q₃·k₁ | q₃·k₂ | q₃·k₃ | q₃·k₄ | -1e9 |
q₄ | q₄·k₁ | q₄·k₂ | q₄·k₃ | q₄·k₄ | -1e9 |
q₅ | q₅·k₁ | q₅·k₂ | q₅·k₃ | q₅·k₄ | -1e9 |
现在,我们专注于第 3 个词 "my",即 q₃
。为了生成其新的上下文表示,模型将使用 q₃
去与每个 k₁~k₅
进行点积计算,得到五个注意力得分,反映了 "my" 与每个词的相关性程度。
假设经过 softmax 归一化后,得到的注意力权重为:
[0.0, 0.1, 0.6, 0.2, 0.1]
然后,这些权重会与对应的 Value 向量(v₁
到 v₅
)进行加权求和,得到新的上下文向量:
Attention("my") = 0.0·v₁ + 0.1·v₂ + 0.6·v₃ + 0.2·v₄ + 0.1·v₅
“注意力机制的本质,是让每个词根据其在上下文中的重要程度,从所有词的 Value 中提取信息,形成一个动态的、语义增强的新表示。”
2.mask函数
编码器端:只需要mask掉pad位置
def create_padding_mask(x, pad_token_id):# x: (batch_size, seq_len)mask = (x != pad_token_id).unsqueeze(1).unsqueeze(2).int() # → (batch, 1, 1, seq_len)return mask
attn表:
i love my family
k₁(i) | k₂(love) | k₃(my) | k₄(family) | k₅(pad) | |
---|---|---|---|---|---|
q₁(i) | q₁·k₁ | q₁·k₂ | q₁·k₃ | q₁·k₄ | -1e9 |
q₂(love) | q₂·k₁ | q₂·k₂ | q₂·k₃ | q₂·k₄ | -1e9 |
q₃(my) | q₃·k₁ | q₃·k₂ | q₃·k₃ | q₃·k₄ | -1e9 |
q₄(family) | q₄·k₁ | q₄·k₂ | q₄·k₃ | q₄·k₄ | -1e9 |
q₅(pad) | q₅·k₁ | q₅·k₂ | q₅·k₃ | q₅·k₄ | -1e9 |
my family is very warm
k₁(my) | k₂(family) | k₃(is) | k₄(very) | k₅(warm) | |
---|---|---|---|---|---|
q₁(my) | q₁·k₁ | q₁·k₂ | q₁·k₃ | q₁·k₄ | q₁·k₅ |
q₂(family) | q₂·k₁ | q₂·k₂ | q₂·k₃ | q₂·k₄ | q₂·k₅ |
q₃(is) | q₃·k₁ | q₃·k₂ | q₃·k₃ | q₃·k₄ | q₃·k₅ |
q₄(very) | q₄·k₁ | q₄·k₂ | q₄·k₃ | q₄·k₄ | q₄·k₅ |
q₅(warm) | q₅·k₁ | q₅·k₂ | q₅·k₃ | q₅·k₄ | q₅·k₅ |
用于解码器端
def subsequent_mask(size):attn_shape = (1, size, size)subsequent_mask = torch.triu(torch.ones(attn_shape), diagonal=1).type(torch.uint8)return (1 - subsequent_mask) # bool mask
假如在解码器端:
有一个计算表
k₁ | k₂ | k₃ | k₄ | k₅ | |
---|---|---|---|---|---|
q₁ | q₁·k₁ | -1e9 | -1e9 | -1e9 | -1e9 |
q₂ | q₂·k₁ | q₂·k₂ | -1e9 | -1e9 | -1e9 |
q₃ | q₃·k₁ | q₃·k₂ | q₃·k₃ | -1e9 | -1e9 |
q₄ | q₄·k₁ | q₄·k₂ | q₄·k₃ | q₄·k₄ | -1e9 |
q₅ | q₅·k₁ | q₅·k₂ | q₅·k₃ | q₅·k₄ | q₅·k₅ |
测试代码:
#jupyter notebook#把加入了位置编码层的输出作为输入
attn, p_attn = attention(pe_result, pe_result, pe_result, mask=mask)
attn,p_attn
3.实现多头注意力层类
实则就是使用多个头去计算不同部分,最后拼接起来
class MultiHeadAttention(nn.Module):def __init__(self,head_num,embed_dim,dropout_rate=0.1):super(MultiHeadAttention,self).__init__()assert embed_dim % head_num ==0self.d_k = embed_dim //head #每个头的维度self.head_num = head_numself.W_q = nn.Linear(embed_dim, embed_dim) #4个线形层,对应了四个参数矩阵,因为输入的x*W_Q矩阵,变为Q,其它同理self.W_k = nn.Linear(embed_dim, embed_dim)self.W_v = nn.Linear(embed_dim, embed_dim)self.W_o = nn.Linear(embed_dim, embed_dim)self.attn = Noneself.dropout = nn.Dropout(p=dropout_rate)def forward(self,q,k,v,mask=None):if mask is not None:mask = mask.unsqueeze(0) #因为下面我们Q,K,V矩阵维度变化了,在attention中,为了匹配维度batch_size = q.size(0)Q = self.W_q(q) #得到输入矩阵对应的Q,K,V矩阵,也就是每个字的q拼起来的矩阵 (batch, seq_len, embed_dim)K = self.W_k(k) #同QV = self.W_v(v) #view:在原来数据存储的顺序上,变为4维,transpose:会改变原来第二维和第三维的数据存储顺序Q = Q.view(batch_size, -1, self.head_num, self.d_k).transpose(1,2) #(batch_size, head, seq_len, d_k),为了多个头并行计算K = K.view(batch_size, -1, self.head_num, self.d_k).transpose(1,2)V = V.view(batch_size, -1, self.head_num, self.d_k).transpose(1,2)x,self.attn = attention(Q,K,V,mask=mask,dropout=self.dropout) #这里的Q,K,V其实就是多个q,k,v进行并行计算x = x.transpose(1,2).contiguous().view(batch_size,-1,self.head_num * self.d_k) #把多个头拼接起来result = self.W_o(x) #再经过一个线性变换return result
测试代码:
mha = MultiHeadAttention(8, 128, 0.2)
#自注意力机制,输入的q,k,v相等
mha_result = mha(pe_result, pe_result, pe_result, mask)
mha_result,mha_result.shape
4.规范化层
主要起一个防止过拟合的作用
代码中在分母加入一个特别小的值,防止出现分母为0的情况,并且增加了2个可训练的参数
a * X' +b
代码
#规范化层
class LayerNorm(nn.Module):def __init__(self,embedding_dim,eps=1e-6):super(LayerNorm,self).__init__()#a,b是模型可以更新的参数self.a = nn.Parameter(torch.ones(embedding_dim)) #缩放系数self.b = nn.Parameter(torch.zeros(embedding_dim)) #偏置值self.eps = epsdef forward(self,x):mean = x.mean(-1,keepdim=True)std = x.std(-1,keepdim=True)return self.a *(x-mean)/(std+self.eps)+self.b #分母会加一个特别小的数防止分母为0
测试代码
norm = LayerNorm(128)
norm_result = norm(mha_result) #多头注意力层的输出作为输入
norm_result,norm_result.shape
5.前馈全连接层
这一层会经过两个线性变换,以及dropout变换
代码:
#前馈全连接层ADD
class PositionwiseFeedForward(nn.Module):def __init__(self,embedding_dim,linaer_dim,dropout_rate=0.1):super(PositionwiseFeedForward,self).__init__()self.Linear1 = nn.Linear(embedding_dim,linaer_dim)self.Linear2 =nn.Linear(linaer_dim,embedding_dim)self.dropout=nn.Dropout(p=dropout_rate)def forward(self,x):x = F.relu(self.Linear1(x))x = self.dropout(x)x = self.Linear2(x)return x
测试代码
ff = PositionwiseFeedForward(128, 64)
ff_norm_result = norm(ff(norm_result)) #框架中还使用了一层norm层
ff_norm_result,ff_result.shape
6.把前面子层封装成一个编码器类
1.实现子层连接的类
因为我们子层模块有5层,我们要实现一个把子层模块作为参数,从而实现连接的类
由于我们发现在多头注意力层和前馈全连接层后都接了一个norm层,所以我们直接将其封装在实现连接的类中。但是,这里是后接入的norm层,但是在实际中发现,先使用norm再接入子层,效果更好(玄学),所以我们代码中,先使用norm层进行变换
#实现子层连接的类
class SublayerConnection(nn.Module):def __init__(self,embedding_dim,dropout_rate=0.1):super(SublayerConnection,self).__init__()self.norm = LayerNorm(embedding_dim)self.dropout = nn.Dropout(p=dropout_rate)def forward(self,x,sublayer):x = self.norm(x)x = sublayer(x) #sublayer其实就是另外一个子层的forward函数调用result = x+self.dropout(x) #残差连接return result
2.编码器层类的实现
代码
class EncoderLayer(nn.Module):def __init__(self,embedding_dim,self_attn,feed_forward,dropout_rate):super(EncoderLayer,self).__init__()self.embedding_dim = embedding_dimself.self_attn = self_attn #注意力层self.feed_forward = feed_forward #前馈全连接层self.sublayer1 = SublayerConnection(embedding_dim,dropout_rate) #用于连接第一个子层self.sublayer2 = SublayerConnection(embedding_dim,dropout_rate) #用于连接第二个子层def forward(self,x,mask):#连接注意力层和norm层x = self.sublayer1(x,lambda x:self.self_attn(x,x,x,mask))#连接前馈全连接层和norm层x = self.sublayer2(x,self.feed_forward)return x
测试代码:
Emb_dim = 128
vocab = 8
pad=7
dropout_rate=0.2
x = torch.tensor([[1, 2, 3, 4,pad], [2, 3, 4, 5,6]])mask = create_padding_mask(5)emb = Embeddings(vocab, Emb_dim)
emb_result = emb(x) #emb层输出pe = PositionalEncoding(Emb_dim,dropout_rate)
pe_result = pe(emb_result) #加入位置编码输出mha = MultiHeadAttention(8,Emb_dim,dropout_rate)
ff = PositionwiseFeedForward(Emb_dim,64)
encoder = EncoderLayer(Emb_dim,mha,ff,dropout_rate)#编码器输出
out = encoder(pe_result,mask)
print(out)
3.将N个编码器封装成编码器
自定义一个拷贝函数
def clones(module, N):# module: 代表要克隆的目标网络层# N: 将module克隆几个return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])
class Encoder(nn.Module):def __init__(self,layer,N):super(Encoder,self).__init__()self.layers = clones(layer,N)self.norm = LayerNorm(layer.embedding_dim)def forward(self, x, mask=None):for layer in self.layers:x = layer(x, mask)return self.norm(x)
测试代码
Emb_dim = 128
vocab = 8
pad=7
dropout_rate=0.2x = torch.tensor([[1, 2, 3, 4,pad], [2, 3, 4, 5,6]])#编码器端mask,需要
mask = create_padding_mask(x, pad_token_id=pad) # shape: (2, 1, 1, 5)emb = Embeddings(vocab, Emb_dim)
emb_result = emb(x)pe = PositionalEncoding(Emb_dim,dropout_rate)
pe_result = pe(emb_result)mha = MultiHeadAttention(8,Emb_dim,dropout_rate)
ff = PositionwiseFeedForward(Emb_dim,64)
norm = LayerNorm(Emb_dim)layper = EncoderLayer(Emb_dim,mha,ff,0.2)Encoder= Encoder(layper,5)
encoder_result = Encoder(pe_result,mask)
encoder_result
最终我们实现了一个编码器,也就是把输入的句子,每个词都根据上下文得到了一个128维向量。
根据打印的结果我们可以看到,每一行就是每个词对应的向量,其中my和family虽然词一样,但是由于其位置和上下文不同,我们可以得到不同的向量
7.封装解码器层
类似编码器层,这里需要三个连接。注意,这里跨注意力层的,K,V是编码器端的输出,Q是解码器端的
代码:
class DecoderLayer(nn.Module):def __init__(self, d_model, self_attn, src_attn, feed_forward, dropout_rate):super(DecoderLayer, self).__init__()self.d_model = d_modelself.self_attn = self_attn # 注意力层self.src_attn = src_attn # 跨注意力self.feed_forward = feed_forward # 前馈全连接层self.sublayers = clones(SublayerConnection(d_model, dropout_rate), 3) # 用于连接第一个子层def forward(self, x, memory, source_mask, target_mask):# 来自编码器m = memoryx = self.sublayers[0](x, lambda x: self.self_attn(x, x, x, target_mask))x = self.sublayers[1](x, lambda x: self.src_attn(x, m, m, source_mask))x = self.sublayers[2](x, self.feed_forward)return x
8.用N个解码器层封装解码器
代码:
class Decoder(nn.Module):def __init__(self,layer,N):super(Decoder,self).__init__()self.layers = clones(layer,N)self.norm = LayerNorm(layer.d_model)def forward(self, x,memory,source_mask,target_mask):for layer in self.layers:x = layer(x, memory,source_mask,target_mask)return self.norm(x)
9.封装一个输出层
代码:
class Generator(nn.Module):def __init__(self,d_model,vocab_size):super(Generator,self).__init__()self.Linear = nn.Linear(d_model,vocab_size)def forward(self,x):result = F.log_softmax(self.Linear(x),dim=-1)return result
10. 构建编码器-解码器结构类
其实就是一整块transformer的架构
代码:
# 构建编码器-解码器结构类
class EncoderDecoder(nn.Module):def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):super(EncoderDecoder, self).__init__()self.encoder = encoderself.decoder = decoderself.src_embed = src_embedself.tgt_embed = tgt_embedself.generator = generatordef forward(self, source, target, source_mask, target_mask):encode_result = self.encode(source, source_mask)decode_result = self.decode(encode_result, source_mask, target, target_mask)return self.generator(decode_result)def encode(self, source, source_mask):embed_result = self.src_embed(source)encoder_result = self.encoder(embed_result, source_mask)return encoder_resultdef decode(self, memory, source_mask, target, target_mask):# memory: 代表经历编码器编码后的输出张量embed_result = self.tgt_embed(target)decoder_result = self.decoder(embed_result, memory, source_mask, target_mask)return decoder_result
11.组合成一个transformer模型
代码:
def make_model(source_vocab, target_vocab, N=6, d_model=512, d_ff=2048, head=8, dropout_rate=0.1):c = copy.deepcopy #要用到深拷贝某些层,避免参数共用了attn = MultiHeadAttention(head, d_model) #注意力层ff = PositionwiseFeedForward(d_model, d_ff, dropout_rate) #前馈全连接层position = PositionalEncoding(d_model, dropout_rate) #位置编码层encoderLayer = EncoderLayer(d_model, c(attn), c(ff), dropout_rate)decoderLayer = DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout_rate)encoder = Encoder(encoderLayer, N)decoder = Decoder(decoderLayer, N)#表示,先经过Embeddings层,再经过位置编码层src_embed = nn.Sequential(Embeddings(source_vocab, d_model), c(position))tgt_embed = nn.Sequential(Embeddings(target_vocab, d_model), c(position))#输出层gen = Generator(d_model, target_vocab)#组合成一个模型model = EncoderDecoder(encoder, decoder, src_embed, tgt_embed, gen)#把这个模型的参数(只要维度大于1的)初始化for p in model.parameters():if p.dim() > 1:nn.init.xavier_uniform_(p)return model