当前位置: 首页 > news >正文

Ubuntu部署ktransformers

准备工作

一台服务器

CPU内存:500G

GPU显存:48G(NVIDIA 4090)

系统:Ubuntu20.04(github的文档好像用的是22.04)

第一步:下载权重文件

1.下载hfd

wget https://hf-mirror.com/hfd/hfd.sh
chmod a+x hfd.sh

2.设置环境变量

export HF_ENDPOINT=https://hf-mirror.com

3.下载模型(需要梯子,需要带上huggingface的token)

./hfd.sh gpt2

4.下载数据集(需要梯子,需要带上huggingface的token)

./hfd.sh wikitext --dataset

5.下载大文件(需要梯子,文件很大,大约四五百G)

./hfd.sh unsloth/DeepSeek-R1-GGUF --include DeepSeek-R1-Q4_K_M/*

第二步:拉代码,编译代码

1.使用Anaconda3安装Python3.11

conda create --name ktransformers python=3.11
conda activate ktransformers 
conda install -c conda-forge libstdcxx-ng

2.安装其他依赖

pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
pip3 install packaging ninja cpufeature numpy
sudo add-apt-repository ppa:ubuntu-toolchain-r/test
sudo apt-get update
sudo apt-get install --only-upgrade libstdc++6
pip install flash-attn --no-build-isolation

3.查看显卡版本及cuda版本

以下两条指令显示的CUDA版本需要一致,若不一致,系统会以nvcc --version的为准

nvcc --version
nvidia-smi

4.拉代码

git clone https://github.com/kvcache-ai/ktransformers.git

cd ktransformers

git submodule init

git submodule update

5.编译

export USE_NUMA=1
make dev_install

第三步:运行

python ktransformers/local_chat.py --model_path deepseek-ai/DeepSeek-R1 --gguf_path /home/dpkj/deepseek/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 50 --cache_lens 1536 --max_new_tokens 8192

# --model_path:模型位置,不需要修改
# --gguf_path:前面下载的大文件,模型文件位置,按照实际情况而定
# --cpu_infer:参与推理的CPU核心数量(不是百分比),如果服务器不是DDR5双路CPU,可以适量调低此数值

其他启动参数

python -m ktransformers.local_chat --model_path deepseek-ai/DeepSeek-R1 --gguf_path /root/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 53 --cache_lens 1536

python ./ktransformers/local_chat.py --model_path deepseek-ai/DeepSeek-R1 --gguf_path /home/shadeform/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 53 --cache_lens 1536 --optimize_config_path ktransformers/optimize/optimize_rules/DeepSeek-V3-Chat-multi-gpu-marlin.yaml

python -m ktransformers.local_chat --model_path deepseek-ai/DeepSeek-R1 --gguf_path /root/autodl-tmp/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 128 --cache_lens 1536 --max_new_tokens 8192 --optimize_config_path ./ktransformers/optimize/optimize_rules/DeepSeek-V3-Chat-multi-gpu-marlin-4.yaml

ktransformers --model_path deepseek-ai/DeepSeek-R1 --gguf_path /root/autodl-tmp/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/ --cpu_infer 65 --cache_lens 1536 --max_new_tokens 8192 --port 6006 --optimize_config_path ./ktransformers/optimize/optimize_rules/DeepSeek-V3-Chat-multi-gpu-marlin-4.yaml

curl -X 'POST' \
    'http://localhost:6006/v1/chat/completions' \
    -H 'accept: application/json' \
    -H 'Content-Type: application/json' \
    -d '{
        "messages": [
            {
                "content": "tell a joke",
                "role": "user"
            }
        ],
        "model": "ktranformers-model",
        "stream": true
    }'

外传

1. 使用API方式调用

新建文件:chat_openai.py

import argparse
import uvicorn
from typing import List, Dict, Optional, Any
from fastapi import FastAPI, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import os
import sys
import time
from fastapi import Request
from fastapi.responses import StreamingResponse, JSONResponse
import json
import logging

# Configure logging: INFO level via basicConfig, plus a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Put the parent of this file's directory at the front of sys.path.
# NOTE(review): presumably so the local `ktransformers` package is importable
# when this file is run directly as a script — confirm against the repo layout.
project_dir = os.path.dirname(os.path.dirname(__file__))
sys.path.insert(0, project_dir)
import torch
from transformers import (
    AutoTokenizer,
    AutoConfig,
    AutoModelForCausalLM,
    GenerationConfig,
    TextStreamer,
)
from ktransformers.optimize.optimize import optimize_and_load_gguf
from ktransformers.models.modeling_deepseek import DeepseekV2ForCausalLM
from ktransformers.models.modeling_qwen2_moe import Qwen2MoeForCausalLM
from ktransformers.models.modeling_deepseek_v3 import DeepseekV3ForCausalLM
from ktransformers.models.modeling_llama import LlamaForCausalLM
from ktransformers.models.modeling_mixtral import MixtralForCausalLM
from ktransformers.util.utils import prefill_and_generate
from ktransformers.server.config.config import Config

# Architecture name (first entry of the HF config's `architectures` list) ->
# ktransformers model class that OpenAIChat.__init__ instantiates for it.
# Architectures not listed here fall back to AutoModelForCausalLM.from_config.
custom_models = {
    "DeepseekV2ForCausalLM": DeepseekV2ForCausalLM,
    "DeepseekV3ForCausalLM": DeepseekV3ForCausalLM,
    "Qwen2MoeForCausalLM": Qwen2MoeForCausalLM,
    "LlamaForCausalLM": LlamaForCausalLM,
    "MixtralForCausalLM": MixtralForCausalLM,
}

# Directory holding the optimize-rule YAML files, resolved relative to this
# file: <directory of this file>/optimize/optimize_rules.
_this_dir = os.path.dirname(os.path.abspath(__file__))
ktransformer_rules_dir = os.path.join(_this_dir, "optimize", "optimize_rules")

# (architecture name, rule file name) pairs; the order fixes the dict order.
_default_rule_files = (
    ("DeepseekV2ForCausalLM", "DeepSeek-V2-Chat.yaml"),
    ("DeepseekV3ForCausalLM", "DeepSeek-V3-Chat.yaml"),
    ("Qwen2MoeForCausalLM", "Qwen2-57B-A14B-Instruct.yaml"),
    ("LlamaForCausalLM", "Internlm2_5-7b-Chat-1m.yaml"),
    ("MixtralForCausalLM", "Mixtral.yaml"),
)

# Architecture name -> full path of the rule file used when the caller does
# not supply an explicit rule path.
default_optimize_rules = {
    arch: os.path.join(ktransformer_rules_dir, rule_file)
    for arch, rule_file in _default_rule_files
}

# Module-level handle to the initialized model wrapper; assigned by create_app().
chat_model = None

class OpenAIChat:
    """Load a model through ktransformers and answer chat-completion requests.

    Construction reads the tokenizer and config from ``model_path``, builds the
    model on the ``meta`` device, and hands it to ``optimize_and_load_gguf``
    together with the rule file and the GGUF path.
    """

    def __init__(
        self,
        model_path: str,
        optimize_rule_path: Optional[str] = None,
        gguf_path: Optional[str] = None,
        cpu_infer: int = Config().cpu_infer,
        use_cuda_graph: bool = True,
        mode: str = "normal",
    ):
        """Load tokenizer, config and model.

        Args:
            model_path: Passed to the ``from_pretrained`` calls for the
                tokenizer, the config and the generation config.
            optimize_rule_path: Rule file handed to ``optimize_and_load_gguf``.
                When ``None``, the ``default_optimize_rules`` entry for the
                model's first architecture is used if one exists; otherwise
                ``None`` is passed through unchanged.
            gguf_path: Passed to ``optimize_and_load_gguf``.
            cpu_infer: Assigned to ``Config().cpu_infer``.
            use_cuda_graph: Forwarded to ``prefill_and_generate`` per request.
            mode: ``"long_context"`` is accepted only for ``LlamaForCausalLM``
                and sets the default dtype to float16; any other value uses
                the config's ``torch_dtype``.

        Raises:
            AssertionError: ``mode == "long_context"`` with another architecture.
        """
        # Inference only: disable autograd globally.
        torch.set_grad_enabled(False)
        # NOTE(review): presumably Config is a shared singleton so this write is
        # visible to the rest of ktransformers — confirm.
        Config().cpu_infer = cpu_infer

        self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
        config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
        # A streamer is only created when cpu_infer is falsy; nothing else in
        # this class reads self.streamer.
        self.streamer = TextStreamer(self.tokenizer, skip_prompt=True) if not Config().cpu_infer else None
        if mode == 'long_context':
            assert config.architectures[0] == "LlamaForCausalLM", "Only LlamaForCausalLM supports long_context mode"
            torch.set_default_dtype(torch.float16)
        else:
            torch.set_default_dtype(config.torch_dtype)

        # Build the module structure on the meta device; weights are supplied
        # afterwards by optimize_and_load_gguf.
        with torch.device("meta"):
            if config.architectures[0] in custom_models:
                if "Qwen2Moe" in config.architectures[0]:
                    config._attn_implementation = "flash_attention_2"
                if "Llama" in config.architectures[0]:
                    config._attn_implementation = "eager"
                if "Mixtral" in config.architectures[0]:
                    config._attn_implementation = "flash_attention_2"
                model = custom_models[config.architectures[0]](config)
            else:
                model = AutoModelForCausalLM.from_config(
                    config, trust_remote_code=True, attn_implementation="flash_attention_2"
                )

        if optimize_rule_path is None:
            if config.architectures[0] in default_optimize_rules:
                optimize_rule_path = default_optimize_rules[config.architectures[0]]

        optimize_and_load_gguf(model, optimize_rule_path, gguf_path, config)

        try:
            model.generation_config = GenerationConfig.from_pretrained(model_path)
        except Exception as exc:
            # Fall back to fixed defaults, but say so. Catching Exception rather
            # than everything lets Ctrl-C / SystemExit still stop the process.
            logger.warning(
                "Could not load generation config from %s (%s); using defaults",
                model_path,
                exc,
            )
            model.generation_config = GenerationConfig(
                max_length=128,
                temperature=0.7,
                top_p=0.9,
                do_sample=True
            )

        if model.generation_config.pad_token_id is None:
            model.generation_config.pad_token_id = model.generation_config.eos_token_id

        model.eval()
        self.model = model
        self.use_cuda_graph = use_cuda_graph
        self.mode = mode
        logger.info("Model loaded successfully!")

    def create_chat_completion(
        self,
        messages: List[Dict[str, str]],
        temperature: float = 0.7,
        max_tokens: int = 1000,
        top_p: float = 0.9,
        force_think: bool = False,
    ) -> Dict:
        """Generate one assistant reply for ``messages``.

        Args:
            messages: Chat turns, passed to the tokenizer's chat template.
            temperature: Stored in a local ``GenerationConfig`` (see note below).
            max_tokens: Passed to ``prefill_and_generate`` as the generation limit.
            top_p: Stored in a local ``GenerationConfig`` (see note below).
            force_think: When true, the tokens for the think prefix are appended
                to the prompt; the flag is also forwarded to
                ``prefill_and_generate``.

        Returns:
            A dict with ``choices`` (a single assistant message holding the
            decoded text) and ``usage`` (prompt, completion and total token
            counts).
        """
        input_tensor = self.tokenizer.apply_chat_template(
            messages, add_generation_prompt=True, return_tensors="pt"
        )

        if force_think:
            token_thinks = torch.tensor([self.tokenizer.encode("<think>\\n", add_special_tokens=False)],
                                        device=input_tensor.device)
            input_tensor = torch.cat([input_tensor, token_thinks], dim=1)

        # NOTE(review): this config is built but never handed to
        # prefill_and_generate below, so as far as this file shows temperature
        # and top_p do not influence generation — confirm and wire up if needed.
        generation_config = GenerationConfig(
            temperature=temperature,
            top_p=top_p,
            max_new_tokens=max_tokens,
            do_sample=True  # Ensure do_sample is True if using temperature or top_p
        )

        generated = prefill_and_generate(
            self.model,
            self.tokenizer,
            input_tensor.cuda(),
            max_tokens,
            self.use_cuda_graph,
            self.mode,
            force_think
        )

        # Convert token IDs to text
        generated_text = self.tokenizer.decode(generated, skip_special_tokens=True)

        # The prompt count includes the forced think tokens when force_think is set.
        return {
            "choices": [{
                "message": {
                    "role": "assistant",
                    "content": generated_text
                }
            }],
            "usage": {
                "prompt_tokens": input_tensor.shape[1],
                "completion_tokens": len(generated),
                "total_tokens": input_tensor.shape[1] + len(generated)
            }
        }

# One chat turn: the speaker (`role`) and the text (`content`).
class ChatMessage(BaseModel):
    role: str
    content: str

# Request body for POST /v1/chat/completions.
# `model` is echoed back in the response; `stream` is accepted but the
# chat_completion handler in this file does not read it.
class ChatCompletionRequest(BaseModel):
    messages: List[ChatMessage]  # messages is a list of Pydantic model instances
    model: str = "default-model"
    temperature: Optional[float] = 0.7
    top_p: Optional[float] = 0.9
    max_tokens: Optional[int] = 1000
    stream: Optional[bool] = False
    force_think: Optional[bool] = True

# Response schema for POST /v1/chat/completions (used as response_model).
# Only `choices` and `usage` are required; the other fields have defaults.
class ChatCompletionResponse(BaseModel):
    id: str = "chatcmpl-default"
    object: str = "chat.completion"
    created: int = 0
    model: str = "default-model"
    choices: List[Dict[str, Any]]
    usage: Dict[str, int]

# The FastAPI application; the routes and middleware below are registered on it.
app = FastAPI(title="KVCache.AI API Server")

@app.get("/health")
async def health_check():
    """Liveness probe: always answer with a fixed healthy status."""
    payload = {"status": "healthy"}
    return payload

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Attach the request handling time as an ``X-Process-Time`` header.

    Args:
        request: The incoming request.
        call_next: Callable that runs the rest of the stack and yields the response.

    Returns:
        The downstream response with ``X-Process-Time`` set to the elapsed
        seconds, formatted with four decimals and an ``s`` suffix.
    """
    # perf_counter is monotonic, so the interval cannot go negative or jump
    # when the system clock is adjusted (which time.time() allows).
    started = time.perf_counter()
    response = await call_next(request)
    elapsed = time.perf_counter() - started
    response.headers["X-Process-Time"] = f"{elapsed:.4f}s"
    return response

# CORS: every origin, method and header is allowed, with credentials enabled.
# NOTE(review): this is fully permissive — tighten allow_origins before
# exposing the server beyond a trusted network.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
async def chat_completion(request: ChatCompletionRequest):
    """Handle a chat-completion request using the module-level ``chat_model``.

    Args:
        request: Validated request body. ``request.stream`` is not consulted;
            a single JSON response is always returned.

    Returns:
        A dict matching ``ChatCompletionResponse`` with one choice and the
        usage counts reported by the model wrapper.

    Raises:
        HTTPException: 503 when the model has not been initialized yet,
            500 when generation fails for any reason.
    """
    # Checked outside the try block so the 503 is not re-wrapped as a 500.
    if chat_model is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Model is not initialized"
        )

    try:
        # messages are Pydantic model instances; convert them to plain dicts
        messages = [m.model_dump() for m in request.messages]
        # Called synchronously inside this coroutine.
        response = chat_model.create_chat_completion(
            messages=messages,
            temperature=request.temperature,
            max_tokens=request.max_tokens,
            top_p=request.top_p,
            force_think=request.force_think
        )

        # One timestamp for both fields so `id` and `created` always agree.
        created = int(time.time())
        return {
            "id": f"chatcmpl-{created}",
            "object": "chat.completion",
            "created": created,
            "model": request.model,
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": response['choices'][0]['message']['content']
                },
                "finish_reason": "stop"
            }],
            "usage": response['usage']
        }
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception("API Error: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Internal server error: {str(e)}"
        ) from e

def create_app(model_path: str, gguf_path: str, cpu_infer: int, optimize_rule_path: Optional[str] = None):
    """Load the model into the module-level ``chat_model`` and return ``app``.

    Args:
        model_path: Forwarded to ``OpenAIChat``.
        gguf_path: Forwarded to ``OpenAIChat``.
        cpu_infer: Forwarded to ``OpenAIChat``.
        optimize_rule_path: Forwarded to ``OpenAIChat``; may be ``None``.

    Returns:
        The module-level FastAPI ``app``.
    """
    global chat_model
    model_settings = {
        "model_path": model_path,
        "gguf_path": gguf_path,
        "optimize_rule_path": optimize_rule_path,
        "cpu_infer": cpu_infer,
    }
    chat_model = OpenAIChat(**model_settings)
    return app

def main():
    """Parse command-line options, load the model, and serve the API with uvicorn."""
    cli = argparse.ArgumentParser(description="KVCache.AI API Server")

    # (flag, add_argument keyword arguments), registered in this order.
    option_specs = (
        ("--model_path", {"type": str, "required": True, "help": "HuggingFace模型路径"}),
        ("--gguf_path", {"type": str, "required": True, "help": "GGUF模型文件路径"}),
        ("--optimize_rule_path", {"type": str, "help": "优化规则文件路径"}),
        ("--port", {"type": int, "default": 8000, "help": "服务端口号"}),
        ("--cpu_infer", {"type": int, "default": 10, "help": "使用cpu数量"}),
        ("--host", {"type": str, "default": "0.0.0.0", "help": "绑定地址"}),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    opts = cli.parse_args()

    # Load the model before the server starts accepting requests.
    create_app(
        model_path=opts.model_path,
        gguf_path=opts.gguf_path,
        optimize_rule_path=opts.optimize_rule_path,
        cpu_infer=opts.cpu_infer,
    )

    server_options = {
        "host": opts.host,
        "port": opts.port,
        "loop": "uvloop",
        "http": "httptools",
        "timeout_keep_alive": 300,
        "log_level": "info",
        "access_log": False,
    }
    uvicorn.run(app, **server_options)

if __name__ == "__main__":
    main()

文件放置位置:ktransformers 源码目录下的 ktransformers/chat_openai.py(与下方启动命令中的路径一致)

安装依赖:

pip install protobuf uvicorn httptools
pip install uvloop

启动:

python ktransformers/chat_openai.py --model_path deepseek-ai/DeepSeek-R1 --gguf_path /home/dpkj/deepseek/DeepSeek-R1-GGUF/DeepSeek-R1-Q4_K_M/

2.使用open-WEBUI进行可视化对接

# 使用Pip下载OPEN-WEBUI

pip install open-webui
# 下载完成后开启服务
open-webui serve
#启动成功如下
在 Open WebUI 中新建函数(Function),粘贴以下代码:

import os
import json
import requests
from pydantic import BaseModel, Field
from typing import List, Union, Iterator

# Set DEBUG to True to enable detailed logging
DEBUG = False


class Pipe:
    """Open WebUI pipe that relays chat requests to a local FastAPI chat endpoint.

    Requests go to ``self.api_url`` with only a ``Content-Type`` header; the
    API key valve is stored but not sent by any method of this class.
    """

    class Valves(BaseModel):
        # Settings for this pipe; __init__ fills them from environment variables.
        openai_API_KEY: str = Field(default="none")  # Optional API key if needed
        DEFAULT_MODEL: str = Field(default="DeepSeek-R1")  # Default model identifier

    def __init__(self):
        self.id = "DeepSeek-R1"
        self.type = "manifold"
        self.name = "KT: "
        self.valves = self.Valves(
            **{
                "openai_API_KEY": os.getenv("openai_API_KEY", "none"),
                "DEFAULT_MODEL": os.getenv("openai_DEFAULT_MODEL", "DeepSeek-R1"),
            }
        )
        # Self-hosted FastAPI server details
        self.api_url = (
            "http://localhost:8000/v1/chat/completions"  # FastAPI server endpoint
        )
        self.headers = {"Content-Type": "application/json"}

    def get_openai_models(self):
        """Return available models - for openai we'll return a fixed list"""
        return [{"id": "KT", "name": "DeepSeek-R1"}]

    def pipes(self) -> List[dict]:
        """Return the models exposed by this pipe (same list as get_openai_models)."""
        return self.get_openai_models()

    @staticmethod
    def _flatten_message(message: dict) -> dict:
        """Return ``{"role", "content"}`` with list-style content joined into one string.

        Text parts are kept as-is, image parts become an ``[Image: <url>]``
        note, and parts of any other type are dropped. Non-list content is
        passed through unchanged.
        """
        content = message.get("content")
        if not isinstance(content, list):
            # Handle simple text messages
            return {"role": message["role"], "content": message["content"]}

        text_parts = []
        for part in content:
            if part["type"] == "text":
                text_parts.append(part["text"])
            elif part["type"] == "image_url":
                # The backend might not support image inputs - add a note about the image
                text_parts.append(f"[Image: {part['image_url']['url']}]")
        return {"role": message["role"], "content": "".join(text_parts)}

    def _stream_response(self, payload: dict) -> Iterator[str]:
        """Yield the non-empty response lines of a streaming POST, decoded as UTF-8.

        Failures are reported as a yielded error string rather than raised.
        """
        try:
            # The context manager closes the response (releasing the connection)
            # even when the consumer stops iterating early.
            with requests.post(
                self.api_url,
                json=payload,
                headers=self.headers,
                stream=True,
            ) as response:
                # Report HTTP errors the same way the non-streaming path does.
                if response.status_code != 200:
                    yield f"Error: {response.status_code}, {response.text}"
                    return
                for line in response.iter_lines():
                    if line:
                        yield line.decode("utf-8")
        except Exception as e:
            if DEBUG:
                print(f"Streaming error: {e}")
            yield f"Error during streaming: {str(e)}"

    def pipe(self, body: dict) -> Union[str, Iterator[str]]:
        """Forward a chat request body to the server.

        Args:
            body: Request dict; ``messages`` is required, while ``temperature``,
                ``top_p``, ``max_tokens``, ``stream`` and ``stop`` are optional.

        Returns:
            An iterator of response lines when ``body["stream"]`` is truthy,
            otherwise the assistant message text, or an ``"Error: ..."`` string
            on failure.
        """
        try:
            # Use default model ID since the server has a single endpoint
            model_id = self.valves.DEFAULT_MODEL

            # Process messages including system, user, and assistant messages
            messages = [self._flatten_message(m) for m in body["messages"]]

            if DEBUG:
                print("FastAPI API request:")
                print(" Model:", model_id)
                print(" Messages:", json.dumps(messages, indent=2))

            # Read the flag once so the payload and the branch below agree;
            # previously the payload defaulted to True and the branch to False.
            stream = body.get("stream", False)

            # Prepare the API call parameters
            payload = {
                "model": model_id,
                "messages": messages,
                "temperature": body.get("temperature", 0.7),
                "top_p": body.get("top_p", 0.9),
                "max_tokens": body.get("max_tokens", 8192),
                "stream": stream,
            }

            # Add stop sequences if provided
            if body.get("stop"):
                payload["stop"] = body["stop"]

            # Sending request to local FastAPI server
            if stream:
                return self._stream_response(payload)

            # Regular response
            response = requests.post(
                self.api_url, json=payload, headers=self.headers
            )
            if response.status_code != 200:
                return f"Error: {response.status_code}, {response.text}"
            return (
                response.json()
                .get("choices", [{}])[0]
                .get("message", {})
                .get("content", "")
            )
        except Exception as e:
            if DEBUG:
                print(f"Error in pipe method: {e}")
            return f"Error: {e}"

    def health_check(self) -> bool:
        """Check if the local FastAPI service is accessible.

        Sends a five-token completion request and returns whether it answered
        with HTTP 200; any exception counts as unhealthy.
        """
        try:
            # Simple health check with a basic prompt
            response = requests.post(
                self.api_url,
                json={
                    "model": self.valves.DEFAULT_MODEL,
                    "messages": [{"role": "user", "content": "Hello"}],
                    "max_tokens": 5,
                },
                headers=self.headers,
            )
            return response.status_code == 200
        except Exception as e:
            if DEBUG:
                print(f"Health check failed: {e}")
            return False

完~
<script src="chrome-extension://bincmiainjofjnhchmcalkanjebghoen/aiscripts/script-main.js"></script>
http://www.lryc.cn/news/540741.html

相关文章:

  • 助力DeepSeek私有化部署服务:让企业AI落地更简单、更安全
  • 面试官询问项目前后端人员配比之高分示范回答
  • MyBatis中的日志和映射器说明
  • 深入了解 Pinia:Vue 的下一代状态管理工具 (上篇)
  • Unity 中导入的VRM模型渲染为VRoid风格
  • 【ELK】【Elasticsearch 】DSL 和 DQL
  • 最新版本Exoplayer扩展FFmpeg音频软解码保姆级教程
  • 面对低消费欲人群,我们如何开发其需求?
  • 《算法基础入门:最常用的算法详解与应用(持续更新实战与面试题)》
  • Linux设备驱动-练习
  • 蓝桥杯核心内容
  • Spring Boot拦截器(Interceptor)详解
  • 非常好用的ssh工具Xterminal
  • 【Python项目】基于Django的医疗领域用户问答意图识别系统
  • 深入理解指针(六)
  • Linux下基本指令(4)
  • vue 手写分页
  • Spring Boot项目接收前端参数的11种方式
  • Springboot项目:使用MockMvc测试get和post接口(含单个和多个请求参数场景)
  • OpenAI ChatGPT在心理治疗领域展现超凡同理心,通过图灵测试挑战人类专家
  • 【HBase】HBaseJMX 接口监控信息实现钉钉告警
  • 25旅游管理研究生复试面试问题汇总 旅游管理专业知识问题很全! 旅游管理复试全流程攻略 旅游管理考研复试真题汇总
  • 深入解析C++26 Execution Domain:设计原理与实战应用
  • Linux命令基础
  • 什么是超越编程(逾编程)(元编程?)
  • netcore libreoffice word转pdf中文乱码
  • 【练习】【回溯:组合:一个集合 元素可重复】力扣 39. 组合总和
  • Mac 清理缓存,提高内存空间
  • 数据结构——二叉树经典习题讲解
  • 神经网络八股(三)