Python统一调用多家大模型API指南
随着大模型技术的快速发展,市场上出现了越来越多的LLM服务提供商,包括OpenAI、Anthropic、Google、百度、阿里云等。作为开发者,我们经常需要在不同的模型之间切换,或者同时使用多个模型来满足不同的业务需求。本文将详细介绍如何在Python中统一调用多家大模型API,并提供完整的代码示例。
为什么需要统一API调用?
在实际开发中,我们可能会遇到以下场景:
- 模型对比测试:需要在不同模型间进行效果对比
- 成本优化:根据任务复杂度选择不同价格的模型
- 可用性保障:当主要模型服务不可用时,快速切换到备用模型
- 特定能力需求:不同模型在某些任务上表现各异
方案一:使用LiteLLM - 轻量级统一接口
LiteLLM是专为统一API调用设计的轻量级库,支持100多家LLM提供商,并对外提供OpenAI格式的统一接口。
安装和基础使用
```bash
pip install litellm
```
基础调用示例
```python
from litellm import completion
import os

# 设置各厂家的API密钥
os.environ["OPENAI_API_KEY"] = "your_openai_key"
os.environ["ANTHROPIC_API_KEY"] = "your_anthropic_key"
os.environ["GOOGLE_API_KEY"] = "your_google_key"

def test_multiple_models():
    messages = [{"role": "user", "content": "请用中文介绍一下机器学习"}]

    # 调用OpenAI GPT-4
    response1 = completion(model="gpt-4", messages=messages, temperature=0.7)
    print("GPT-4 回复:", response1.choices[0].message.content)

    # 调用Anthropic Claude
    response2 = completion(model="claude-3-sonnet-20240229", messages=messages, temperature=0.7)
    print("Claude 回复:", response2.choices[0].message.content)

    # 调用Google Gemini
    response3 = completion(model="gemini-pro", messages=messages, temperature=0.7)
    print("Gemini 回复:", response3.choices[0].message.content)

if __name__ == "__main__":
    test_multiple_models()
```
高级功能:流式响应和错误处理
```python
from litellm import completion

def stream_chat_with_fallback(messages, models=["gpt-4", "claude-3-sonnet-20240229", "gemini-pro"]):
    """带故障切换的流式聊天"""
    for model in models:
        try:
            print(f"尝试使用模型: {model}")
            response = completion(
                model=model,
                messages=messages,
                stream=True,
                temperature=0.7
            )
            print(f"使用 {model} 的回复:")
            for chunk in response:
                if chunk.choices[0].delta.content:
                    print(chunk.choices[0].delta.content, end="")
            print("\n")
            return  # 成功后退出
        except Exception as e:
            print(f"模型 {model} 调用失败: {e}")
            continue
    print("所有模型都调用失败")

# 使用示例
messages = [{"role": "user", "content": "请详细解释什么是Transformer架构"}]
stream_chat_with_fallback(messages)
```
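除了像上面这样手动遍历模型列表,LiteLLM的completion本身也带有简单的重试参数。下面是一个最小示意(num_retries参数的具体行为请以LiteLLM官方文档为准):

```python
from litellm import completion

messages = [{"role": "user", "content": "请详细解释什么是Transformer架构"}]

# 请求失败时自动重试(参数行为以官方文档为准)
response = completion(
    model="gpt-4",
    messages=messages,
    num_retries=3,
)
print(response.choices[0].message.content)
```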
方案二:使用LangChain - 全功能框架
LangChain不仅能统一调用多家模型的API,本身还是一个完整的LLM应用开发框架,内置提示词模板、链式编排等能力。
安装和配置
```bash
pip install langchain langchain-openai langchain-anthropic langchain-google-genai
```
多模型对比示例
```python
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage
import asyncio

class MultiModelComparison:
    def __init__(self):
        self.models = {
            "gpt-4": ChatOpenAI(model="gpt-4", temperature=0.7),
            "claude-3-sonnet": ChatAnthropic(model="claude-3-sonnet-20240229", temperature=0.7),
            "gemini-pro": ChatGoogleGenerativeAI(model="gemini-pro", temperature=0.7)
        }

    async def compare_models(self, question):
        """异步对比多个模型的回复"""
        tasks = []
        for model_name, model in self.models.items():
            task = self.get_model_response(model_name, model, question)
            tasks.append(task)
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

    async def get_model_response(self, model_name, model, question):
        """获取单个模型的回复"""
        try:
            message = HumanMessage(content=question)
            response = await model.ainvoke([message])
            return {
                "model": model_name,
                "response": response.content,
                "success": True
            }
        except Exception as e:
            return {
                "model": model_name,
                "error": str(e),
                "success": False
            }

# 使用示例
async def main():
    comparator = MultiModelComparison()
    question = "请解释深度学习中的反向传播算法"
    results = await comparator.compare_models(question)

    for result in results:
        print(f"\n{'='*50}")
        print(f"模型: {result['model']}")
        if result['success']:
            print(f"回复: {result['response'][:200]}...")
        else:
            print(f"错误: {result['error']}")

# 运行异步函数
if __name__ == "__main__":
    asyncio.run(main())
```
LangChain链式调用示例
```python
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

def create_translation_chain():
    """创建翻译链:GPT-4翻译,Claude校对"""
    # GPT-4负责翻译
    translator = ChatOpenAI(model="gpt-4", temperature=0.3)
    translation_prompt = ChatPromptTemplate.from_template(
        "请将以下中文准确翻译为英文:\n{text}"
    )

    # Claude负责校对
    proofreader = ChatAnthropic(model="claude-3-sonnet-20240229", temperature=0.3)
    proofread_prompt = ChatPromptTemplate.from_template(
        "请校对以下英文翻译,如有问题请修正:\n{translation}"
    )

    # 构建链
    translation_chain = translation_prompt | translator | StrOutputParser()
    proofread_chain = proofread_prompt | proofreader | StrOutputParser()
    return translation_chain, proofread_chain

def translate_and_proofread(text):
    """翻译并校对"""
    translation_chain, proofread_chain = create_translation_chain()

    # 第一步:翻译
    translation = translation_chain.invoke({"text": text})
    print(f"GPT-4翻译结果: {translation}")

    # 第二步:校对
    final_result = proofread_chain.invoke({"translation": translation})
    print(f"Claude校对结果: {final_result}")
    return final_result

# 使用示例
chinese_text = "人工智能正在深刻改变我们的生活方式和工作方式"
result = translate_and_proofread(chinese_text)
```
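上面的两条链是分两次invoke串联的;如果希望把翻译和校对组合成一条端到端的链,可以借助langchain_core的RunnableLambda在两步之间做键名转换。下面是一个组合写法的示意(沿用上文的create_translation_chain):

```python
from langchain_core.runnables import RunnableLambda

translation_chain, proofread_chain = create_translation_chain()

# 把翻译链输出的字符串包装成校对链期望的输入格式 {"translation": ...}
full_chain = (
    translation_chain
    | RunnableLambda(lambda translation: {"translation": translation})
    | proofread_chain
)

print(full_chain.invoke({"text": "人工智能正在深刻改变我们的生活方式和工作方式"}))
```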
方案三:自定义SDK封装
如果你需要更灵活的控制,可以创建自己的统一封装类。
基础封装架构
```python
from abc import ABC, abstractmethod
import openai
import anthropic
import google.generativeai as genai
from google.generativeai import GenerativeModel
import time
from typing import List, Dict, Any, Optional

class LLMProvider(ABC):
    """LLM提供商的抽象基类"""

    @abstractmethod
    def chat(self, messages: List[Dict], **kwargs) -> str:
        pass

    @abstractmethod
    def stream_chat(self, messages: List[Dict], **kwargs):
        pass

class OpenAIProvider(LLMProvider):
    def __init__(self, api_key: str, model: str = "gpt-4"):
        self.client = openai.OpenAI(api_key=api_key)
        self.model = model

    def chat(self, messages: List[Dict], **kwargs) -> str:
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=kwargs.get('temperature', 0.7),
            max_tokens=kwargs.get('max_tokens', 1000)
        )
        return response.choices[0].message.content

    def stream_chat(self, messages: List[Dict], **kwargs):
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            stream=True,
            temperature=kwargs.get('temperature', 0.7),
            max_tokens=kwargs.get('max_tokens', 1000)
        )
        for chunk in response:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

class AnthropicProvider(LLMProvider):
    def __init__(self, api_key: str, model: str = "claude-3-sonnet-20240229"):
        self.client = anthropic.Anthropic(api_key=api_key)
        self.model = model

    def chat(self, messages: List[Dict], **kwargs) -> str:
        response = self.client.messages.create(
            model=self.model,
            messages=messages,
            temperature=kwargs.get('temperature', 0.7),
            max_tokens=kwargs.get('max_tokens', 1000)
        )
        return response.content[0].text

    def stream_chat(self, messages: List[Dict], **kwargs):
        with self.client.messages.stream(
            model=self.model,
            messages=messages,
            temperature=kwargs.get('temperature', 0.7),
            max_tokens=kwargs.get('max_tokens', 1000)
        ) as stream:
            for text in stream.text_stream:
                yield text

class GoogleProvider(LLMProvider):
    def __init__(self, api_key: str, model: str = "gemini-pro"):
        genai.configure(api_key=api_key)
        self.model = GenerativeModel(model)

    def chat(self, messages: List[Dict], **kwargs) -> str:
        # 转换消息格式
        prompt = self._convert_messages(messages)
        response = self.model.generate_content(prompt)
        return response.text

    def stream_chat(self, messages: List[Dict], **kwargs):
        prompt = self._convert_messages(messages)
        response = self.model.generate_content(prompt, stream=True)
        for chunk in response:
            if chunk.text:
                yield chunk.text

    def _convert_messages(self, messages: List[Dict]) -> str:
        """将OpenAI格式的消息转换为Google格式"""
        prompt = ""
        for msg in messages:
            if msg["role"] == "user":
                prompt += f"User: {msg['content']}\n"
            elif msg["role"] == "assistant":
                prompt += f"Assistant: {msg['content']}\n"
        return prompt

class UnifiedLLMClient:
    """统一的LLM客户端"""

    def __init__(self):
        self.providers: Dict[str, LLMProvider] = {}
        self.default_provider = None
        self.retry_count = 3
        self.retry_delay = 1

    def add_provider(self, name: str, provider: LLMProvider, is_default: bool = False):
        """添加LLM提供商"""
        self.providers[name] = provider
        if is_default or self.default_provider is None:
            self.default_provider = name

    def chat(self, messages: List[Dict], provider: Optional[str] = None, **kwargs) -> str:
        """统一的聊天接口"""
        provider_name = provider or self.default_provider
        if provider_name not in self.providers:
            raise ValueError(f"Provider {provider_name} not found")
        return self._execute_with_retry(
            self.providers[provider_name].chat,
            messages,
            **kwargs
        )

    def stream_chat(self, messages: List[Dict], provider: Optional[str] = None, **kwargs):
        """统一的流式聊天接口"""
        provider_name = provider or self.default_provider
        if provider_name not in self.providers:
            raise ValueError(f"Provider {provider_name} not found")
        return self.providers[provider_name].stream_chat(messages, **kwargs)

    def chat_with_fallback(self, messages: List[Dict], providers: List[str] = None, **kwargs) -> Dict[str, Any]:
        """带故障切换的聊天"""
        providers = providers or list(self.providers.keys())
        for provider_name in providers:
            try:
                start_time = time.time()
                response = self.chat(messages, provider=provider_name, **kwargs)
                end_time = time.time()
                return {
                    "provider": provider_name,
                    "response": response,
                    "success": True,
                    "response_time": end_time - start_time
                }
            except Exception as e:
                print(f"Provider {provider_name} failed: {e}")
                continue
        raise Exception("All providers failed")

    def _execute_with_retry(self, func, *args, **kwargs):
        """带重试的执行函数"""
        for attempt in range(self.retry_count):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                if attempt == self.retry_count - 1:
                    raise e
                time.sleep(self.retry_delay * (2 ** attempt))  # 指数退避
```
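这套抽象的另一个好处是扩展方便:很多国产模型(如文中提到的阿里云通义千问)提供OpenAI兼容端点,只需复用OpenAIProvider并改写base_url即可接入。下面是一个示意(base_url和模型名以DashScope兼容模式为例,具体请以官方文档为准):

```python
import os
import openai

class OpenAICompatibleProvider(OpenAIProvider):
    """复用OpenAIProvider的逻辑,接入任意OpenAI兼容端点"""
    def __init__(self, api_key: str, base_url: str, model: str):
        self.client = openai.OpenAI(api_key=api_key, base_url=base_url)
        self.model = model

# 以阿里云通义千问为例(端点与模型名为示例,请以官方文档为准)
qwen_provider = OpenAICompatibleProvider(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    model="qwen-plus",
)
```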
使用自定义封装的完整示例
```python
import os
from typing import List, Dict

def setup_unified_client():
    """设置统一客户端"""
    client = UnifiedLLMClient()

    # 添加OpenAI提供商
    if os.getenv("OPENAI_API_KEY"):
        openai_provider = OpenAIProvider(
            api_key=os.getenv("OPENAI_API_KEY"),
            model="gpt-4"
        )
        client.add_provider("openai", openai_provider, is_default=True)

    # 添加Anthropic提供商
    if os.getenv("ANTHROPIC_API_KEY"):
        anthropic_provider = AnthropicProvider(
            api_key=os.getenv("ANTHROPIC_API_KEY"),
            model="claude-3-sonnet-20240229"
        )
        client.add_provider("anthropic", anthropic_provider)

    # 添加Google提供商
    if os.getenv("GOOGLE_API_KEY"):
        google_provider = GoogleProvider(
            api_key=os.getenv("GOOGLE_API_KEY"),
            model="gemini-pro"
        )
        client.add_provider("google", google_provider)

    return client

def demo_unified_client():
    """演示统一客户端的使用"""
    client = setup_unified_client()
    messages = [{"role": "user", "content": "请解释什么是大语言模型,并给出一个实际应用案例"}]

    # 1. 使用默认提供商
    print("=== 使用默认提供商 ===")
    response = client.chat(messages)
    print(f"回复: {response}\n")

    # 2. 指定提供商
    print("=== 指定使用Anthropic ===")
    try:
        response = client.chat(messages, provider="anthropic")
        print(f"回复: {response}\n")
    except Exception as e:
        print(f"调用失败: {e}\n")

    # 3. 带故障切换的调用
    print("=== 带故障切换的调用 ===")
    try:
        result = client.chat_with_fallback(messages, providers=["anthropic", "openai", "google"])
        print(f"使用的提供商: {result['provider']}")
        print(f"响应时间: {result['response_time']:.2f}秒")
        print(f"回复: {result['response'][:100]}...\n")
    except Exception as e:
        print(f"所有提供商都失败: {e}\n")

    # 4. 流式调用
    print("=== 流式调用 ===")
    try:
        stream = client.stream_chat(messages, provider="openai")
        for chunk in stream:
            print(chunk, end="", flush=True)
        print("\n")
    except Exception as e:
        print(f"流式调用失败: {e}\n")

if __name__ == "__main__":
    demo_unified_client()
```
方案四:使用开源网关解决方案
对于企业级应用,推荐使用开源的API网关解决方案,如One-API。
One-API部署和使用
```bash
# 使用Docker部署One-API
docker run -d \
  --name one-api \
  -p 3000:3000 \
  -e SQL_DSN="root:password@tcp(localhost:3306)/oneapi" \
  -e SESSION_SECRET="your-secret-key" \
  -e INITIAL_ROOT_TOKEN="your-initial-token" \
  justsong/one-api:latest
```
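部署完成后,可以通过 http://localhost:3000 打开One-API的管理界面,在其中为各厂商配置渠道(Channel)并创建访问令牌;下文示例中的api_key即为在管理界面创建的令牌。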
通过One-API调用示例
```python
import requests
from typing import List, Dict

class OneAPIClient:
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.headers = {
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json'
        }

    def chat(self, messages: List[Dict], model: str = "gpt-3.5-turbo", **kwargs):
        """通过One-API统一接口调用"""
        url = f"{self.base_url}/v1/chat/completions"
        payload = {
            "model": model,
            "messages": messages,
            "temperature": kwargs.get('temperature', 0.7),
            "max_tokens": kwargs.get('max_tokens', 1000),
            "stream": kwargs.get('stream', False)
        }
        response = requests.post(url, headers=self.headers, json=payload)
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"API call failed: {response.status_code}, {response.text}")

    def list_models(self):
        """获取可用模型列表"""
        url = f"{self.base_url}/v1/models"
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to get models: {response.status_code}")

# 使用示例
def demo_one_api():
    client = OneAPIClient(
        base_url="http://localhost:3000",
        api_key="your-one-api-key"
    )

    # 获取可用模型
    models = client.list_models()
    print("可用模型:", [model['id'] for model in models['data']])

    # 调用不同模型
    messages = [{"role": "user", "content": "你好,请介绍一下你自己"}]
    for model in ["gpt-4", "claude-3-sonnet", "gemini-pro"]:
        try:
            response = client.chat(messages, model=model)
            print(f"\n{model} 回复:")
            print(response['choices'][0]['message']['content'])
        except Exception as e:
            print(f"{model} 调用失败: {e}")

if __name__ == "__main__":
    demo_one_api()
```
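注意,上面的chat方法虽然在payload中带了stream参数,但并没有真正按流式消费响应。One-API兼容OpenAI的SSE流式格式(每行形如 `data: {...}`,以 `data: [DONE]` 结束),下面是手工解析的最小示意(格式细节以实际返回为准);更省事的做法是直接把官方openai SDK的base_url指向网关地址。

```python
import json
import requests
from typing import Dict, Iterator, List

def stream_chat(base_url: str, api_key: str, messages: List[Dict],
                model: str = "gpt-3.5-turbo") -> Iterator[str]:
    """按OpenAI的SSE格式逐块消费One-API的流式响应"""
    resp = requests.post(
        f"{base_url.rstrip('/')}/v1/chat/completions",
        headers={"Authorization": f"Bearer {api_key}"},
        json={"model": model, "messages": messages, "stream": True},
        stream=True,  # 让requests按块读取,而不是一次性缓冲整个响应
    )
    for line in resp.iter_lines():
        if not line:
            continue
        text = line.decode("utf-8")
        if text.startswith("data: "):
            data = text[len("data: "):]
            if data.strip() == "[DONE]":
                break
            delta = json.loads(data)["choices"][0].get("delta", {})
            if delta.get("content"):
                yield delta["content"]

# 使用示例
for chunk in stream_chat("http://localhost:3000", "your-one-api-key",
                         [{"role": "user", "content": "你好"}]):
    print(chunk, end="", flush=True)
```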
性能监控和日志记录
在生产环境中,我们需要对多模型调用进行监控和日志记录。
```python
import logging
import time
import json
from functools import wraps
from typing import List, Dict, Any

class LLMMonitor:
    def __init__(self):
        self.setup_logging()
        self.metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'response_times': [],
            'provider_usage': {}
        }

    def setup_logging(self):
        """设置日志记录"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('llm_requests.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger('LLMMonitor')

    def log_request(self, provider: str, model: str, messages: List[Dict]):
        """记录请求日志"""
        self.logger.info(f"Request - Provider: {provider}, Model: {model}, Messages: {len(messages)}")
        self.metrics['total_requests'] += 1
        self.metrics['provider_usage'][provider] = self.metrics['provider_usage'].get(provider, 0) + 1

    def log_response(self, provider: str, model: str, response: str, response_time: float, success: bool):
        """记录响应日志"""
        if success:
            self.metrics['successful_requests'] += 1
            self.metrics['response_times'].append(response_time)
            self.logger.info(f"Success - Provider: {provider}, Model: {model}, Time: {response_time:.2f}s")
        else:
            self.metrics['failed_requests'] += 1
            self.logger.error(f"Failed - Provider: {provider}, Model: {model}")

    def get_metrics(self) -> Dict[str, Any]:
        """获取性能指标"""
        avg_response_time = (
            sum(self.metrics['response_times']) / len(self.metrics['response_times'])
            if self.metrics['response_times'] else 0
        )
        return {
            'total_requests': self.metrics['total_requests'],
            'successful_requests': self.metrics['successful_requests'],
            'failed_requests': self.metrics['failed_requests'],
            'success_rate': (
                self.metrics['successful_requests'] / self.metrics['total_requests']
                if self.metrics['total_requests'] > 0 else 0
            ),
            'average_response_time': avg_response_time,
            'provider_usage': self.metrics['provider_usage']
        }

def monitor_llm_call(monitor: LLMMonitor):
    """装饰器:监控LLM调用"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            provider = kwargs.get('provider', 'unknown')
            model = kwargs.get('model', 'unknown')
            messages = args[1] if len(args) > 1 else []
            # 注意:这里不能把kwargs原样透传给log_request,否则provider会重复传参
            monitor.log_request(provider, model, messages)
            try:
                result = func(*args, **kwargs)
                response_time = time.time() - start_time
                monitor.log_response(provider, model, str(result), response_time, True)
                return result
            except Exception as e:
                response_time = time.time() - start_time
                monitor.log_response(provider, model, str(e), response_time, False)
                raise
        return wrapper
    return decorator

# 使用监控装饰器的示例
monitor = LLMMonitor()

@monitor_llm_call(monitor)
def monitored_chat(client: UnifiedLLMClient, messages: List[Dict], **kwargs):
    return client.chat(messages, **kwargs)

# 使用示例
def demo_monitoring():
    client = setup_unified_client()
    messages = [{"role": "user", "content": "请解释机器学习的基本概念"}]

    # 进行多次调用以生成监控数据
    for i in range(5):
        try:
            response = monitored_chat(client, messages, provider="openai")
            print(f"调用 {i+1} 成功")
        except Exception as e:
            print(f"调用 {i+1} 失败: {e}")

    # 查看性能指标
    metrics = monitor.get_metrics()
    print("\n=== 性能指标 ===")
    print(json.dumps(metrics, indent=2, ensure_ascii=False))

if __name__ == "__main__":
    demo_monitoring()
```
最佳实践建议
1. 错误处理和重试机制
```python
import random
import time
from typing import List, Dict, Any

class RobustLLMClient:
    def __init__(self, client: UnifiedLLMClient):
        self.client = client
        self.max_retries = 3
        self.base_delay = 1
        self.max_delay = 60

    def exponential_backoff(self, attempt: int) -> float:
        """指数退避算法"""
        delay = self.base_delay * (2 ** attempt)
        jitter = random.uniform(0, 0.1) * delay
        return min(delay + jitter, self.max_delay)

    def robust_chat(self, messages: List[Dict], providers: List[str] = None, **kwargs) -> Dict[str, Any]:
        """健壮的聊天调用"""
        providers = providers or list(self.client.providers.keys())
        for provider in providers:
            for attempt in range(self.max_retries):
                try:
                    start_time = time.time()
                    response = self.client.chat(messages, provider=provider, **kwargs)
                    end_time = time.time()
                    return {
                        "provider": provider,
                        "response": response,
                        "success": True,
                        "response_time": end_time - start_time,
                        "attempt": attempt + 1
                    }
                except Exception as e:
                    if attempt < self.max_retries - 1:
                        delay = self.exponential_backoff(attempt)
                        print(f"Provider {provider} attempt {attempt + 1} failed, retrying in {delay:.2f}s: {e}")
                        time.sleep(delay)
                    else:
                        print(f"Provider {provider} failed after {self.max_retries} attempts: {e}")
                        break
        raise Exception("All providers and retries exhausted")
```
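RobustLLMClient的用法与前文的UnifiedLLMClient一致,只是把调用包上了一层重试与切换逻辑。一个最小的使用示意(沿用上文的setup_unified_client):

```python
client = setup_unified_client()
robust_client = RobustLLMClient(client)

# 先在openai上重试,全部失败后切换到anthropic
result = robust_client.robust_chat(
    [{"role": "user", "content": "用一句话解释什么是过拟合"}],
    providers=["openai", "anthropic"],
)
print(f"提供商: {result['provider']},第 {result['attempt']} 次尝试成功")
print(result['response'])
```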
2. 配置驱动的模型选择
```python
import yaml
from typing import List, Dict

class ConfigurableLLMClient:
    def __init__(self, config_path: str):
        with open(config_path, 'r', encoding='utf-8') as f:
            self.config = yaml.safe_load(f)
        self.client = self.setup_client()

    def setup_client(self) -> UnifiedLLMClient:
        """根据配置设置客户端"""
        client = UnifiedLLMClient()
        for provider_config in self.config['providers']:
            provider_name = provider_config['name']
            provider_type = provider_config['type']
            if provider_type == 'openai':
                provider = OpenAIProvider(
                    api_key=provider_config['api_key'],
                    model=provider_config['model']
                )
            elif provider_type == 'anthropic':
                provider = AnthropicProvider(
                    api_key=provider_config['api_key'],
                    model=provider_config['model']
                )
            elif provider_type == 'google':
                provider = GoogleProvider(
                    api_key=provider_config['api_key'],
                    model=provider_config['model']
                )
            else:
                raise ValueError(f"未知的提供商类型: {provider_type}")
            client.add_provider(
                provider_name,
                provider,
                is_default=provider_config.get('is_default', False)
            )
        return client

    def get_provider_for_task(self, task_type: str) -> str:
        """根据任务类型选择提供商"""
        task_mapping = self.config.get('task_mapping', {})
        return task_mapping.get(task_type, self.client.default_provider)

    def smart_chat(self, messages: List[Dict], task_type: str = "general", **kwargs) -> str:
        """智能聊天:根据任务类型选择最适合的模型"""
        provider = self.get_provider_for_task(task_type)
        return self.client.chat(messages, provider=provider, **kwargs)
```
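ConfigurableLLMClient依赖的配置文件格式可以从setup_client的读取逻辑反推出来。下面给出一份示例配置和对应的调用方式(task_mapping中的任务类型为自拟示例,密钥为占位符):

```python
# 示例配置,字段与setup_client读取的键一一对应
example_config = """
providers:
  - name: openai
    type: openai
    api_key: your_openai_key
    model: gpt-4
    is_default: true
  - name: anthropic
    type: anthropic
    api_key: your_anthropic_key
    model: claude-3-sonnet-20240229
task_mapping:
  translation: openai
  code_review: anthropic
"""

with open("llm_config.yaml", "w", encoding="utf-8") as f:
    f.write(example_config)

client = ConfigurableLLMClient("llm_config.yaml")
# task_type命中task_mapping时走指定提供商,否则回退到默认提供商
print(client.smart_chat(
    [{"role": "user", "content": "请将这句话翻译成英文:知识就是力量"}],
    task_type="translation",
))
```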