大模型及agent开发5 OpenAI Assistant API 进阶应用
核心功能:外部工具的应用和流式功能
工具:
1.内置热门工具。由OpenAI团队实现,通过接口的方式直接提供给用户,方便快速集成。
2.构建自定义外部函数流程和开发接口,允许用户通过函数调用扩展自身工具的功能。
一. Assistant API 的 File Search
功能:文件搜索通过来自其模型之外的知识来增强助手,例如专有产品信息或用户提供的文档。
也就是RAG
RAG流程为:
索引(Indexing) 索引过程是离线执行的关键初始步骤。首先清理和提取原始数据,将 PDF、HTML 和 Word 等各种文件格式转换为标准化纯文本。为了适应语言模型的上下文约束,这些文本被分为更小且更易于管理的块,这个过程称为分块。然后使用嵌入模型将这些块转换为向量表示。最后,创建一个索引来将这些文本块及其向量嵌入存储为键值对,从而实现高效且可扩展的搜索功能。
检索(Retrieval) 用户查询用于从外部知识源检索相关上下文。为了实现这一点,用户查询由编码模型处理,该模型生成语义相关的嵌入。然后,对向量数据库进行相似性搜索,以检索前k个最接近的数据对象。
生成(Generation) 将用户查询和检索到的附加上下文填充到提示模板中。最后,将检索步骤中的增强提示输入到LLM中。
File Search 优势:
可以自动解析和分块上传的文档,创建和存储嵌入,并使用向量和关键字搜索来检索相关内容以回答用户查询。
具体优化点:
重写用户查询以优化搜索。
将复杂的用户查询分解为可以并行运行的多个搜索。
在助手和线程向量存储中运行关键字和语义搜索。
在生成最终响应之前对搜索结果重新排序以选择最相关的结果。
如果接入RAG流程, Indexing 过程通常是要最先处理的,也就是要先将私有数据全部存放到向量数据库中。
在构造Indexing时,我们只需先将私有数据文件上传到 OpenAI 的云服务器,然后按照 Assistant API 提供的接口将这些数据存储到向量数据库中,即可非常便捷和高效的完成复杂的私有知识库处理流程。
eg:实现私有知识库问答场景。
Stage 1. 上传本地文件至OpenAI服务器
方法:files.create(),需要传递的参数如下:
参数名 类型 可选性 默认值 描述
file file 必需 - 要上传的文件对象(不是文件名)。
purpose string 必需 - 上传文件的预期用途。使用 "assistants" 表示助手和消息文件,"vision" 表示助手的图像文件输入,"batch" 表示批量 API,"fine-tune" 表示微调
from openai import OpenAI
client = OpenAI()
new_file = client.files.create(
file=open("./data/01_LLMs/01_大模型应用发展及Agent前沿技术趋势.pdf", "rb"),
purpose="assistants"
)
每个文件都会生成一个唯一的file id,用于在其他流程中管理和标识各个独立的文件对象。而要上传多个文件,一种最简单的方式就是使用一个循环来依次上传每个文件。代码如下所示:
# 准备上传文件
file_paths = [
"./data/01_LLMs/AI Agent开发入门.pdf",
"./data/01_LLMs/ChatGLM3-6B零基础部署与使用指南.pdf",
"./data/01_LLMs/ChatGLM3模型介绍.pdf"
]
# 遍历文件路径并上传文件
uploaded_files = []
for path in file_paths:
with open(path, "rb") as file:
new_file = client.files.create(
file=file,
purpose="assistants"
)
uploaded_files.append(new_file)
# 打印上传结果
for uploaded_file in uploaded_files:
print(uploaded_file)
文件查询:
使用files.list()方法能够查看
all_file = client.files.list()
文件检索:
files.retrieve()方法
file_info = client.files.retrieve(
file_id=new_file.id
)
file_info.to_dict()
文件删除:
client.files.delete("file-FbNTHBNAjifCFaMD7PZbOLmE")
Stage 2. 将自定义文件存储至向量数据库
向量数据库原理:
给定一个查询向量,然后在众多向量中找到最为相似的一些向量返回。这就不再是精确匹配,而是具有一定的模糊性,这就是所谓的最近邻(Nearest Neighbors)问题,而能实现这一点的则称之为【最近邻(搜索)算法】。
向量数据库的作用是可以使File Search工具能够搜索文件,因此步骤为:其一是创建一个向量数据库,其二是将文件添加到刚创建的这个向量数据库中。
方法:.beta.vector_stores.create()方法
其参数如下所示:
参数名 类型 可选性 默认值 描述
file_ids array 可选 - 向量存储应使用的文件 ID 列表。对像 file_search 这样的工具非常有用,可以访问文件。
name string 可选 - 向量存储的名称。
expires_after object 可选 - 向量存储的过期策略。
├── anchor string 必需 - 过期策略适用的锚时间戳。支持的锚点:last_active_at。
└── days integer 必需 - 从锚时间开始,向量存储将在多少天后过期。
chunking_strategy object 可选 - 用于对文件进行分块的策略。如果未设置,将使用自动策略。仅在 file_ids 非空时适用。
├── type string 必需 - 始终为 auto(自动)或 static(静态)。
└── static object 必需 -
├── max_chunk_size_tokens integer 必需 800 每个块的最大令牌数。最小值为 100,最大值为 4096。
└── chunk_overlap_tokens integer 必需 400 块之间重叠的令牌数。重叠不得超过 max_chunk_size_tokens 的一半。
metadata map 可选 - 可附加到对象的 16 个键值对集合,便于以结构化格式存储附加信息。键最多 64 个字符,值最多 512 个字符。
创建向量数据库:
将文件添加向量数据库时会自动解析、分块、嵌入文件并将其存储,所以默认的chunking_strategy字段中的type参数是auto,max_chunk_size_tokens是800, chunk_overlap_tokens是400。除此之外,每个向量数据库最多可容纳 10,000 个文件,同时对于收费情况:有 1 GB 的免费额度。超出后,使用费用为 0.10 美元/GB/天。如果采用默认的分块策略,代码如下所示:
vector_store_1 = client.beta.vector_stores.create(
name="llms_vector_store",
file_ids=['file-qd9jnQQE7v4qJ48fQ8dhgoy0', 'file-xzF4PlFUcFjYUP5jigQv1s4r','file-OdySxtvdUFHhEM7zrDNzxrwS']
)
除了默认切分策略外,还可以根据数据情况自定义,那么需要修改的参数是将chunking_strategy参数中type参数改为static,并且指定max_chunk_size_tokens和chunk_overlap_tokens参数的数值,代码如下所示:
vector_store_2 = client.beta.vector_stores.create(
name="llms_vector_store_2",
file_ids=['file-2zEAXzz8NSry24ZthHgfnCeF'],
chunking_strategy={
"type": "static", # 明确使用 static 切分策略
"static": { # 在 static 键下提供具体的切分参数
"max_chunk_size_tokens": 1000, # 自定义的最大切分大小
"chunk_overlap_tokens": 500 # 自定义的重叠大小,注意:Chunk_overlap_tokens必须小于或等于max_chunk_size_tokens / 2
}
}
)
从结果上看,对于每个文件存储至向量数据库时所应用的切分策略,直接在vector_store对象中是看不到的,这是因为切分策略是和具体的文件做绑定的,而一个向量数据库中可以存储多个文件。每个文件都可以有自己在存储时定义的切分策略,所以这个切分策略和向量数据库对象没有任何关系。在构建向量数据库的时候,.beta.vector_stores.create的本质是创建一个新的数据库,我们可以在创建的时候直接通过file_ids参数传入多个文件。但是当该向量数据库已经存在的时候,如果想基于这个向量数据库进行文件的增删操作,则需要借助.beta.vector_stores.files()方法。
查询向量数据库:
可以使用.beta.vector_stores.list()方法查看所有已创建的向量数据库列表,代码如下所示:
vector_stores = client.beta.vector_stores.list()
向已存在向量数据库追加文件:
使用的方法是.beta.vector_stores.files.create(),需要传入的参数是 向量数据库的 id 及 新增加文件的 id ,代码如下所示:(注意:目前仅支持单次最多增加一个文件)
vector_store_file = client.beta.vector_stores.files.create(
vector_store_id="vs_Q2YAwM7TLVJ9yymhiOF7sKws", # vector_store_2 对象的id
file_id="file-9hkFbPwsbnzltkh6oLdn7pjg" # 追加一个新的文件:ChatGLM3-6B零基础部署与使用指南.pdf
)
vector_store_file.to_dict()
而如果想单次同时增加多个文件,则需要用到Assistant API的批处理结果,即.beta.vector_stores.file_batches.create(),在这个接口下,可以在file_ids字段中以列表的形式传递多个file id。代码如下所示:
for vector_id in vector_stores.data:
print(f"vector_id:{vector_id.id} - {vector_id.name}")
for file_id in all_file.data:
print(f"file_id:{file_id.id} - {file_id.filename}")
vector_store_file_batch = client.beta.vector_stores.file_batches.create(
vector_store_id="vs_Q2YAwM7TLVJ9yymhiOF7sKws",
file_ids=["file-OdySxtvdUFHhEM7zrDNzxrwS", "file-cct6euj7OEk4WpMpasZUZ0LC"]
)
vector_store_file_batch.to_dict()
在指定向量数据库中检索指定文件的详细信息:
.beta.vector_stores.files.retrieve()
想查看某个向量数据中的全部文件存储信息,则需要使用批处理接口.beta.vector_stores.file_batches.list_files()。代码如下:
vector_store_files = client.beta.vector_stores.file_batches.list_files(
vector_store_id="vs_Q2YAwM7TLVJ9yymhiOF7sKws",
batch_id="vsfb_da8915144e6144e582d3618dc5756265"
)
vector_store_files.to_dict()
在已存在向量数据库中删除指定文件:
有两种方法:
选定某个向量数据库,在其内部删除
通过删除底层文件对象(这会导致所有使用该文件的服务或工具,如vector_store、File Search等配置中删除该文件
如果想在某个已知的向量数据库中删除具体的某个文件,可以通过.beta.vector_stores.files.delete()方法来实现。代码如下所示:(注意:目前仅支持单次最多删除一个文件)
from openai import OpenAI
client = OpenAI()
deleted_vector_store_file = client.beta.vector_stores.files.delete(
vector_store_id="vs_bx5MFra1VDKIO8Ln3Wt5QFGa",
file_id="file-en7QEv1fQYEOWKnui5Cli0Mu"
)
deleted_vector_store_file
检索向量数据库:
掌握了向量数据库内文件的相关操作后,向量数据库对象也可以分别通过.beta.vector_stores.retrieve() 和 .beta.vector_stores.update() 检索和更新向量数据库信息,比如想选定某个向量数据库的时候,代码如下:
vector_store = client.beta.vector_stores.retrieve(
vector_store_id="vs_dab0nsKPPlIRuM45yfd70wg0"
)
vector_store.to_dict()
更新向量数据库的名称,代码如下:
vector_store = client.beta.vector_stores.update(
vector_store_id="vs_ztjSGgwqPnWxJsKcyJUnVmK1",
name="test_kb_110"
)
vector_store.to_dict()
删除向量数据库:
deleted_vector_store = client.beta.vector_stores.delete(
vector_store_id="vs_ztjSGgwqPnWxJsKcyJUnVmK1"
)
deleted_vector_store.to_dict()
Stage 3. 创建 Assistant 对象
如果想让Assistant对象进一步具备工具的调用能力的高阶能力,则需要借助如下参数:
参数名 类型 可选性 默认值 描述
tools array 可选 [] 启用在助手上的工具列表。每个助手最多可以有 128 个工具。工具类型包括:code_interpreter、file_search 和 function。
├── Code interpreter tool object 可选 - 代码解释器工具。
├── FileSearch tool object 可选 - 文件搜索工具。
└── Function tool object 可选 - 功能工具。
tool_resources object or null 可选 - 助手工具使用的资源集合。资源与工具类型相关。例如,代码解释器工具需要文件 ID 列表,而文件搜索工具需要向量存储 ID 列表。
├── code_interpreter object 可选 - 代码解释器相关资源。
└── file_search object 可选 - 文件搜索相关资源。
使用tools参数可以让 Assistant 最多访问 128 个工具。这包含OpenAI 内置的工具Code_interpreter和File_Search ,以及通过Function calling规范调用的第三方自定义工具。基于这种设定,这里我们创建一个启用文件搜索的Assistant(助手)对象,代码如下:
assistant = client.beta.assistants.create(
name="Large language model technical assistant", # 大语言模型技术助理
instructions="You are a professional large model technician and apply your basic knowledge to answer large model related questions", # 你是一位专业的大模型技术人员,运用你的基础知识来回答大模型相关的问题
model="gpt-4o-mini-2024-07-18",
tools=[
{"type": "file_search"}
],
)
Stage 4. 创建 Thread 对象
使用外部工具的相关参数:
参数名 类型 可选性 默认值 描述
tool_resources object or null 可选 - 在此线程中为助手工具提供的一组资源。这些资源与工具类型相关。
├── code_interpreter object 可选 - 代码解释器相关资源。
│ └── file_ids array 可选 [] 提供给代码解释器工具的文件 ID 列表。最多可关联 20 个文件。
└── file_search object 可选 - 文件搜索相关资源。
├── vector_store_ids array 可选 - 附加到此线程的向量存储 ID 列表。最多可关联 1 个向量存储。
└── vector_stores array 可选 - 用于创建带有 file_ids 的向量存储并附加到此线程的助手。最多可关联 1 个向量存储。
└── file_ids array 可选 - 要添加到向量存储的文件 ID 列表。最多可包含 10000 个文件。
└── chunking_strategy object 可选 - 用于对文件进行分块的策略。如果未设置,将使用自动策略。
└── metadata map 可选 - 可附加到向量存储的一组最多 16 个键值对,有助于以结构化格式存储附加信息。键最长 64 个字符,值最长 512 个字符。
Assistant对象与Thread对象都可以指定外部工具,那建立在哪里好?
不要两个里面都建立,稳定性不高
二.代码解释器
作用:允许Assistant对象在沙盒执行环境中编写和运行 Python 代码
该工具可以处理具有不同数据和格式的文件,并生成具有数据和图形图像的文件。主要用于解决具有挑战性的代码和数学问题。同时,当 Assistant对象编写的代码无法运行时,它可以通过尝试运行不同的代码来迭代此代码,直到代码执行成功。通常应用于需要进行数学分析或绘制图表的表格数据,即一些数据分析需求场景。
具体步骤如下:
from openai import OpenAI
client = OpenAI()
Stage 1. 启用代码解释器
在 Assistant 对象的tools参数中传递code_interpreter以启用代码解释器。
assistant = client.beta.assistants.create(
name="Data Engineer", # 数据分析师
instructions="You're a weather data analyst. When asked for data information, write and run code to answer the question", # 你是一名气象数据分析师。当被问及数据信息时,编写并运行代码来回答问题 model="gpt-4o-mini-2024-07-18",
tools=[
{"type": "code_interpreter"}
],
model="gpt-4o-mini-2024-07-18",
)
如果想增强其使用的概率,可以通过在构建Assistant对象时在instructions中的提示来促进,例如明确说明“编写代码来解决这个问题”。
Stage 2. 创建线程
thread = client.beta.threads.create()
Stage 3.将文件传递给代码解释器
file = client.files.create(
file=open("./data/weather_data_complex.csv", "rb"),
purpose='assistants'
)
这里更新一下Assistant对象实例,传递需要执行分析的.csv文件。代码如下:
# 创建一个使用助手的ID
assistant = client.beta.assistants.update(
assistant_id=assistant.id,
name="Data Engineer", # 数据分析师
instructions="You're a weather data analyst. When asked for data information, write and run code to answer the question", # 你是一名气象数据分析师。当被问及数据信息时,编写并运行代码来回答问题
model="gpt-4o",
tools=[{"type": "code_interpreter"}],
tool_resources={
"code_interpreter": {
"file_ids": [file.id]
}
}
)
这里使用刚才创建的Thread对象实例,避免重复付费。
thread_message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="请帮我分析各个城市的实时天气数据,并绘制折线图",
)
print(thread_message)
Stage 4. 创建 Run 运行状态
run=client.beta.threads.runs.create(
thread_id=thread.id,
assistant_id=assistant.id
)
Run开始运行后,同样还是进入queued状态,需要一段时间才能完成。这里我们等待几秒钟再获取其最终的结果。
messages = client.beta.threads.messages.list(
thread_id=thread.id
)
last_message = messages.data[0]
得到结果:
last_message.to_dict()
{'id': 'msg_m4Zke5wDLKVsB5eh54xSZ8Ts',
'assistant_id': 'asst_S5dB3bbWVIj3b2PraFk431PQ',
'attachments': [],
'content': [{'image_file': {'file_id': 'file-eBclJ2mOnNygdwe3OZkGNLkQ'},
'type': 'image_file'},
{'text': {'annotations': [],
'value': '以上图像显示了各城市的平均温度、湿度和风速:\n\n- **左图**:展示了各城市的平均温度(单位:摄氏度)。可以看到一些城市的温度变化较大。\n- **中图**:展示了各城市的平均湿度(单位:%)。湿度也是各个城市之间差异明显。\n- **右图**:展示了各城市的平均风速(单位:米/秒)。某些城市的风速有明显的变化。\n\n为了更深入地分析天气状况分布,我们还将绘制各个城市的天气情况的分布直方图。'},
'type': 'text'}],
'created_at': 1727417373,
'metadata': {},
'object': 'thread.message',
'role': 'assistant',
'run_id': 'run_RI9oLWYEpdmKEoiQSxi4H8yT',
'thread_id': 'thread_EKpSbWT5fra7zpm3ZhRxSHOH'}
Stage 5. 读取代码解释器生成的图像和文件
当代码解释器生成图像时,我们可以在最终消息响应的file_id字段中查找并下载该文件:
file_id = last_message.content[0].image_file.file_id
file_id
将图像文件下载至本地环境中,代码如下:
image_data = client.files.content(file_id=file_id)
image_data_bytes = image_data.read()
with open("./my-image.png", "wb") as file:
file.write(image_data_bytes)
同时也可以做一个生成.csv文件的场景,如下:
thread_message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="根据平均温度对城市进行排序,并生成一个新的.csv文件供我下载",
attachments=[
{"file_id": file.id,
"tools": [
{"type": "code_interpreter"}
]
}
]
)
run = client.beta.threads.runs.create(
thread_id=thread.id,
assistant_id=assistant.id
)
messages = client.beta.threads.messages.list(
thread_id=thread.id
)
last_message = messages.data[0]
text = last_message.content[0].text.value
print(text)
得到结果:
我已经根据平均温度对城市进行了排序,并生成了一个新的 CSV 文件。您可以通过以下链接下载该文件:
[下载排序后的城市温度数据](sandbox:/mnt/data/sorted_cities_by_temperature.csv)
如果您还有其他需要帮忙的地方,请告诉我!
三. Assistant API 的 Function Calling
使用方式:在Assistant对象中添加描述
场景:用户能够动态执行Python 代码和 SQL 查询,结合这两个功能,用户可以在一个统一的平台上同时进行数据处理和查询
Stage 1. 定义外部函数
python_inter 函数,用户可以输入任意 Python 代码并在指定的全局环境中执行,获取执行结果或新变量。
sql_inter 函数与 MySQL 数据库交互,执行 SQL 查询并获取结果。
import json
def python_inter(py_code, g=None):
"""
专门用于执行python代码,并获取最终查询或处理结果。
:param py_code: 字符串形式的Python代码,
:param g: g,字典形式变量,表示环境变量,若未提供则创建一个新的字典
:return:代码运行的最终结果
"""
# 使用空字典作为默认全局环境
global_vars = {}
try:
# 尝试如果是表达式,则返回表达式运行结果
return str(eval(py_code, global_vars))
except Exception as e:
# 记录执行前的全局变量
global_vars_before = set(global_vars.keys())
try:
exec(py_code, global_vars)
except Exception as e:
return f"代码执行时报错: {e}"
# 记录执行后的全局变量,确定新变量
global_vars_after = set(global_vars.keys())
new_vars = global_vars_after - global_vars_before
# 若存在新变量
if new_vars:
result = {var: global_vars[var] for var in new_vars}
return str(result)
else:
return "已经顺利执行代码"
def sql_inter(sql_query, host, user, password, database, port):
"""
用于执行一段SQL代码,并最终获取SQL代码执行结果,
核心功能是将输入的SQL代码传输至MySQL环境中进行运行,
并最终返回SQL代码运行结果。需要注意的是,本函数是借助pymysql来连接MySQL数据库。
:param sql_query: 字符串形式的SQL查询语句,用于执行对MySQL中telco_db数据库中各张表进行查询,并获得各表中的各类相关信息
:param host: MySQL服务器的主机名
:param user: MySQL服务器的用户名
:param password: MySQL服务器的密码
:param database: MySQL服务器的数据库名
:param port: MySQL服务器的端口
:param g: g,字符串形式变量,表示环境变量,无需设置,保持默认参数即可
:return:sql_query在MySQL中的运行结果。
"""
from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URI = f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset=utf8mb4"
# 创建数据库引擎
engine = create_engine(SQLALCHEMY_DATABASE_URI)
# 创建会话
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
db_session = SessionLocal()
try:
from sqlalchemy import text
# 执行SQL查询
result = db_session.execute(text(sql_query))
results = result.fetchall()
# # 将结果转换为字典列表
keys = result.keys()
results_list = [dict(zip(keys, row)) for row in results]
from decimal import Decimal
def json_serial(obj):
"""JSON serializer for objects not serializable by default json code"""
if isinstance(obj, datetime):
return obj.isoformat() # 将 datetime 对象转为 ISO 8601 格式的字符串
elif isinstance(obj, Decimal):
return float(obj) # 将 Decimal 转为 float
raise TypeError("Type not serializable")
# 返回 JSON 格式的查询结果
return json.dumps(results_list, default=json_serial)
finally:
db_session.close() # 确保关闭会话
# 返回 JSON 格式的查询结果
return "数据库查询出错"
Stage 2. 定义外部函数的JSON Schema表示
}
},
"required": ["py_code"]
}
}
}
sql_tool_desc = {
"type": "function",
"function": {
"name": "sql_inter",
"description": "Executes a SQL query on a MySQL database using pymysql and returns the results.",
"parameters": {
"type": "object",
"properties": {
"sql_query": {
"type": "string",
"description": "The SQL query to be executed against the MySQL database."
},
"host": {
"type": "string",
"description": "The hostname of the MySQL server."
},
"user": {
"type": "string",
"description": "The username for the MySQL server."
},
"password": {
"type": "string",
"description": "The password for the MySQL server."
},
"database": {
"type": "string",
"description": "The name of the database to connect to on the MySQL server."
},
"port": {
"type": "integer",
"description": "The port number on which the MySQL server is running."
}
},
"required": ["sql_query", "host", "user", "password", "database", "port"]
},
}
}
available_functions = {
"python_inter": python_inter,
"sql_inter": sql_inter
}
Stage 3. 定义Assistant对象
准备好了外部函数和其对应的Json Schema描述,我们只需要在创建Assistant对象时,在Assistant对象的的tools参数下添加定义。代码如下:
from openai import OpenAI
client = OpenAI()
assistant = client.beta.assistants.create(
name="Data Engineer", # 数据分析师
instructions="You're a senior data analyst. When asked for data information, write and run Python code to answer the question,\
The mysql database information you can connect to is: username = 'root', database_name = 'mategen', password = 'snowball950123', hostname = '192.168.110.131'", # 你是一位高级数据分析师。当被要求提供数据信息时,编写并运行Python代码来回答这个问题
model = "gpt-4o-mini-2024-07-18",
tools = [
python_tool_desc,
sql_tool_desc
]
)
Stage 4. 创建Thread并添加消息
thread = client.beta.threads.create()
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="请问 100 * 200 - 30 等于多少。",
)
Stage 5. 开始运行
run = client.beta.threads.runs.create_and_poll(
thread_id=thread.id,
assistant_id=assistant.id,
)
获取结果
run.to_dict()
当用户的提问触发函数调用功能时,Run会进入requires_action状态,表示需要调用函数,代码如下:
这里我们定义function_to_call函数实现打印函数输出的过程
def function_to_call(run, available_functions):
tool_outputs = []
tool_call = run.required_action.submit_tool_outputs.tool_calls[0]
arguments = json.loads(tool_call.function.arguments)
tool_outputs.append({
"tool_call_id": tool_call.id,
"output": available_functions[tool_call.function.name](**arguments)
})
return tool_outputs
tool_outputs = function_to_call(run, available_functions)
tool_outputs
接下来要做的事情是:把工具的执行结果,回传给 Run。 这里就需要借助client.beta.threads.runs.submit_tool_outputs()方法,它支持在工具调用全部完成后提交输出。
if tool_outputs:
try:
run = client.beta.threads.runs.submit_tool_outputs_and_poll(
thread_id=thread.id,
run_id=run.id,
tool_outputs=tool_outputs
)
print("Tool outputs submitted successfully.")
except Exception as e:
print("Failed to submit tool outputs:", e)
else:
print("No tool outputs to submit.")
提交完成后,我们只需要耐心的等待Run的继续执行即可。
if run.status == 'completed':
messages = client.beta.threads.messages.list(
thread_id=thread.id
)
print(messages.data[0].content[0].text.value)
else:
print(run.status)
100 * 200 - 30 等于 19970。
最后,还可以测试更复杂的应用需求,这里我们的提问如下所示:
assistant = client.beta.assistants.create(
name="Data Engineer", # 数据分析师
instructions="You're a senior data analyst. When asked for data information, write and run Python code to answer the question,\
The mysql database information you can connect to is: username = 'root', database_name = 'mategen', password = 'snowball950123', hostname = '192.168.110.131'", # 你是一位高级数据分析师。当被要求提供数据信息时,编写并运行Python代码来回答这个问题
model = "gpt-4o-mini-2024-07-18",
tools = [
python_tool_desc,
sql_tool_desc
]
)
thread = client.beta.threads.create()
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="帮我查询 employees 表中的内容,计算所有的人的工资总和,然后与 9 * 9 的结果进行相加,返回最终的结果。",
)
run = client.beta.threads.runs.create_and_poll(
thread_id=thread.id,
assistant_id=assistant.id,
)
if run.status == 'completed':
messages = client.beta.threads.messages.list(
thread_id=thread.id
)
print(messages)
else:
print(run.status)
这里注意:并行函数调用场景并不是百分之百触发,大家再自己实践时,可通过尝试不同的提问和需求来进行场景复现。
run.to_dict()
如果在required_action中看到两个tool_calls ,表明用户触发了并行函数调用。
def function_to_call(run, available_functions):
tool_outputs = []
for tool_call in run.required_action.submit_tool_outputs.tool_calls:
function_name = tool_call.function.name
function_args = tool_call.function.arguments
# 处理多次子调用的情况
if function_name == 'multi_tool_use.parallel':
tool_uses = json.loads(function_args).get('tool_uses', [])
for tool_use in tool_uses:
recipient_name = tool_use.get('recipient_name').split('.')[-1]
parameters = tool_use.get('parameters')
tool_outputs.append({
"tool_call_id": tool_call.id,
"output": available_functions[recipient_name](**parameters)
})
# 处理单个外部函数调用的情况
else:
arguments = json.loads(tool_call.function.arguments)
tool_outputs.append({
"tool_call_id": tool_call.id,
"output": available_functions[tool_call.function.name](**arguments)
})
return tool_outputs
tool_outputs = function_to_call(run, available_functions)
tool_outputs
if tool_outputs:
try:
run = client.beta.threads.runs.submit_tool_outputs_and_poll(
thread_id=thread.id,
run_id=run.id,
tool_outputs=tool_outputs
)
print("Tool outputs submitted successfully.")
except Exception as e:
print("Failed to submit tool outputs:", e)
else:
print("No tool outputs to submit.")
if run.status == 'completed':
messages = client.beta.threads.messages.list(
thread_id=thread.id
)
print(messages.data[0].content[0].text.value)
else:
print(run.status)
run.to_dict()
结果:根据查询,`employees` 表中所有人的工资总和为 **225000.0**。将此总和与 \(9 \times 9\) 的结果相加,得到的最终结果为 **225081.0**。
run_steps = client.beta.threads.runs.steps.list(
thread_id=thread.id,
run_id=run.id
)
run_steps.to_dict()
下一篇介绍
另外一个重要的知识点:流式功能。同时会通过实战案例展示如何高效结合代码解释器、文件搜索以及自定义函数,以实现更复杂、更智能的智能体流程。