当前位置: 首页 > news >正文

大模型及agent开发5 OpenAI Assistant API 进阶应用

核心功能:外部工具的应用和流式功能
工具:
1.内置热门工具。由OpenAI团队实现,通过接口的方式直接提供给用户,方便快速集成。
2.构建自定义外部函数流程和开发接口,允许用户通过函数调用扩展自身工具的功能。

一. Assistant API 的 File Search
功能:文件搜索通过来自其模型之外的知识来增强助手,例如专有产品信息或用户提供的文档。
也就是RAG
RAG流程为:
索引(Indexing) 索引过程是离线执行的关键初始步骤。首先清理和提取原始数据,将 PDF、HTML 和 Word 等各种文件格式转换为标准化纯文本。为了适应语言模型的上下文约束,这些文本被分为更小且更易于管理的块,这个过程称为分块。然后使用嵌入模型将这些块转换为向量表示。最后,创建一个索引来将这些文本块及其向量嵌入存储为键值对,从而实现高效且可扩展的搜索功能。
检索(Retrieval) 用户查询用于从外部知识源检索相关上下文。为了实现这一点,用户查询由编码模型处理,该模型生成语义相关的嵌入。然后,对向量数据库进行相似性搜索,以检索前k个最接近的数据对象。
生成(Generation) 将用户查询和检索到的附加上下文填充到提示模板中。最后,将检索步骤中的增强提示输入到LLM中。

File Search 优势:
可以自动解析和分块上传的文档,创建和存储嵌入,并使用向量和关键字搜索来检索相关内容以回答用户查询。
具体优化点:
重写用户查询以优化搜索。
将复杂的用户查询分解为可以并行运行的多个搜索。
在助手和线程向量存储中运行关键字和语义搜索。
在生成最终响应之前对搜索结果重新排序以选择最相关的结果。
如果接入RAG流程, Indexing 过程通常是要最先处理的,也就是要先将私有数据全部存放到向量数据库中。
在构造Indexing时,我们只需先将私有数据文件上传到 OpenAI 的云服务器,然后按照 Assistant API 提供的接口将这些数据存储到向量数据库中,即可非常便捷和高效的完成复杂的私有知识库处理流程。

eg:实现私有知识库问答场景。
Stage 1. 上传本地文件至OpenAI服务器
方法:files.create(),需要传递的参数如下:
参数名    类型    可选性    默认值    描述
file    file    必需    -    要上传的文件对象(不是文件名)。
purpose    string    必需    -    上传文件的预期用途。使用 "assistants" 表示助手和消息文件,"vision" 表示助手的图像文件输入,"batch" 表示批量 API,"fine-tune" 表示微调

from openai import OpenAI
client = OpenAI()

new_file = client.files.create(
  file=open("./data/01_LLMs/01_大模型应用发展及Agent前沿技术趋势.pdf", "rb"),
  purpose="assistants"
)

  每个文件都会生成一个唯一的file id,用于在其他流程中管理和标识各个独立的文件对象。而要上传多个文件,一种最简单的方式就是使用一个循环来依次上传每个文件。代码如下所示:

# 准备上传文件
file_paths = [
    "./data/01_LLMs/AI Agent开发入门.pdf",
    "./data/01_LLMs/ChatGLM3-6B零基础部署与使用指南.pdf",
    "./data/01_LLMs/ChatGLM3模型介绍.pdf"
]

# 遍历文件路径并上传文件
uploaded_files = []
for path in file_paths:
    with open(path, "rb") as file:
        new_file = client.files.create(
            file=file,
            purpose="assistants"
        )
        uploaded_files.append(new_file)

# 打印上传结果
for uploaded_file in uploaded_files:
    print(uploaded_file)


文件查询:
使用files.list()方法能够查看
all_file = client.files.list()

文件检索:

files.retrieve()方法
file_info = client.files.retrieve(
    file_id=new_file.id
)
file_info.to_dict()

文件删除:
client.files.delete("file-FbNTHBNAjifCFaMD7PZbOLmE")

Stage 2. 将自定义文件存储至向量数据库
向量数据库原理:
给定一个查询向量,然后在众多向量中找到最为相似的一些向量返回。这就不再是精确匹配,而是具有一定的模糊性,这就是所谓的最近邻(Nearest Neighbors)问题,而能实现这一点的则称之为【最近邻(搜索)算法】。
向量数据库的作用是可以使File Search工具能够搜索文件,因此步骤为:其一是创建一个向量数据库,其二是将文件添加到刚创建的这个向量数据库中。
方法:.beta.vector_stores.create()方法
其参数如下所示:
参数名    类型    可选性    默认值    描述
file_ids    array    可选    -    向量存储应使用的文件 ID 列表。对像 file_search 这样的工具非常有用,可以访问文件。
name    string    可选    -    向量存储的名称。
expires_after    object    可选    -    向量存储的过期策略。
├── anchor    string    必需    -    过期策略适用的锚时间戳。支持的锚点:last_active_at。
└── days    integer    必需    -    从锚时间开始,向量存储将在多少天后过期。
chunking_strategy    object    可选    -    用于对文件进行分块的策略。如果未设置,将使用自动策略。仅在 file_ids 非空时适用。
├── type    string    必需    -    始终为 auto(自动)或 static(静态)。
└── static    object    必需    -    
├── max_chunk_size_tokens    integer    必需    800    每个块的最大令牌数。最小值为 100,最大值为 4096。
└── chunk_overlap_tokens    integer    必需    400    块之间重叠的令牌数。重叠不得超过 max_chunk_size_tokens 的一半。
metadata    map    可选    -    可附加到对象的 16 个键值对集合,便于以结构化格式存储附加信息。键最多 64 个字符,值最多 512 个字符。


创建向量数据库:
将文件添加向量数据库时会自动解析、分块、嵌入文件并将其存储,所以默认的chunking_strategy字段中的type参数是auto,max_chunk_size_tokens是800, chunk_overlap_tokens是400。除此之外,每个向量数据库最多可容纳 10,000 个文件,同时对于收费情况:有 1 GB 的免费额度。超出后,使用费用为 0.10 美元/GB/天。如果采用默认的分块策略,代码如下所示:
vector_store_1 = client.beta.vector_stores.create(
  name="llms_vector_store",
  file_ids=['file-qd9jnQQE7v4qJ48fQ8dhgoy0', 'file-xzF4PlFUcFjYUP5jigQv1s4r','file-OdySxtvdUFHhEM7zrDNzxrwS']
)

除了默认切分策略外,还可以根据数据情况自定义,那么需要修改的参数是将chunking_strategy参数中type参数改为static,并且指定max_chunk_size_tokens和chunk_overlap_tokens参数的数值,代码如下所示:
vector_store_2 = client.beta.vector_stores.create(
    name="llms_vector_store_2",  
    file_ids=['file-2zEAXzz8NSry24ZthHgfnCeF'],
    chunking_strategy={
        "type": "static",  # 明确使用 static 切分策略
        "static": {  # 在 static 键下提供具体的切分参数
            "max_chunk_size_tokens": 1000,  # 自定义的最大切分大小
            "chunk_overlap_tokens": 500  # 自定义的重叠大小,注意:Chunk_overlap_tokens必须小于或等于max_chunk_size_tokens / 2
        }
    }
)

从结果上看,对于每个文件存储至向量数据库时所应用的切分策略,直接在vector_store对象中是看不到的,这是因为切分策略是和具体的文件做绑定的,而一个向量数据库中可以存储多个文件。每个文件都可以有自己在存储时定义的切分策略,所以这个切分策略和向量数据库对象没有任何关系。在构建向量数据库的时候,.beta.vector_stores.create的本质是创建一个新的数据库,我们可以在创建的时候直接通过file_ids参数传入多个文件。但是当该向量数据库已经存在的时候,如果想基于这个向量数据库进行文件的增删操作,则需要借助.beta.vector_stores.files()方法。


查询向量数据库:
可以使用.beta.vector_stores.list()方法查看所有已创建的向量数据库列表,代码如下所示:
vector_stores = client.beta.vector_stores.list()


向已存在向量数据库追加文件:
使用的方法是.beta.vector_stores.files.create(),需要传入的参数是 向量数据库的 id 及 新增加文件的 id ,代码如下所示:(注意:目前仅支持单次最多增加一个文件)
vector_store_file = client.beta.vector_stores.files.create(
  vector_store_id="vs_Q2YAwM7TLVJ9yymhiOF7sKws",  # vector_store_2 对象的id
  file_id="file-9hkFbPwsbnzltkh6oLdn7pjg"   # 追加一个新的文件:ChatGLM3-6B零基础部署与使用指南.pdf
)
vector_store_file.to_dict()

而如果想单次同时增加多个文件,则需要用到Assistant API的批处理结果,即.beta.vector_stores.file_batches.create(),在这个接口下,可以在file_ids字段中以列表的形式传递多个file id。代码如下所示:
for vector_id in vector_stores.data:
    print(f"vector_id:{vector_id.id} - {vector_id.name}")
for file_id in all_file.data:
    print(f"file_id:{file_id.id} - {file_id.filename}")


vector_store_file_batch = client.beta.vector_stores.file_batches.create(
  vector_store_id="vs_Q2YAwM7TLVJ9yymhiOF7sKws",
  file_ids=["file-OdySxtvdUFHhEM7zrDNzxrwS", "file-cct6euj7OEk4WpMpasZUZ0LC"]
)

vector_store_file_batch.to_dict()


在指定向量数据库中检索指定文件的详细信息:
.beta.vector_stores.files.retrieve()

想查看某个向量数据中的全部文件存储信息,则需要使用批处理接口.beta.vector_stores.file_batches.list_files()。代码如下:
vector_store_files = client.beta.vector_stores.file_batches.list_files(
  vector_store_id="vs_Q2YAwM7TLVJ9yymhiOF7sKws",
  batch_id="vsfb_da8915144e6144e582d3618dc5756265"
)

vector_store_files.to_dict()

在已存在向量数据库中删除指定文件:
有两种方法:
选定某个向量数据库,在其内部删除
通过删除底层文件对象(这会导致所有使用该文件的服务或工具,如vector_store、File Search等配置中删除该文件
如果想在某个已知的向量数据库中删除具体的某个文件,可以通过.beta.vector_stores.files.delete()方法来实现。代码如下所示:(注意:目前仅支持单次最多删除一个文件)
from openai import OpenAI
client = OpenAI()

deleted_vector_store_file = client.beta.vector_stores.files.delete(
    vector_store_id="vs_bx5MFra1VDKIO8Ln3Wt5QFGa",
    file_id="file-en7QEv1fQYEOWKnui5Cli0Mu"
)


deleted_vector_store_file

检索向量数据库:
掌握了向量数据库内文件的相关操作后,向量数据库对象也可以分别通过.beta.vector_stores.retrieve() 和 .beta.vector_stores.update() 检索和更新向量数据库信息,比如想选定某个向量数据库的时候,代码如下:
vector_store = client.beta.vector_stores.retrieve(
  vector_store_id="vs_dab0nsKPPlIRuM45yfd70wg0"
)

vector_store.to_dict()

更新向量数据库的名称,代码如下:
vector_store = client.beta.vector_stores.update(
  vector_store_id="vs_ztjSGgwqPnWxJsKcyJUnVmK1",
  name="test_kb_110"
)

vector_store.to_dict()

删除向量数据库:
deleted_vector_store = client.beta.vector_stores.delete(
  vector_store_id="vs_ztjSGgwqPnWxJsKcyJUnVmK1"
)

deleted_vector_store.to_dict()

Stage 3. 创建 Assistant 对象
如果想让Assistant对象进一步具备工具的调用能力的高阶能力,则需要借助如下参数:
参数名    类型    可选性    默认值    描述
tools    array    可选    []    启用在助手上的工具列表。每个助手最多可以有 128 个工具。工具类型包括:code_interpreter、file_search 和 function。
├── Code interpreter tool    object    可选    -    代码解释器工具。
├── FileSearch tool    object    可选    -    文件搜索工具。
└── Function tool    object    可选    -    功能工具。
tool_resources    object or null    可选    -    助手工具使用的资源集合。资源与工具类型相关。例如,代码解释器工具需要文件 ID 列表,而文件搜索工具需要向量存储 ID 列表。
├── code_interpreter    object    可选    -    代码解释器相关资源。
└── file_search    object    可选    -    文件搜索相关资源。


使用tools参数可以让 Assistant 最多访问 128 个工具。这包含OpenAI 内置的工具Code_interpreter和File_Search ,以及通过Function calling规范调用的第三方自定义工具。基于这种设定,这里我们创建一个启用文件搜索的Assistant(助手)对象,代码如下:
assistant = client.beta.assistants.create(
    name="Large language model technical assistant",   # 大语言模型技术助理
    instructions="You are a professional large model technician and apply your basic knowledge to answer large model related questions", # 你是一位专业的大模型技术人员,运用你的基础知识来回答大模型相关的问题
    model="gpt-4o-mini-2024-07-18",
    tools=[
        {"type": "file_search"}
    ],
)

Stage 4. 创建 Thread 对象

使用外部工具的相关参数:
参数名    类型    可选性    默认值    描述
tool_resources    object or null    可选    -    在此线程中为助手工具提供的一组资源。这些资源与工具类型相关。
├── code_interpreter    object    可选    -    代码解释器相关资源。
│ └── file_ids    array    可选    []    提供给代码解释器工具的文件 ID 列表。最多可关联 20 个文件。
└── file_search    object    可选    -    文件搜索相关资源。
├── vector_store_ids    array    可选    -    附加到此线程的向量存储 ID 列表。最多可关联 1 个向量存储。
└── vector_stores    array    可选    -    用于创建带有 file_ids 的向量存储并附加到此线程的助手。最多可关联 1 个向量存储。
└── file_ids    array    可选    -    要添加到向量存储的文件 ID 列表。最多可包含 10000 个文件。
└── chunking_strategy    object    可选    -    用于对文件进行分块的策略。如果未设置,将使用自动策略。
└── metadata    map    可选    -    可附加到向量存储的一组最多 16 个键值对,有助于以结构化格式存储附加信息。键最长 64 个字符,值最长 512 个字符。

Assistant对象与Thread对象都可以指定外部工具,那建立在哪里好?
不要两个里面都建立,稳定性不高

二.代码解释器
作用:允许Assistant对象在沙盒执行环境中编写和运行 Python 代码
该工具可以处理具有不同数据和格式的文件,并生成具有数据和图形图像的文件。主要用于解决具有挑战性的代码和数学问题。同时,当 Assistant对象编写的代码无法运行时,它可以通过尝试运行不同的代码来迭代此代码,直到代码执行成功。通常应用于需要进行数学分析或绘制图表的表格数据,即一些数据分析需求场景。
具体步骤如下:
from openai import OpenAI
client = OpenAI()

Stage 1. 启用代码解释器
在 Assistant 对象的tools参数中传递code_interpreter以启用代码解释器。
assistant = client.beta.assistants.create(
    name="Data Engineer",  # 数据分析师
    instructions="You're a weather data analyst. When asked for data information, write and run code to answer the question",  # 你是一名气象数据分析师。当被问及数据信息时,编写并运行代码来回答问题  model="gpt-4o-mini-2024-07-18",
    tools=[
        {"type": "code_interpreter"}
    ],
    model="gpt-4o-mini-2024-07-18",
)
如果想增强其使用的概率,可以通过在构建Assistant对象时在instructions中的提示来促进,例如明确说明“编写代码来解决这个问题”。


Stage 2. 创建线程
thread = client.beta.threads.create()
Stage 3.将文件传递给代码解释器
file = client.files.create(
  file=open("./data/weather_data_complex.csv", "rb"),
  purpose='assistants'
)

这里更新一下Assistant对象实例,传递需要执行分析的.csv文件。代码如下:

# 创建一个使用助手的ID
assistant = client.beta.assistants.update(
    assistant_id=assistant.id,
    name="Data Engineer",  # 数据分析师
    instructions="You're a weather data analyst. When asked for data information, write and run code to answer the question",  # 你是一名气象数据分析师。当被问及数据信息时,编写并运行代码来回答问题
    model="gpt-4o",
    tools=[{"type": "code_interpreter"}],
    tool_resources={
        "code_interpreter": {
            "file_ids": [file.id]
    }
  }
)

这里使用刚才创建的Thread对象实例,避免重复付费。
thread_message = client.beta.threads.messages.create(
    thread_id=thread.id,
    role="user",
    content="请帮我分析各个城市的实时天气数据,并绘制折线图",
)
print(thread_message)

Stage 4. 创建 Run 运行状态
run=client.beta.threads.runs.create(
  thread_id=thread.id,
  assistant_id=assistant.id
)
Run开始运行后,同样还是进入queued状态,需要一段时间才能完成。这里我们等待几秒钟再获取其最终的结果。
messages = client.beta.threads.messages.list(
    thread_id=thread.id
    )

last_message = messages.data[0]

得到结果:
last_message.to_dict()
{'id': 'msg_m4Zke5wDLKVsB5eh54xSZ8Ts',
 'assistant_id': 'asst_S5dB3bbWVIj3b2PraFk431PQ',
 'attachments': [],
 'content': [{'image_file': {'file_id': 'file-eBclJ2mOnNygdwe3OZkGNLkQ'},
   'type': 'image_file'},
  {'text': {'annotations': [],
    'value': '以上图像显示了各城市的平均温度、湿度和风速:\n\n- **左图**:展示了各城市的平均温度(单位:摄氏度)。可以看到一些城市的温度变化较大。\n- **中图**:展示了各城市的平均湿度(单位:%)。湿度也是各个城市之间差异明显。\n- **右图**:展示了各城市的平均风速(单位:米/秒)。某些城市的风速有明显的变化。\n\n为了更深入地分析天气状况分布,我们还将绘制各个城市的天气情况的分布直方图。'},
   'type': 'text'}],
 'created_at': 1727417373,
 'metadata': {},
 'object': 'thread.message',
 'role': 'assistant',
 'run_id': 'run_RI9oLWYEpdmKEoiQSxi4H8yT',
 'thread_id': 'thread_EKpSbWT5fra7zpm3ZhRxSHOH'}

Stage 5. 读取代码解释器生成的图像和文件
当代码解释器生成图像时,我们可以在最终消息响应的file_id字段中查找并下载该文件:
file_id = last_message.content[0].image_file.file_id
file_id

将图像文件下载至本地环境中,代码如下:
image_data = client.files.content(file_id=file_id)
image_data_bytes = image_data.read()

with open("./my-image.png", "wb") as file:
    file.write(image_data_bytes)

同时也可以做一个生成.csv文件的场景,如下:
thread_message = client.beta.threads.messages.create(
    thread_id=thread.id,
    role="user",
    content="根据平均温度对城市进行排序,并生成一个新的.csv文件供我下载",
    attachments=[
        {"file_id": file.id,
         "tools": [
             {"type": "code_interpreter"}
         ]
        }
      ]
)


run  = client.beta.threads.runs.create(
  thread_id=thread.id,
  assistant_id=assistant.id
)
messages = client.beta.threads.messages.list(
    thread_id=thread.id
    )

last_message = messages.data[0]

text = last_message.content[0].text.value
print(text)
得到结果:
我已经根据平均温度对城市进行了排序,并生成了一个新的 CSV 文件。您可以通过以下链接下载该文件:

[下载排序后的城市温度数据](sandbox:/mnt/data/sorted_cities_by_temperature.csv)

如果您还有其他需要帮忙的地方,请告诉我!


三. Assistant API 的 Function Calling
使用方式:在Assistant对象中添加描述
场景:用户能够动态执行Python 代码和 SQL 查询,结合这两个功能,用户可以在一个统一的平台上同时进行数据处理和查询
Stage 1. 定义外部函数
python_inter 函数,用户可以输入任意 Python 代码并在指定的全局环境中执行,获取执行结果或新变量。
sql_inter 函数与 MySQL 数据库交互,执行 SQL 查询并获取结果。
import json

def python_inter(py_code, g=None):
    """
    专门用于执行python代码,并获取最终查询或处理结果。
    :param py_code: 字符串形式的Python代码,
    :param g: g,字典形式变量,表示环境变量,若未提供则创建一个新的字典
    :return:代码运行的最终结果
    """
    # 使用空字典作为默认全局环境
    global_vars = {}

    try:
        # 尝试如果是表达式,则返回表达式运行结果
        return str(eval(py_code, global_vars))
    except Exception as e:
        # 记录执行前的全局变量
        global_vars_before = set(global_vars.keys())
        try:
            exec(py_code, global_vars)
        except Exception as e:
            return f"代码执行时报错: {e}"
        # 记录执行后的全局变量,确定新变量
        global_vars_after = set(global_vars.keys())
        new_vars = global_vars_after - global_vars_before
        # 若存在新变量
        if new_vars:
            result = {var: global_vars[var] for var in new_vars}
            return str(result)
        else:
            return "已经顺利执行代码"


def sql_inter(sql_query, host, user, password, database, port):
    """
    用于执行一段SQL代码,并最终获取SQL代码执行结果,
    核心功能是将输入的SQL代码传输至MySQL环境中进行运行,
    并最终返回SQL代码运行结果。需要注意的是,本函数是借助pymysql来连接MySQL数据库。
    :param sql_query: 字符串形式的SQL查询语句,用于执行对MySQL中telco_db数据库中各张表进行查询,并获得各表中的各类相关信息
    :param host: MySQL服务器的主机名
    :param user: MySQL服务器的用户名
    :param password: MySQL服务器的密码
    :param database: MySQL服务器的数据库名
    :param port: MySQL服务器的端口
    :param g: g,字符串形式变量,表示环境变量,无需设置,保持默认参数即可
    :return:sql_query在MySQL中的运行结果。
    """
    from datetime import datetime

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    SQLALCHEMY_DATABASE_URI = f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset=utf8mb4"
    # 创建数据库引擎
    engine = create_engine(SQLALCHEMY_DATABASE_URI)
    # 创建会话
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    db_session = SessionLocal()

    try:
        from sqlalchemy import text
        # 执行SQL查询
        result = db_session.execute(text(sql_query))
        results = result.fetchall()

        # # 将结果转换为字典列表
        keys = result.keys()
        results_list = [dict(zip(keys, row)) for row in results]

        from decimal import Decimal
        
        def json_serial(obj):
            """JSON serializer for objects not serializable by default json code"""
            if isinstance(obj, datetime):
                return obj.isoformat()  # 将 datetime 对象转为 ISO 8601 格式的字符串
            elif isinstance(obj, Decimal):
                return float(obj)  # 将 Decimal 转为 float
            raise TypeError("Type not serializable")

        # 返回 JSON 格式的查询结果
        return json.dumps(results_list, default=json_serial)

    finally:
        db_session.close()  # 确保关闭会话

    # 返回 JSON 格式的查询结果

    return "数据库查询出错"


Stage 2. 定义外部函数的JSON Schema表示
      }
            },
            "required": ["py_code"]
        }
    }
}


sql_tool_desc = {
    "type": "function",
    "function": {
        "name": "sql_inter",
        "description": "Executes a SQL query on a MySQL database using pymysql and returns the results.",
        "parameters": {
            "type": "object",
            "properties": {
                "sql_query": {
                    "type": "string",
                    "description": "The SQL query to be executed against the MySQL database."
                },
                "host": {
                    "type": "string",
                    "description": "The hostname of the MySQL server."
                },
                "user": {
                    "type": "string",
                    "description": "The username for the MySQL server."
                },
                "password": {
                    "type": "string",
                    "description": "The password for the MySQL server."
                },
                "database": {
                    "type": "string",
                    "description": "The name of the database to connect to on the MySQL server."
                },
                "port": {
                    "type": "integer",
                    "description": "The port number on which the MySQL server is running."
                }
            },
            "required": ["sql_query", "host", "user", "password", "database", "port"]
        },
    }
}

available_functions = {
    "python_inter": python_inter,
    "sql_inter": sql_inter
}


Stage 3. 定义Assistant对象

准备好了外部函数和其对应的Json Schema描述,我们只需要在创建Assistant对象时,在Assistant对象的的tools参数下添加定义。代码如下:

from openai import OpenAI
client = OpenAI()

assistant = client.beta.assistants.create(
    name="Data Engineer",  # 数据分析师
    instructions="You're a senior data analyst. When asked for data information, write and run Python code to answer the question,\
                  The mysql database information you can connect to is: username = 'root', database_name = 'mategen', password = 'snowball950123', hostname = '192.168.110.131'",  # 你是一位高级数据分析师。当被要求提供数据信息时,编写并运行Python代码来回答这个问题
    model = "gpt-4o-mini-2024-07-18",
    tools = [
      python_tool_desc,
      sql_tool_desc
  ]
)

Stage 4. 创建Thread并添加消息
thread = client.beta.threads.create()
message = client.beta.threads.messages.create(
  thread_id=thread.id,
  role="user",
  content="请问 100 * 200 - 30 等于多少。",
)

Stage 5. 开始运行
run = client.beta.threads.runs.create_and_poll(
  thread_id=thread.id,
  assistant_id=assistant.id,
)


获取结果
run.to_dict()

当用户的提问触发函数调用功能时,Run会进入requires_action状态,表示需要调用函数,代码如下:
这里我们定义function_to_call函数实现打印函数输出的过程

def function_to_call(run, available_functions):
    
    tool_outputs = []

    tool_call = run.required_action.submit_tool_outputs.tool_calls[0]

    arguments = json.loads(tool_call.function.arguments)
    tool_outputs.append({
        "tool_call_id": tool_call.id,
        "output": available_functions[tool_call.function.name](**arguments)
    })

    return tool_outputs

tool_outputs = function_to_call(run, available_functions)

tool_outputs

接下来要做的事情是:把工具的执行结果,回传给 Run。 这里就需要借助client.beta.threads.runs.submit_tool_outputs()方法,它支持在工具调用全部完成后提交输出。

if tool_outputs:
  try:
    run = client.beta.threads.runs.submit_tool_outputs_and_poll(
      thread_id=thread.id,
      run_id=run.id,
      tool_outputs=tool_outputs
    )
    print("Tool outputs submitted successfully.")
  except Exception as e:
    print("Failed to submit tool outputs:", e)
else:
  print("No tool outputs to submit.")

提交完成后,我们只需要耐心的等待Run的继续执行即可。
if run.status == 'completed':
  messages = client.beta.threads.messages.list(
    thread_id=thread.id
  )
  print(messages.data[0].content[0].text.value)
else:
  print(run.status)

100 * 200 - 30 等于 19970。


最后,还可以测试更复杂的应用需求,这里我们的提问如下所示:

assistant = client.beta.assistants.create(
    name="Data Engineer",  # 数据分析师
    instructions="You're a senior data analyst. When asked for data information, write and run Python code to answer the question,\
                  The mysql database information you can connect to is: username = 'root', database_name = 'mategen', password = 'snowball950123', hostname = '192.168.110.131'",  # 你是一位高级数据分析师。当被要求提供数据信息时,编写并运行Python代码来回答这个问题
    model = "gpt-4o-mini-2024-07-18",
    tools = [
      python_tool_desc,
      sql_tool_desc
  ]
)

thread = client.beta.threads.create()

message = client.beta.threads.messages.create(
  thread_id=thread.id,
  role="user",
  content="帮我查询 employees 表中的内容,计算所有的人的工资总和,然后与 9 * 9 的结果进行相加,返回最终的结果。",
)

run = client.beta.threads.runs.create_and_poll(
  thread_id=thread.id,
  assistant_id=assistant.id,
)

if run.status == 'completed':
  messages = client.beta.threads.messages.list(
    thread_id=thread.id
  )
  print(messages)
else:
  print(run.status)

这里注意:并行函数调用场景并不是百分之百触发,大家再自己实践时,可通过尝试不同的提问和需求来进行场景复现。
run.to_dict()
如果在required_action中看到两个tool_calls ,表明用户触发了并行函数调用。
def function_to_call(run, available_functions):
    
    tool_outputs = []

    for tool_call in run.required_action.submit_tool_outputs.tool_calls:
        function_name = tool_call.function.name
        function_args = tool_call.function.arguments
        # 处理多次子调用的情况
        if function_name == 'multi_tool_use.parallel':
            tool_uses = json.loads(function_args).get('tool_uses', [])
            for tool_use in tool_uses:
   
                recipient_name = tool_use.get('recipient_name').split('.')[-1]
                parameters = tool_use.get('parameters')
    
                tool_outputs.append({
                    "tool_call_id": tool_call.id,
                    "output": available_functions[recipient_name](**parameters)
                })

        # 处理单个外部函数调用的情况
        else:
            arguments = json.loads(tool_call.function.arguments)
            tool_outputs.append({
                "tool_call_id": tool_call.id,
                "output": available_functions[tool_call.function.name](**arguments)
            })

    return tool_outputs

tool_outputs = function_to_call(run, available_functions)

tool_outputs

if tool_outputs:
  try:
    run = client.beta.threads.runs.submit_tool_outputs_and_poll(
      thread_id=thread.id,
      run_id=run.id,
      tool_outputs=tool_outputs
    )
    print("Tool outputs submitted successfully.")
  except Exception as e:
    print("Failed to submit tool outputs:", e)
else:
  print("No tool outputs to submit.")

if run.status == 'completed':
  messages = client.beta.threads.messages.list(
    thread_id=thread.id
  )
  print(messages.data[0].content[0].text.value)
else:
  print(run.status)
run.to_dict()

结果:根据查询,`employees` 表中所有人的工资总和为 **225000.0**。将此总和与 \(9 \times 9\) 的结果相加,得到的最终结果为 **225081.0**。
run_steps = client.beta.threads.runs.steps.list(
  thread_id=thread.id,
  run_id=run.id
)

run_steps.to_dict()

下一篇介绍
另外一个重要的知识点:流式功能。同时会通过实战案例展示如何高效结合代码解释器、文件搜索以及自定义函数,以实现更复杂、更智能的智能体流程。

http://www.lryc.cn/news/578782.html

相关文章:

  • 电源芯片之DCDC初探索ING
  • python 调用C/C++动态库
  • 网络基础知识与代理配置
  • BFD故障检测技术之概述
  • 隔离网络(JAVA)
  • 2025年7月最新英雄联盟战绩自动查询工具
  • sqlmap学习笔记ing(2.[第一章 web入门]SQL注入-2(报错,时间,布尔))
  • 应急响应类题练习——玄机第四章 windows实战-emlog
  • 快速手搓一个MCP服务指南(九): FastMCP 服务器组合技术:构建模块化AI应用的终极方案
  • Spring Boot 启动加载执行链路分析
  • [Python 基础课程]字符串
  • 深度学习常见的激活函数
  • [创业之路-458]:企业经营层 - 蓝海战略 - 重构价值曲线、整合产业要素、创造新需求
  • 复现一个nanoGPT——model.py
  • Android屏幕共享+WebSocket实现传输截图
  • uniapp选择相册
  • 学习字符串
  • 菜谱大全——字符串处理艺术:从文本解析到高效搜索 [特殊字符][特殊字符]
  • LL面试题11
  • 【Python】numpy数组常用数据处理(测试代码+api例程)
  • Web前端之JavaScript实现图片圆环、圆环元素根据角度指向圆心、translate、rotate
  • vue-34(单元测试 Vue 组件的介绍)
  • 第六章 OpenCV篇—傅里叶变换与直方图
  • 通过http调用来访问neo4j时报错,curl -X POST 执行指令报错
  • 2025 推理技术风向标:DeepSeek-R1 揭示大模型从 “记忆” 到 “思考” 的进化路径
  • 8.Docker镜像讲解
  • 【读代码】百度开源大模型:ERNIE项目解析
  • 1.MySQL之如何定位慢查询
  • Python应用指南:利用高德地图API获取公交+地铁可达圈(三)
  • 达梦数据库配置SYSDBA本地免密登录