Open-Source Project Analysis: EDoRA | How EDoRA Is Implemented on Top of peft
Paper: Efficient Weight-Decomposed Low-Rank Adaptation via Singular Value Decomposition
Repository: https://github.com/Hamid-Nasiri/EDoRA/tree/main
EDoRA is a LoRA variant introduced in 2025; its paper reports higher accuracy than LoRA at a lower memory cost. The method has not yet been merged into the peft library, so this post walks through the source code of the reference implementation to make it easier to adopt EDoRA in other projects.
1. Import Analysis
Analyzing the entry script main_glue.py shows that EDoRA is implemented with the peft library plus custom attribute replacement, i.e. instance-level method patching.
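The "attribute replacement" is plain Python monkey-patching via types.MethodType, the same mechanism find_and_initialize uses below to install forward_edora. A minimal sketch (the toy doubling layer is mine, purely illustrative):

```python
import types
import torch

layer = torch.nn.Linear(4, 4)

def patched_forward(self, x):
    # Replacement forward, bound to this one instance only;
    # every other Linear keeps its original behavior.
    return torch.nn.functional.linear(x, self.weight, self.bias) * 2.0

# Bind the function to the instance, shadowing the class's forward.
layer.forward = types.MethodType(patched_forward, layer)
out = layer(torch.randn(1, 4))  # now dispatches to patched_forward
```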
2. DoRA Weight Construction
EDoRA is built as an improvement on DoRA, so model initialization starts from DoRA (which the peft library already supports).
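For reference, DoRA decomposes a pretrained weight into a magnitude vector $m$ and a direction, and applies the low-rank update only to the direction (see the DoRA paper, arXiv:2402.09353):

$$W' = m \cdot \frac{W_0 + \Delta W}{\lVert W_0 + \Delta W \rVert_c}, \qquad \Delta W = s\,BA$$

where $\lVert \cdot \rVert_c$ is the column-wise norm and $s$ the LoRA scaling (lora_alpha / r in peft). EDoRA keeps this decomposition but further factorizes the update as $\Delta W = s\,BRA$, with only the small $r \times r$ matrix $R$ trainable.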
In the **peft_config** definition, use_dora is set to True; target_modules varies with the model:
```python
peft_config = LoraConfig(
    use_dora=True,
    task_type="SEQ_CLS",
    inference_mode=False,
    r=model_args.lora_rank,
    lora_alpha=model_args.lora_alpha,
    lora_dropout=0.0,
    target_modules=["query", "value", "attention.output.dense", "output.dense"],
)
```
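As a quick sanity check, you can instantiate this config on any BERT/RoBERTa-style classifier and confirm the DoRA layers were injected (the model name and the concrete r / lora_alpha values below are illustrative stand-ins for model_args):

```python
from transformers import AutoModelForSequenceClassification
from peft import LoraConfig, get_peft_model

peft_config = LoraConfig(
    use_dora=True, task_type="SEQ_CLS", inference_mode=False,
    r=8, lora_alpha=16, lora_dropout=0.0,   # illustrative values
    target_modules=["query", "value", "attention.output.dense", "output.dense"],
)
model = AutoModelForSequenceClassification.from_pretrained("roberta-base", num_labels=2)
model = get_peft_model(model, peft_config)
model.print_trainable_parameters()
# DoRA layers additionally hold a lora_magnitude_vector parameter.
```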
3. SVD Parameter Configuration
The SVD-related parameters are read from a YAML file:

```python
adapter_name = "default"
peft_config_dict = {}
if not isinstance(peft_config, PromptLearningConfig):
    peft_config_dict[adapter_name] = peft_config

# Use this for SVD Initialization
# with open("config/reconstruct_config.yaml", "r") as stream:
#     reconstr_config = yaml.load(stream, Loader=yaml.FullLoader)

# Use this for Random Initialization
with open("config/reconstruct_config_no_svd.yaml", 'r') as stream:
    reconstr_config = yaml.load(stream, Loader=yaml.FullLoader)

reconstr_type = reconstr_config["reconstruction_type"]
reconstr_config[reconstr_type]["rank"] = peft_config_dict[adapter_name].r
```
The corresponding YAML file looks like this:
```yaml
reconstruction_type: "svd"
reconstr_mode: "separated"
half_init_dec: False
replacement_module_random_init: True
r_squared: True
svd:
  n_iter: 10
  random_state: 42
```
4. EDoRA Implementation
A DoRA model is first created via get_peft_model; find_and_initialize then rewrites each target layer's forward, turning the DoRA model into EDoRA:
```python
model = get_peft_model(model, peft_config)
find_and_initialize(
    model,
    peft_config_dict,
    adapter_name=adapter_name,
    reconstr_type=reconstr_type,
    writer=tb_writer,
    reconstruct_config=reconstr_config,
    sigma_val=model_args.init_sigma,
)
```
The find_and_initialize function lives in `utils/initialization_utils.py`. Its core logic uses the `separated` reconstr_mode from the YAML to drive the SVD decomposition and, when r_squared is set, swaps out each target layer's forward:
```python
replacement_encoder_weight, replacement_decoder_weight = get_replacement_module(
    weight=target.weight.T,
    module_name=key,
    type=reconstr_type,
    writer=writer,
    reconstruct_config=reconstruct_config,
)

if not isinstance(target, peft.tuners.lora.Linear):
    raise NotImplementedError(
        'Only initialization for peft.tuners.lora.Linear type is implemented.'
    )
    # TODO implement for Linear8bitLt
else:
    if half_init_dec:
        kaiming_uniform_init_lower_half(replacement_decoder_weight)
    if replacement_module_random_init:
        kaiming_uniform_init(replacement_encoder_weight)
        kaiming_uniform_init(replacement_decoder_weight)
    replace_module_weights(target.lora_B.default, replacement_decoder_weight.T)
    if r_squared:
        target.forward = types.MethodType(forward_edora, target)
        target.get_delta_weight = types.MethodType(get_delta_weight, target)
        replace_module_weights(target.lora_A.default, replacement_encoder_weight.T)

        target.default_lora_latent_mapping = torch.nn.Linear(
            lora_config.r, lora_config.r, bias=False
        )
        init_module_weights(target.default_lora_latent_mapping, sigma=sigma_val)
        target.default_lora_latent_mapping.to(target.lora_A.default.weight.device)

        target.lora_A.default.weight.requires_grad = False  # only the r*r matrix will be tuned
        target.lora_B.default.weight.requires_grad = False  # only the r*r matrix will be tuned
```
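The two requires_grad = False lines at the end are the crux of EDoRA's memory savings: A and B are fixed after initialization, so the only trainable adapter parameters per layer are the r×r latent mapping plus DoRA's magnitude vector. A rough count for a d×d projection (bias terms ignored):

```python
d, r = 768, 8                            # e.g. a roberta-base projection at rank 8
lora_trainable  = (r * d) + (d * r)      # LoRA/DoRA: both A and B train
edora_trainable = (r * r) + d            # EDoRA: only R (r x r) and the magnitude vector
print(lora_trainable, edora_trainable)   # 12288 vs. 832
```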
The SVD decomposition itself is handled by get_replacement_module:
```python
def get_replacement_module(weight, module_name, type, writer, reconstruct_config):
    cfg = reconstruct_config[type]
    if type == 'svd':
        reconstructed_matrix, enc, dec = get_linear_rec_svd(
            weight.cpu().detach().numpy(),
            cfg['rank'],
            cfg['n_iter'],
            cfg['random_state'],
        )
        final_enc = torch.tensor(enc, dtype=weight.dtype, device=weight.device)
        final_dec = torch.tensor(dec, dtype=weight.dtype, device=weight.device)
    else:
        raise NotImplementedError(f"{type} is currently not supported.")
    return final_enc, final_dec
```

The SVD helpers it calls are thin wrappers around scikit-learn's TruncatedSVD:

```python
from sklearn.decomposition import TruncatedSVD
import numpy as np
from typing import Tuple


def run_svd(input_matrix: np.ndarray, rank: int, n_iter: int,
            random_state: int) -> Tuple[np.ndarray, TruncatedSVD]:
    svd = TruncatedSVD(n_components=rank, n_iter=n_iter, random_state=random_state)
    svd.fit(input_matrix)
    reduced_matrix = svd.transform(input_matrix)
    return reduced_matrix, svd


def get_linear_rec_svd(input_matrix: np.ndarray, rank: int, n_iter: int,
                       random_state: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    reduced_matrix, svd = run_svd(input_matrix, rank, n_iter, random_state)
    reconstructed_matrix = svd.inverse_transform(reduced_matrix)
    return reconstructed_matrix, reduced_matrix, svd.components_
```
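In other words, enc is U·Σ (shape d×r) and dec is Vᵀ (shape r×d'), so enc @ dec is the best rank-r approximation of the input weight, up to the randomized solver's accuracy. A standalone check (shapes arbitrary):

```python
import numpy as np
from sklearn.decomposition import TruncatedSVD

rng = np.random.default_rng(0)
W = rng.standard_normal((64, 32)).astype(np.float32)

svd = TruncatedSVD(n_components=8, n_iter=10, random_state=42)
enc = svd.fit_transform(W)   # U @ Sigma, shape (64, 8)
dec = svd.components_        # V^T,       shape (8, 32)

rel_err = np.linalg.norm(W - enc @ dec) / np.linalg.norm(W)
print(f"relative rank-8 reconstruction error: {rel_err:.3f}")
```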
RxR matrix injection: the trainable r×r latent mapping placed between lora_A and lora_B:

```python
target.default_lora_latent_mapping = torch.nn.Linear(lora_config.r, lora_config.r, bias=False)
```
**forward implementation** — the code lives in `utils/latent_utils.py`:
```python
def forward_edora(self, x: torch.Tensor):
    # Maybe we can use the following instead:
    # result = F.linear(x, transpose(self.weight, self.fan_in_fan_out), bias=self.bias)
    result = self.base_layer(x)
    torch_result_dtype = result.dtype
    lora_A = self.lora_A[self.active_adapter[0]]
    lora_B = self.lora_B[self.active_adapter[0]]
    lora_R = self.default_lora_latent_mapping
    dropout = self.lora_dropout[self.active_adapter[0]]
    scaling = self.scaling[self.active_adapter[0]]
    x = x.to(lora_A.weight.dtype)
    x = dropout(x)

    # Applying EDoRA
    lora_weight = lora_B.weight @ lora_R.weight @ lora_A.weight
    magnitude = self.lora_magnitude_vector[self.active_adapter[0]]
    weight = self.get_base_layer().weight
    weight = weight.to(x.dtype)
    weight_norm = self._get_weight_norm(weight, lora_weight, scaling)
    # see section 4.3 of DoRA (https://arxiv.org/abs/2402.09353)
    # "[...] we suggest treating ||V + ∆V ||_c in
    # Eq. (5) as a constant, thereby detaching it from the gradient
    # graph. This means that while ||V + ∆V ||_c dynamically
    # reflects the updates of ∆V , it won't receive any gradient
    # during backpropagation"
    weight_norm = weight_norm.detach()
    mag_norm_scale = (magnitude / weight_norm).view(1, -1)
    result_dora = (
        (mag_norm_scale - 1) * F.linear(x, transpose(weight, self.fan_in_fan_out))
        + mag_norm_scale * lora_B(lora_R(lora_A(x))) * scaling
    )
    # Note: Computation could potentially be accelerated by using the code below
    # instead of calculating X@W again. This is only correct if dropout=0,
    # otherwise results will differ:
    # https://github.com/huggingface/peft/pull/1474#issuecomment-1964682771
    # bias = self.get_base_layer().bias
    # if bias is not None:
    #     result = result - bias
    # result = mag_norm_scale * result + mag_norm_scale * lora_B(lora_A(x)) * scaling
    # if bias is not None:
    #     result = result + bias

    result = result + result_dora
    result = result.to(torch_result_dtype)
    return result
```
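Putting it together (with lora_dropout = 0.0 as configured above, so the dropout is a no-op), the patched layer computes

$$y = \underbrace{\frac{m}{\lVert W + s\,BRA \rVert_c}}_{\texttt{mag\_norm\_scale}} \left( W x + s\,BRA\,x \right) + b,$$

i.e. exactly the DoRA forward with $\Delta W = s\,BRA$ in place of $s\,BA$; the norm in the denominator is detached, so it contributes no gradient.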
5. Weight Merging
In the EDoRA project, the LoRA adapter is saved via transformers.Trainer.save_model().
The merging code lives in `utils/merge_adapter_to_base_model.py` and proceeds as follows:
1. Initialize a DoRA model.
2. Call find_and_initialize to convert the DoRA model into an EDoRA model.
3. Load the adapter_model.safetensors saved by transformers.Trainer.save_model and rename its keys.
4. Load the renamed weights into the model and call merge_and_unload to fold the adapter into the base weights.
```python
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel, PeftConfig, LoraConfig, get_peft_model
import argparse
import torch
import os
import json
from pathlib import Path
from safetensors import safe_open

from .initialization_utils import find_and_initialize


def main(args):
    model = AutoModelForCausalLM.from_pretrained(
        args.base_model,
        # torch_dtype=torch.float16,
        device_map='auto',
    )
    tokenizer = AutoTokenizer.from_pretrained(args.base_model, device_map='auto')

    with open(os.path.join(args.adapter, "adapter_config.json")) as f:
        lora_config_dict = json.load(f)
    lora_config = LoraConfig(**lora_config_dict)
    # lora_config = PeftConfig.from_pretrained(args.adapter)
    # model = PeftModel.from_pretrained(model, args.adapter, config=lora_config)
    model = get_peft_model(model, lora_config)

    adapter_name = "default"
    peft_config_dict = {adapter_name: lora_config}
    peft_conf_dir = str(Path(args.adapter).parents[0])
    with open(os.path.join(peft_conf_dir, 'reconstr_config.json')) as fp:
        reconstr_config = json.load(fp)
    reconstr_type = reconstr_config['reconstruction_type']
    # in order to accelerate model preparation, svd iterations will be set to 1.
    reconstr_config['svd']['n_iter'] = 1
    find_and_initialize(
        model, peft_config_dict, adapter_name=adapter_name,
        reconstr_type=reconstr_type, writer=None,
        reconstruct_config=reconstr_config,
    )

    peft_model_weights = {}
    with safe_open(os.path.join(args.adapter, "adapter_model.safetensors"),
                   framework="pt", device="cpu") as f:
        for key in f.keys():
            peft_model_weights[key] = f.get_tensor(key)
    renamed_state_dict = {
        k.replace("lora_A", "lora_A.default")
         .replace("lora_B", "lora_B.default")
         .replace("_lora_latent", ".default_lora_latent"): v
        for (k, v) in peft_model_weights.items()
        if "classifier.out_proj" not in k
    }
    model.load_state_dict(renamed_state_dict, strict=False)

    print("merging the LoRA into the base model.")
    model = model.merge_and_unload()
    print("Saving the merged model to disk.")
    model.save_pretrained(args.output_path)
    tokenizer.save_pretrained(args.output_path)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Merge Adapter to Base Model')
    parser.add_argument('--base_model', type=str)
    parser.add_argument('--adapter', type=str)
    parser.add_argument('--output_path', type=str)
    args = parser.parse_args()
    main(args)
```
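A typical invocation would look like this (all paths are placeholders):

```bash
# Run as a module from the repo root (the script uses a relative import).
python -m utils.merge_adapter_to_base_model \
    --base_model path/to/base_model \
    --adapter path/to/run_output/adapter \
    --output_path path/to/merged_model
```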
For comparison, the standard pattern the peft library documents for attaching and detaching LoRA weights on a Transformers model is shown below.
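A minimal sketch of that pattern (paths are placeholders; note it covers plain LoRA/DoRA adapters — vanilla PeftModel.from_pretrained would not restore EDoRA's latent-mapping keys, which is exactly why the merge script above renames the state dict manually):

```python
from transformers import AutoModelForCausalLM
from peft import PeftModel

base = AutoModelForCausalLM.from_pretrained("path/to/base_model")
model = PeftModel.from_pretrained(base, "path/to/adapter")  # attach the adapter

merged = model.merge_and_unload()  # bake the adapter into the base weights
# ...or detach it without merging, recovering the original base model:
# base_again = model.unload()
```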