Triton Server部署Embedding模型
在32核CPU、无GPU的服务器上,使用Python后端和ONNX后端部署嵌入模型,并实现并行调用和性能优化策略。
方案一:使用Python后端部署Embedding模型
Python后端提供了极大的灵活性,可以直接在Triton中运行您熟悉的sentence-transformers
代码。但其性能开销相对较高,需要精细配置以充分利用CPU资源。
1. 拉取Triton Docker镜像
首先,拉取包含Python后端的Triton Server官方镜像。
# 将<xx.yy>替换为最新或您需要的版本,例如 24.06
docker pull nvcr.io/nvidia/tritonserver:<xx.yy>-py3
2. 准备模型仓库
您需要创建一个模型目录,其中包含一个model.py
脚本(用于加载和运行模型)和一个config.pbtxt
配置文件(用于定义模型行为和优化)。
-
创建目录结构:
mkdir -p model_repository/embedding_py/1
-
创建
model.py
脚本:
此脚本的核心是在initialize
方法中加载模型(只执行一次),并在execute
方法中处理推理请求。# 保存为 model_repository/embedding_py/1/model.py import json import numpy as np import torch from sentence_transformers import SentenceTransformer import triton_python_backend_utils as pb_utilsclass TritonPythonModel:def initialize(self, args):"""在模型加载时调用一次。"""# 关键性能优化:当Triton实例数(count) > 1时,限制每个实例的PyTorch线程数# 可以避免实例间的线程竞争,让Triton来调度并行。torch.set_num_threads(1)# 从模型目录加载模型,使用CPUself.model = SentenceTransformer(args['model_repository'], device='cpu')print('SentenceTransformer model loaded on CPU.')def execute(self, requests):"""处理每一批推理请求。"""responses =for request in requests:# 1. 从请求中获取输入张量 (文本字符串)# as_numpy()会返回一个包含字节串的NumPy数组in_str = pb_utils.get_input_tensor_by_name(request, "TEXT")sentences = [s.decode('utf-8') for s in in_str.as_numpy()]# 2. 使用模型进行推理embeddings = self.model.encode(sentences, convert_to_numpy=True)# 3. 创建输出张量out_tensor = pb_utils.Tensor("EMBEDDING", embeddings.astype(np.float32))# 4. 创建并添加响应inference_response = pb_utils.InferenceResponse(output_tensors=[out_tensor])responses.append(inference_response)return responsesdef finalize(self):"""在模型卸载时调用。"""self.model = Noneprint('Cleaned up model.')
-
创建
config.pbtxt
配置文件:
这是CPU性能优化的关键。我们将创建多个模型实例(instance_group
)以利用全部32个核心。# 保存为 model_repository/embedding_py/config.pbtxt name: "embedding_py" backend: "python" max_batch_size: 64 # 允许服务器对请求进行批处理input # 可变长度的字符串输入} ]output # 假设嵌入维度为768} ]# --- 性能优化配置 --- # 为CPU创建多个模型实例以实现并行处理 instance_group# 动态批处理:在CPU上依然有用,可以分摊请求开销 dynamic_batching {max_queue_delay_microseconds: 10000 # 10毫秒延迟换取更大批次preferred_batch_size: }
3. 启动Triton服务器 (CPU模式)
使用docker run
命令启动服务器,注意不使用--gpus
标志。
docker run --rm -p 8000:8000 -p 8001:8001 -p 8002:8002 \
-v $(pwd)/model_repository:/models \
nvcr.io/nvidia/tritonserver:<xx.yy>-py3 \
tritonserver --model-repository=/models
服务器启动后,您会看到日志显示embedding_py
模型已经READY
。
4. 并行推理客户端
为了充分利用服务器端配置的16个实例,客户端必须能够并行地发送请求。使用asyncio
和aiohttp
是实现高并发客户端的有效方式。
# client_py.py
import asyncio
import numpy as np
import tritonclient.http.aio as aiohttpclientSERVER_URL = "localhost:8000"
MODEL_NAME = "embedding_py"async def send_request(client, text_list):# 准备输入数据text_array = np.array([[s.encode('utf-8')] for s in text_list], dtype=np.object_)inputs =inputs.set_data_from_numpy(text_array, binary_data=True)# 准备输出outputs =# 发送请求response = await client.infer(MODEL_NAME, inputs, outputs=outputs)embedding = response.as_numpy("EMBEDDING")return embeddingasync def main():# 模拟大量并发任务tasks =# 创建一个可以并行发送请求的客户端async with aiohttpclient.InferenceServerClient(SERVER_URL) as client:# 假设我们要并行处理100个请求,每个请求包含4个句子for i in range(100):task_texts =tasks.append(send_request(client, task_texts))# 等待所有并发请求完成results = await asyncio.gather(*tasks)print(f"成功完成 {len(results)} 个并行任务。")print(f"第一个任务的嵌入向量形状: {results.shape}")if __name__ == "__main__":asyncio.run(main())
5. Python后端性能优化策略总结
- 核心策略:增加实例数 (
instance_group
):这是在多核CPU上实现吞吐量扩展的最重要手段。将count
设置为接近CPU核心数(例如16, 24, 32),让Triton的多个模型实例并行处理请求。 - 限制实例内线程 (
torch.set_num_threads(1)
):当count
> 1时,这是至关重要的优化。它避免了多个PyTorch实例争抢CPU资源,将并行化的任务完全交给Triton调度,从而减少了线程切换的开销。 - 动态批处理 (
dynamic_batching
):虽然在CPU上的收益不如GPU明显,但它仍然可以通过将小请求聚合成大批次,来分摊Python解释器和请求处理的固定开销,从而提升吞吐量。
方案二:使用ONNX后端部署Embedding模型
ONNX (Open Neural Network Exchange) 是一种为机器学习模型设计的开放格式。使用ONNX后端通常比Python后端性能更高,因为它绕过了Python解释器,直接由高度优化的C++运行时(ONNX Runtime)执行。
1. 转换模型为ONNX格式
首先,您需要将sentence-transformers
模型导出为ONNX格式。使用optimum
库是最简单的方式。
pip install optimum[exporters]
# 将 BAAI/bge-base-en-v1.5 替换为您想使用的模型
optimum-cli export onnx --model BAAI/bge-base-en-v1.5 --task feature-extraction./embedding_onnx_model
执行后,./embedding_onnx_model
目录下会生成model.onnx
文件。
2. 准备模型仓库
-
创建目录结构并移动模型:
mkdir -p model_repository/embedding_onnx/1 mv./embedding_onnx_model/model.onnx model_repository/embedding_onnx/1/
-
创建
config.pbtxt
配置文件:
此配置将指向ONNX模型,并同样通过instance_group
进行CPU优化。# 保存为 model_repository/embedding_onnx/config.pbtxt name: "embedding_onnx" backend: "onnxruntime" # 或 platform: "onnxruntime_onnx" max_batch_size: 128input # 动态批处理, 动态序列长度},{name: "attention_mask"data_type: TYPE_INT64dims: [ -1, -1 ]} ]output # 假设嵌入维度为768} ]# --- 性能优化配置 --- instance_groupdynamic_batching {max_queue_delay_microseconds: 5000 # 5毫秒延迟preferred_batch_size: }
3. 启动Triton服务器 (CPU模式)
启动命令与Python后端完全相同,Triton会自动发现并加载embedding_onnx
模型。
docker run --rm -p 8000:8000 -p 8001:8001 -p 8002:8002 \
-v $(pwd)/model_repository:/models \
nvcr.io/nvidia/tritonserver:<xx.yy>-py3 \
tritonserver --model-repository=/models
4. 并行推理客户端
ONNX模型的客户端需要一个额外的步骤:文本分词(Tokenization)。这个过程需要在客户端完成,因为预处理逻辑没有像Python后端那样被部署在服务器上。
# client_onnx.py
import asyncio
import numpy as np
import tritonclient.http.aio as aiohttpclient
from transformers import AutoTokenizerSERVER_URL = "localhost:8000"
MODEL_NAME = "embedding_onnx"
# 使用与导出ONNX时相同的模型名称来加载分词器
TOKENIZER_NAME = "BAAI/bge-base-en-v1.5" # 在全局加载分词器
tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_NAME)def mean_pooling(model_output, attention_mask):# 从ONNX输出中提取嵌入向量token_embeddings = model_outputinput_mask_expanded = np.expand_dims(attention_mask, -1).repeat(token_embeddings.shape[-1], -1)sum_embeddings = np.sum(token_embeddings * input_mask_expanded, 1)sum_mask = np.clip(input_mask_expanded.sum(1), a_min=1e-9, a_max=None)return sum_embeddings / sum_maskasync def send_request(client, text_list):# 1. 客户端分词encoded_input = tokenizer(text_list, padding=True, truncation=True, return_tensors='np')# 2. 准备输入张量inputs = [aiohttpclient.InferInput("input_ids", encoded_input['input_ids'].shape, "INT64"),aiohttpclient.InferInput("attention_mask", encoded_input['attention_mask'].shape, "INT64")]inputs.set_data_from_numpy(encoded_input['input_ids'])inputs.[1]set_data_from_numpy(encoded_input['attention_mask'])# 3. 准备输出outputs =# 4. 发送请求response = await client.infer(MODEL_NAME, inputs, outputs=outputs)last_hidden_state = response.as_numpy("last_hidden_state")# 5. 客户端后处理 (Mean Pooling)final_embedding = mean_pooling(last_hidden_state, encoded_input['attention_mask'])return final_embeddingasync def main():# 并发逻辑与Python后端客户端相同tasks =async with aiohttpclient.InferenceServerClient(SERVER_URL) as client:for i in range(100):task_texts =tasks.append(send_request(client, task_texts))results = await asyncio.gather(*tasks)print(f"成功完成 {len(results)} 个并行任务。")print(f"第一个任务的嵌入向量形状: {results.shape}")if __name__ == "__main__":asyncio.run(main())
5. ONNX后端性能优化策略总结
- 核心策略:增加实例数 (
instance_group
):与Python后端同理,这是利用多核CPU的关键。 - 模型优化:ONNX的优势在于其生态系统。在部署前,您可以使用ONNX Runtime工具对模型进行图优化和量化(Quantization)。将模型从FP32量化为INT8,可以在CPU上带来数倍的性能提升,同时只有轻微的精度损失。
- 控制ONNX Runtime线程:虽然
instance_group
是主要的并行手段,您也可以在config.pbtxt
中微调每个ONNX实例的线程数,但这属于更高级的优化,通常保持默认或设置为较小值即可。 - 将预处理/后处理移至客户端:如示例所示,ONNX方案将分词和池化操作放在客户端。这降低了服务器的计算负载,但增加了客户端的复杂性和依赖。
结论与对比
特性 | Python后端 | ONNX后端 |
---|---|---|
性能 | 较低 | 较高 (原生C++执行,无Python开销) |
部署简易度 | 非常简单 (无需模型转换) | 较复杂 (需要导出为ONNX) |
灵活性 | 极高 (可在model.py 中执行任意代码) | 较低 (仅执行ONNX图计算) |
客户端 | 简单 (只需发送文本) | 较复杂 (需要处理分词和后处理) |
优化潜力 | 有限 (主要靠实例数) | 巨大 (图优化、量化等) |
建议:
- 如果您的首要目标是快速原型开发和最大灵活性,请选择Python后端。
- 如果您的目标是追求极致的生产性能和吞吐量,请投入时间选择ONNX后端,并探索其量化等高级优化。
当然,这是一个非常实际且常见的场景。将包含自定义预处理逻辑的复杂流水线部署到Triton,正是其强大功能的体现。在这种情况下,Python后端和ONNX后端的组合使用策略会变得更加清晰。
下面,我将为您分别提供两种方案的超详细部署步骤,以满足您“文件 -> 解析 -> 分块 -> Embedding”的流水线需求,并重点关注在32核CPU服务器上的性能优化。
方案一:纯Python后端实现完整流水线 (一体化方案)
这种方法最为直接,将您所有的自定义代码(文件解析、分块)和模型调用逻辑全部封装在一个Triton Python后端模型中。
架构思路:客户端发送原始文件字节流,Triton服务器上的一个Python模型完成所有工作,然后返回最终的嵌入向量。
1. 拉取Triton Docker镜像
这一步保持不变,确保获取包含Python后端的镜像。
# 将<xx.yy>替换为最新或您需要的版本,例如 24.06
docker pull nvcr.io/nvidia/tritonserver:<xx.yy>-py3
2. 准备模型仓库
您需要一个模型目录,其中包含model.py
脚本、config.pbtxt
配置文件,以及模型运行所需的依赖。
-
创建目录结构:
# 创建模型和版本目录 mkdir -p model_repository/full_pipeline_py/1# 创建一个存放依赖的目录 (可选但推荐) mkdir -p model_repository/full_pipeline_py/requirements
-
准备依赖:
由于Python后端默认环境中可能没有sentence-transformers
或文件解析库(如pypdf2
),您需要提供它们。最简单的方式是创建一个requirements.txt
文件。# 保存为 model_repository/full_pipeline_py/requirements.txt sentence-transformers # 如果您需要解析PDF,可以添加 # pypdf2
Triton会在加载模型时自动使用pip安装这些依赖。
-
创建
model.py
脚本 (核心):
这个脚本将实现您的完整流水线。# 保存为 model_repository/full_pipeline_py/1/model.py import json import numpy as np import torch from sentence_transformers import SentenceTransformer import triton_python_backend_utils as pb_utils import io # 示例:如果您需要解析PDF,可以导入 # from PyPDF2 import PdfReaderclass TritonPythonModel:def initialize(self, args):"""在模型加载时调用一次,用于加载模型和设置参数。"""# 关键性能优化:在多实例CPU部署中,限制每个实例的PyTorch线程数# 避免实例间线程竞争,让Triton的进程级并行发挥最大作用。torch.set_num_threads(1)# 从模型目录加载模型,强制使用CPU# 'args['model_repository']' 指向 /models/full_pipeline_py/self.model = SentenceTransformer(args['model_repository'], device='cpu')print('SentenceTransformer model loaded on CPU.')def _parse_and_chunk(self, file_bytes):"""私有辅助函数,用于实现文件解析和分块逻辑。这是一个示例,您需要根据您的文件类型替换这里的逻辑。"""# --- 在这里实现您复杂的文件解析逻辑 ---# 示例1:假设输入是简单的txt文件try:text = file_bytes.decode('utf-8')except UnicodeDecodeError:text = "Error decoding file content."# 示例2:如果您处理PDF (需要安装PyPDF2)# text = ""# with io.BytesIO(file_bytes) as f:# reader = PdfReader(f)# for page in reader.pages:# text += page.extract_text()# --- 在这里实现您的分块逻辑 ---# 示例:按段落分块chunks = [chunk for chunk in text.split('\n\n') if chunk.strip()]if not chunks:return [""] # 保证至少有一个空字符串块,避免后续处理失败return chunksdef execute(self, requests):"""处理每一批推理请求。"""responses =# Triton会将多个客户端请求合并到'requests'列表中for request in requests:# 1. 从请求中获取输入张量 (原始文件字节)in_file_bytes_tensor = pb_utils.get_input_tensor_by_name(request, "RAW_FILE")# as_numpy()返回一个包含字节对象的NumPy数组,我们处理批次中的每个文件# 对于max_batch_size > 1,这里可能包含多个文件file_bytes_list = in_file_bytes_tensor.as_numpy()all_embeddings =for file_bytes in file_bytes_list:# 2. 对每个文件执行解析和分块text_chunks = self._parse_and_chunk(file_bytes)# 3. 使用模型对所有块进行推理# model.encode 本身支持批处理,效率很高chunk_embeddings = self.model.encode(text_chunks, convert_to_numpy=True)# 4. (可选) 聚合结果,例如,可以对所有块的嵌入向量求平均final_embedding = np.mean(chunk_embeddings, axis=0)all_embeddings.append(final_embedding)# 5. 创建输出张量# 将列表中的所有最终嵌入向量堆叠成一个批处理out_tensor = pb_utils.Tensor("FINAL_EMBEDDING", np.stack(all_embeddings).astype(np.float32))# 6. 创建并添加响应inference_response = pb_utils.InferenceResponse(output_tensors=[out_tensor])responses.append(inference_response)return responses
-
创建
config.pbtxt
配置文件:
此配置是性能优化的关键,我们将创建大量实例来利用32个CPU核心。# 保存为 model_repository/full_pipeline_py/config.pbtxt name: "full_pipeline_py" backend: "python" max_batch_size: 32 # 允许服务器同时处理多达32个文件请求input # 整个流水线的输入是原始文件字节} ]output # 整个流水线的输出是最终的聚合嵌入向量} ]# --- 性能优化配置 --- # 为32核CPU创建大量模型实例以实现最大并行度。 # 每个实例都是一个独立的Python进程,可以并行处理请求。 instance_group# 动态批处理:对于文件处理依然有用。 # 它会将多个独立的文件请求打包成一个批次,一次性传递给execute方法。 dynamic_batching {max_queue_delay_microseconds: 10000 # 10毫秒延迟换取更大批次preferred_batch_size: }
3. 启动Triton服务器 (CPU模式)
docker run --rm -p 8000:8000 -p 8001:8001 -p 8002:8002 \
-v $(pwd)/model_repository:/models \
nvcr.io/nvidia/tritonserver:<xx.yy>-py3 \
tritonserver --model-repository=/models
4. 编写并行客户端
客户端非常简单,只需读取文件并发送即可。为了压测服务器,我们同样使用asyncio
来模拟大量并发用户。
# client_py_pipeline.py
import asyncio
import aiohttp
import timeSERVER_URL = "http://localhost:8000/v2/models/full_pipeline_py/infer"
FILE_PATH = "path/to/your/sample.txt" # 替换为您的示例文件路径
CONCURRENT_REQUESTS = 100 # 模拟100个并发请求async def send_request(session, file_bytes):payload = {"inputs":,"datatype": "BYTES","data": [file_bytes.decode('latin-1')] # 使用aiohttp时需要这样编码}]}async with session.post(SERVER_URL, json=payload) as response:if response.status == 200:result = await response.json()return resultelse:print(f"Error: {response.status}")return Noneasync def main():with open(FILE_PATH, "rb") as f:file_bytes = f.read()async with aiohttp.ClientSession() as session:tasks =start_time = time.time()results = await asyncio.gather(*tasks)end_time = time.time()print(f"Sent {CONCURRENT_REQUESTS} concurrent requests in {end_time - start_time:.2f} seconds.")success_count = len([r for r in results if r])print(f"Received {success_count} successful responses.")if __name__ == "__main__":asyncio.run(main())
方案二:混合后端实现流水线 (性能优化方案)
这种方法更先进,也更复杂。它将流水线拆分为两个部分,分别由最适合的后端处理:
- Python后端 (Orchestrator):负责处理文件IO、解析、分块这些灵活的自定义逻辑。
- ONNX后端 (Accelerator):负责执行计算密集型的Embedding任务,性能远超Python。
架构思路:客户端发送文件 -> Python模型接收、解析、分块 -> Python模型在服务器内部调用ONNX模型进行推理 -> Python模型聚合结果 -> 返回给客户端。
1. 转换模型为ONNX格式
这一步是前提。
pip install optimum[exporters]
optimum-cli export onnx --model BAAI/bge-base-en-v1.5 --task feature-extraction./embedding_onnx_model
2. 准备模型仓库 (包含两个模型)
-
创建目录结构:
# 为ONNX模型创建目录 mkdir -p model_repository/embedding_onnx/1 mv./embedding_onnx_model/model.onnx model_repository/embedding_onnx/1/# 为Python编排器模型创建目录 mkdir -p model_repository/pipeline_orchestrator/1
-
配置
embedding_onnx
模型:
这个模型是内部服务,负责高性能计算。config.pbtxt
forembedding_onnx
:# 保存为 model_repository/embedding_onnx/config.pbtxt name: "embedding_onnx" backend: "onnxruntime" max_batch_size: 256 # 可以处理非常大的批次(所有文件的所有块)input # 动态批处理, 动态序列长度},{name: "attention_mask"data_type: TYPE_INT64dims: [ -1, -1 ]} ]output} ]# --- 性能优化配置 --- # 同样创建大量实例来利用CPU核心 instance_group
-
配置
pipeline_orchestrator
模型:
这个模型是流水线的入口,负责编排。-
准备依赖:
# 保存为 model_repository/pipeline_orchestrator/requirements.txt transformers # 只需要分词器 # pypdf2 # 如果需要
-
model.py
forpipeline_orchestrator
(核心):# 保存为 model_repository/pipeline_orchestrator/1/model.py import json import numpy as np import triton_python_backend_utils as pb_utils from transformers import AutoTokenizerclass TritonPythonModel:def initialize(self, args):"""加载分词器等。"""self.tokenizer = AutoTokenizer.from_pretrained("BAAI/bge-base-en-v1.5")print('Tokenizer loaded.')def _parse_and_chunk(self, file_bytes):# 这部分逻辑与方案一完全相同text = file_bytes.decode('utf-8')chunks = [chunk for chunk in text.split('\n\n') if chunk.strip()]return chunks if chunks else [""]def _mean_pooling(self, model_output, attention_mask):# ONNX模型输出后处理token_embeddings = model_outputinput_mask_expanded = np.expand_dims(attention_mask, -1).repeat(token_embeddings.shape[-1], -1)sum_embeddings = np.sum(token_embeddings * input_mask_expanded, 1)sum_mask = np.clip(input_mask_expanded.sum(1), a_min=1e-9, a_max=None)return sum_embeddings / sum_maskasync def execute(self, requests):"""异步执行,可以并行处理对内部ONNX模型的调用。"""responses =# 异步处理所有请求for request in requests:# 1. 解析和分块 (与方案一相同)in_file_bytes_tensor = pb_utils.get_input_tensor_by_name(request, "RAW_FILE")file_bytes_list = in_file_bytes_tensor.as_numpy()all_final_embeddings =for file_bytes in file_bytes_list:text_chunks = self._parse_and_chunk(file_bytes)# 2. 客户端分词encoded_input = self.tokenizer(text_chunks, padding=True, truncation=True, return_tensors='np')# 3. 构造对内部ONNX模型的推理请求infer_request = pb_utils.InferenceRequest(model_name='embedding_onnx',requested_output_names=['last_hidden_state'],inputs=),pb_utils.Tensor('attention_mask', encoded_input['attention_mask'])])# 4. 发送内部请求并等待响应 (异步执行)infer_response = await infer_request.async_exec()if infer_response.has_error():raise pb_utils.TritonModelException(infer_response.error().message())# 5. 从响应中提取结果并进行后处理last_hidden_state = pb_utils.get_output_tensor_by_name(infer_response, 'last_hidden_state').as_numpy()chunk_embeddings = self._mean_pooling(last_hidden_state, encoded_input['attention_mask'])# 6. 聚合结果final_embedding = np.mean(chunk_embeddings, axis=0)all_final_embeddings.append(final_embedding)# 7. 创建最终输出out_tensor = pb_utils.Tensor("FINAL_EMBEDDING", np.stack(all_final_embeddings).astype(np.float32))inference_response = pb_utils.InferenceResponse(output_tensors=[out_tensor])responses.append(inference_response)return responses
-
config.pbtxt
forpipeline_orchestrator
:# 保存为 model_repository/pipeline_orchestrator/config.pbtxt name: "pipeline_orchestrator" backend: "python" max_batch_size: 32input} ]output} ] # 注意:这个编排器模型通常不需要自己的instance_group, # 因为它的主要瓶颈在于它调用的ONNX模型。 # 并行性由ONNX模型的多个实例来保证。
-
3. 启动Triton服务器
命令完全相同,Triton会自动加载这两个模型。
docker run --rm -p 8000:8000 -p 8001:8001 -p 8002:8002 \
-v $(pwd)/model_repository:/models \
nvcr.io/nvidia/tritonserver:<xx.yy>-py3 \
tritonserver --model-repository=/models
4. 编写并行客户端
好消息是,客户端代码与方案一完全相同! 所有的复杂性都被封装在了服务器端。客户端只需要调用pipeline_orchestrator
模型即可,它并不知道内部还有一个ONNX模型。
结论与最终建议
特性 | 方案一 (纯Python) | 方案二 (混合后端) |
---|---|---|
性能 | 中等 | 高 |
部署复杂度 | 低 | 高 |
代码维护 | 简单 (所有逻辑在一个文件) | 复杂 (逻辑分散在两个模型) |
灵活性 | 极高 | 极高 |
资源利用 | 好 | 最佳 (为不同任务使用最优后端) |
给您的建议:
-
从方案一开始:首先实现纯Python后端的流水线。它更容易调试,能让您快速验证整个流程的正确性。对于中等负载,通过增加
instance_group
的count
,它的性能可能已经足够满足您的需求。 -
当性能成为瓶颈时,再转向方案二:如果您的QPS(每秒查询率)要求非常高,或者单个请求的延迟需要被极致压缩,那么投入精力去实现混合后端方案是值得的。它将计算最密集的部分交给了最高效的ONNX后端,能最大化您32核CPU的吞吐能力。
方案二代表了Triton更高级的用法,是实现生产级高性能服务的理想架构。