pytorch千问模型源码分析
# 规范化技术,旨在替代传统的 Layer Normalization(LN)
# 核心思想是对输入张量的每个样本的每个特征进行规范化,使其均值为 0,方差为 1
class Qwen2RMSNorm(nn.Module):
def __init__(self, hidden_size, eps=1e-6): # 隐藏层的大小
super().__init__()
# 一个可学习的权重参数,初始化为全 1 张量。
self.weight = nn.Parameter(torch.ones(hidden_size))
# 用于防止除零错误的小常数。
self.variance_epsilon = eps
def forward(self, hidden_states):
# 记录输入张量的数据类型,以便最终转换回原始类型。
input_dtype = hidden_states.dtype
# 转换为 torch.float32 类型,以确保数值稳定性。
hidden_states = hidden_states.to(torch.float32)
# 计算每个样本的方差
variance = hidden_states.pow(2).mean(-1, keepdim=True)
# 计算每个样本的 RMS 值,并对每个样本进行规范化
hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon)
# 应用可学习的权重,其中 γγ 是一个可学习的参数,用于缩放规范化后的张量。
return self.weight * hidden_states.to(input_dtype)
# 用于生成旋转位置嵌入。这种嵌入方法在 Transformer 模型中用于捕捉序列中的位置信息,尤其适用于长序列任务。
# 通过旋转的方式将位置信息编码到嵌入向量中。具体步骤如下:
# 生成频率:通过指数函数生成一系列频率值。计算正弦和余弦:利用生成的频率计算正弦和余弦值
# ,旋转嵌入:将输入向量按一定规则旋转,以嵌入位置信息。
class Qwen2RotaryEmbedding(nn.Module):
def __init__(self, dim, max_position_embeddings=2048, base=10000, device=None):
super().__init__()
self.dim = dim
# 最大位置嵌入的长度,默认为 2048,base:基数,默认为 10000。。
self.max_position_embeddings = max_position_embeddings
self.base = base
# inv_freq:计算频率的逆值。
# 位置列表先归一化(从绝对位置变成相对位置),之后取指数(1--接近10000),之后取倒数,位置从1--越来越小
inv_freq = 1.0 / (self.base ** (torch.arange(0, self.dim, 2, dtype=torch.int64).float().to(device) / self.dim))
# register_buffer:将 inv_freq 注册为缓冲区,以便在模型保存和加载时保持不变。
# register_buffer 方法用于注册一个非训练的缓冲区(buffer),这意味着它不会被梯度更新。当你使用 register_buffer 注册一个缓
# 冲区时,它会被保存在模型的状态字典(state dict)中,并且在模型保存和加载时也会被序列化。
# persistent=True:缓冲区会出现在模型的状态字典中,并且会被序列化和加载。
# persistent=False:缓冲区不会出现在模型的状态字典中,但在实际保存和加载时,仍然会被序列化并加载。
self.register_buffer("inv_freq", inv_freq, persistent=False)
# Build here to make `torch.jit.trace` work.生成正弦和余弦缓存
self._set_cos_sin_cache(
seq_len=max_position_embeddings, device=self.inv_freq.device, dtype=torch.get_default_dtype()
)
def _set_cos_sin_cache(self, seq_len, device, dtype):
self.max_seq_len_cached = seq_len
# t 是一个包含位置索引的张量,形状为 (seq_len,)。
t = torch.arange(self.max_seq_len_cached, device=device, dtype=torch.int64).type_as(self.inv_freq)
# torch.outer:计算外积,得到一个形状为 (seq_len, dim/2) 的张量
freqs = torch.outer(t, self.inv_freq) # 计算频率。
# Different from paper, but it uses a different permutation in order to obtain the same calculation
# 拼接频率。emb 的形状为 (seq_len, dim)。
# 在旋转位置嵌入(RoPE)中,我们通常将嵌入向量分为两个部分,并分别应用正弦和余弦变换。具体来说:
# 对于每个位置 tt,计算频率 ff,得到一个形状为 (seq_len, dim/2) 的张量。
# 将频率张量拼接两次,得到一个形状为 (seq_len, dim) 的张量。
# 这样做的原因是,我们将嵌入向量分为两部分,每部分对应一个频率值。
emb = torch.cat((freqs, freqs), dim=-1)
# cos_cached 和 sin_cached:注册正弦和余弦缓存。
self.register_buffer("cos_cached", emb.cos().to(dtype), persistent=False)
self.register_buffer("sin_cached", emb.sin().to(dtype), persistent=False)
def forward(self, x, seq_len=None): # x:输入张量。
# x: [bs, num_attention_heads, seq_len, head_size]
# 如果 seq_len 大于已缓存的最大长度,则重新生成缓存。
if seq_len > self.max_seq_len_cached:
self._set_cos_sin_cache(seq_len=seq_len, device=x.device, dtype=x.dtype)
return ( # 返回正弦和余弦缓存的切片。
self.cos_cached[:seq_len].to(dtype=x.dtype),
self.sin_cached[:seq_len].to(dtype=x.dtype),
)
class Qwen2MLP(nn.Module):
def __init__(self, config):
super().__init__()
self.hidden_size = config.hidden_size # d
self.intermediate_size = config.intermediate_size # hd
self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False) # d-->hd
self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False)# d-->hd
self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=False) # hd-->d
self.act_fn = ACT2FN[config.hidden_act]
def forward(self, hidden_state): # (h,s,d)
# 门控信号生成:gate_proj(hidden_state) 生成门控信号
# 特征调整:gate_output 与 up_output 相乘,将门控信号应用于特征表示。
# 门控机制的作用:通过门控信号动态调整哪些特征应该通过哪些特征应该被抑制。
# 激活函数的选择:如果 config.hidden_act 是 "sigmoid",那么激活函数将是 sigmoid
return self.down_proj(self.act_fn(self.gate_proj(hidden_state)) * self.up_proj(hidden_state))
class Qwen2Attention(nn.Module):
def __init__(self, config: Qwen2Config, layer_idx: Optional[int] = None):
super().__init__() # 调用父类的初始化方法
self.config = config # 配置类实例
self.layer_idx = layer_idx # 层索引
if layer_idx is None:
logger.warning_once(
f"Instantiating {self.__class__.__name__} without passing `layer_idx` is not recommended and will "
"to errors during the forward call, if caching is used. Please make sure to provide a `layer_idx` "
"when creating this class."
)
self.hidden_size = config.hidden_size # d
self.num_heads = config.num_attention_heads # q_h
self.head_dim = self.hidden_size // self.num_heads # dk
self.num_key_value_heads = config.num_key_value_heads # kv_h
self.num_key_value_groups = self.num_heads // self.num_key_value_heads # 比例
self.max_position_embeddings = config.max_position_embeddings # p
self.rope_theta = config.rope_theta # base
self.is_causal = True # 是否用因果掩码
self.attention_dropout = config.attention_dropout # dropout
# 嵌入维度必须能被整除
if (self.head_dim * self.num_heads) != self.hidden_size:
raise ValueError(
f"hidden_size must be divisible by num_heads (got `hidden_size`: {self.hidden_size}"
f" and `num_heads`: {self.num_heads})."
)
# 线性投影
self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=True)
#需要注意的是这里的投影维度可能和q的投影维度不同
self.k_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=True)
self.v_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=True)
# 最后一个线性转换层
self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=False)
# 旋转位置嵌入层
self.rotary_emb = Qwen2RotaryEmbedding(
self.head_dim, # dk
max_position_embeddings=self.max_position_embeddings,# max_position
base=self.rope_theta, # base
)
def forward(
self,
hidden_states: torch.Tensor,
attention_mask: Optional[torch.Tensor] = None,# 可选
position_ids: Optional[torch.LongTensor] = None,# 可选
past_key_value: Optional[Cache] = None, # 可选参数:缓存
output_attentions: bool = False,# 是否输出注意力权重
use_cache: bool = False, # 是否使用缓存
cache_position: Optional[torch.LongTensor] = None, # 缓存位置
) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]:
bsz, q_len, _ = hidden_states.size() # b,s,d
# 投影
query_states = self.q_proj(hidden_states)
key_states = self.k_proj(hidden_states)
value_states = self.v_proj(hidden_states)
# (b,q_len,q_h,dk)-->(b,q_h,q_len,dk),transpose:换轴(转置)
query_states = query_states.view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
# (b,k_h,k_len,dk)
key_states = key_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)
value_states = value_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)
kv_seq_len = key_states.shape[-2] # k_len
# 缓存上个时间步的key,value表示
if past_key_value is not None: # 如果设置了缓存
if self.layer_idx is None: # 就必须有layer_idx,不然报错
raise ValueError(
f"The cache structure has changed since version v4.36. If you are using {self.__class__.__name__} "
"for auto-regressive decoding with k/v caching, please make sure to initialize the attention class "
"with a layer index."
)
kv_seq_len += past_key_value.get_usable_length(kv_seq_len, self.layer_idx)
# 旋转位置嵌入,传kv_len
# 键/值序列长度:kv_seq_len 是键和值向量的长度,这是因为键和值向量代表的是相同的序列。
# 查询序列长度:q_len 是查询向量的长度,这可能不同于键/值向量的长度。
# 旋转位置嵌入:在计算旋转位置嵌入时,使用键/值序列长度是为了确保位置信息与键和值向量一致。
cos, sin = self.rotary_emb(value_states, seq_len=kv_seq_len)
# 返回带位置信息的嵌入表示
query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin, position_ids)
# 如果past_key_value is not None
if past_key_value is not None:
cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position} # Specific to RoPE models
# 更新
key_states, value_states = past_key_value.update(key_states, value_states, self.layer_idx, cache_kwargs)
# repeat k/v heads if n_kv_heads < n_heads
# 如果键值头数量少于查询头数量,则重复键值头以匹配查询头数量。
key_states = repeat_kv(key_states, self.num_key_value_groups)
value_states = repeat_kv(value_states, self.num_key_value_groups)
# (b,q_h,q_len,dk)@(b,k_h,dk,k_len)-->(b,h,q_len,k_len)
attn_weights = torch.matmul(query_states, key_states.transpose(2, 3)) / math.sqrt(self.head_dim)
if attn_weights.size() != (bsz, self.num_heads, q_len, kv_seq_len):
raise ValueError(
f"Attention weights should be of size {(bsz, self.num_heads, q_len, kv_seq_len)}, but is"
f" {attn_weights.size()}"
)
# 切片,在最后一个维度切出q_len的长度
if attention_mask is not None: # no matter the length, we just slice it
causal_mask = attention_mask[:, :, :, : key_states.shape[-2]]
# 相加,一般遮挡的地方是很大的负数
attn_weights = attn_weights + causal_mask
# upcast attention to fp32
# 在q_len上归一化,得到query序列中每个token对应key中token的一系列权重,这些权重中较大的值表示和当前query中的token
# 相似度较近,较小的表示离当前query中token较远
attn_weights = nn.functional.softmax(attn_weights, dim=-1, dtype=torch.float32).to(query_states.dtype)
# dropout
attn_weights = nn.functional.dropout(attn_weights, p=self.attention_dropout, training=self.training)
# (b,h,q_len,k_len)@(b,h,v_len,dk)-->(b,h,q_len,dk)
attn_output = torch.matmul(attn_weights, value_states)
if attn_output.size() != (bsz, self.num_heads, q_len, self.head_dim):
raise ValueError(
f"`attn_output` should be of size {(bsz, self.num_heads, q_len, self.head_dim)}, but is"
f" {attn_output.size()}"
)
# (b,h,q_len,dk)-->(b,h,q_len,h,dk),之后.contiguous()转为内存连续存储
attn_output = attn_output.transpose(1, 2).contiguous()
# (b,h,q_len,h,dk)-->(b,h,d)
attn_output = attn_output.reshape(bsz, q_len, self.hidden_size)
# 最后经过线性转换
attn_output = self.o_proj(attn_output)
# 不输出注意力权重
if not output_attentions:
attn_weights = None
# 返回多头注意力的输出,注意力权重,上个时间步的key_value的缓存
return attn_output, attn_weights, past_key_value