016_Token计数与成本管理
Token计数与成本管理
目录
- Token计数概述
- Token计数API
- 成本计算
- 成本优化策略
- 监控和分析
- 预算管理
- 最佳实践
Token计数概述
什么是Token
Token是AI模型处理文本的基本单位。在Claude中,一个token大约对应(以下均为粗略估算,实际数量取决于分词器和具体文本内容):
- 英文:约0.75个单词
- 中文:约1-2个汉字
- 代码:约0.5-1个符号或关键词
Token计数的重要性
成本控制
- 精确预算:准确估算API调用成本
- 成本优化:识别高成本操作并优化
- 预算规划:制定合理的使用预算
- 费用监控:实时监控API使用费用
性能优化
- 速率限制管理:避免超出速率限制
- 响应时间优化:通过控制输入长度优化响应时间
- 资源规划:合理规划API资源使用
- 效率提升:优化提示词长度和结构
应用设计
- 输入验证:在发送请求前验证输入长度
- 动态调整:根据token数量动态调整策略
- 用户体验:为用户提供准确的费用预估
- 系统稳定性:避免因超长输入导致的错误
Token计数API
基本使用
简单文本计数
import anthropic

# Shared API client used by every example in this document.
client = anthropic.Anthropic(api_key="your-key")

# Ask the API how many input tokens a single short user message consumes.
response = client.messages.count_tokens(
    model="claude-sonnet-4-20250514",
    messages=[
        {"role": "user", "content": "Hello, how are you today?"},
    ],
)
print(f"输入tokens: {response.input_tokens}")
复杂消息计数
def count_complex_message_tokens():
    """Count input tokens for a request with a system prompt, history and a tool.

    Sends a fixed system prompt, a three-turn conversation and one tool
    definition to the token-counting endpoint through the module-level
    ``client``.

    Returns:
        The ``input_tokens`` value of the API response.
    """
    system_prompt = "你是一个专业的数据分析师,请用专业的语言回答问题。"
    conversation = [
        {"role": "user", "content": "请分析这组数据并给出洞察"},
        {"role": "assistant", "content": "我会帮您分析数据。请提供具体的数据内容。"},
        {"role": "user", "content": "数据:销售额Q1: 100万,Q2: 120万,Q3: 110万,Q4: 130万"},
    ]
    calculator_tool = {
        "name": "calculate",
        "description": "执行数学计算",
        "input_schema": {
            "type": "object",
            "properties": {"expression": {"type": "string"}},
        },
    }

    result = client.messages.count_tokens(
        model="claude-opus-4-20250514",
        system=system_prompt,
        messages=conversation,
        tools=[calculator_tool],
    )
    return result.input_tokens
多模态内容计数
def count_multimodal_tokens(image_path, text_content, media_type=None):
    """Count input tokens for a message made of one image plus text.

    Args:
        image_path: Path of the image file to read and base64-encode.
        text_content: Text sent alongside the image.
        media_type: MIME type declared for the image. When omitted it is
            guessed from the file name; if no ``image/*`` type can be
            guessed, ``"image/jpeg"`` is used.

    Returns:
        The ``input_tokens`` value of the API response.

    Raises:
        OSError: If the image file cannot be opened or read.
    """
    import base64
    import mimetypes

    if media_type is None:
        guessed_type, _ = mimetypes.guess_type(image_path)
        if guessed_type and guessed_type.startswith("image/"):
            media_type = guessed_type
        else:
            # Keep the previous hard-coded value as the fallback.
            media_type = "image/jpeg"

    with open(image_path, "rb") as image_file:
        image_data = base64.b64encode(image_file.read()).decode("utf-8")

    image_block = {
        "type": "image",
        "source": {
            "type": "base64",
            "media_type": media_type,
            "data": image_data,
        },
    }
    text_block = {"type": "text", "text": text_content}

    response = client.messages.count_tokens(
        model="claude-sonnet-4-20250514",
        messages=[{"role": "user", "content": [image_block, text_block]}],
    )
    return response.input_tokens
高级功能
批量Token计数
def batch_token_counting(message_list):
    """Count tokens for each message separately, collecting per-item results.

    Args:
        message_list: Iterable of message dicts; each one is sent on its own
            as a single-message request.

    Returns:
        A list with one dict per message. Successful items carry
        ``message_id``, ``tokens``, ``content_preview`` and ``success=True``;
        failed items carry ``message_id``, ``tokens=0``, ``error`` and
        ``success=False``.
    """

    def _preview(content, limit=50):
        # Content may be a list of blocks rather than a string; stringify it
        # so that building the preview can never fail.
        text = content if isinstance(content, str) else str(content)
        return text[:limit] + "..."

    token_counts = []
    for index, message in enumerate(message_list):
        try:
            response = client.messages.count_tokens(
                model="claude-sonnet-4-20250514",
                messages=[message],
            )
        except Exception as exc:
            # Deliberately broad: one failing item must not abort the batch.
            token_counts.append({
                "message_id": index,
                "tokens": 0,
                "error": str(exc),
                "success": False,
            })
        else:
            token_counts.append({
                "message_id": index,
                "tokens": response.input_tokens,
                "content_preview": _preview(message.get("content", "")),
                "success": True,
            })
    return token_counts
Token计数缓存
import hashlib
import json


class TokenCountCache:
    """In-memory cache of token counts keyed by the request content."""

    def __init__(self):
        # Maps request hash -> input token count.
        self.cache = {}

    def get_message_hash(self, model, messages, system=None, tools=None):
        """Return an MD5 hex digest identifying the request.

        The digest is computed over the JSON serialisation (sorted keys) of
        the model, messages, system prompt and tools.

        Raises:
            TypeError: If any argument is not JSON-serialisable.
        """
        cache_data = {
            "model": model,
            "messages": messages,
            "system": system,
            "tools": tools,
        }
        content_str = json.dumps(cache_data, sort_keys=True)
        # MD5 is used only as a cache key here, not for security.
        return hashlib.md5(content_str.encode()).hexdigest()

    def count_tokens_with_cache(self, model, messages, system=None, tools=None):
        """Return the input token count, calling the API only on a cache miss.

        Args:
            model: Model identifier passed to the API.
            messages: Message list passed to the API.
            system: Optional system prompt; omitted from the request when None.
            tools: Optional tool definitions; omitted from the request when None.

        Returns:
            The cached or freshly counted number of input tokens.
        """
        cache_key = self.get_message_hash(model, messages, system, tools)
        if cache_key in self.cache:
            return self.cache[cache_key]

        # Only forward optional fields that were actually supplied, so the
        # request never carries explicit null values.
        request = {"model": model, "messages": messages}
        if system is not None:
            request["system"] = system
        if tools is not None:
            request["tools"] = tools

        response = client.messages.count_tokens(**request)
        self.cache[cache_key] = response.input_tokens
        return response.input_tokens
成本计算
基础成本计算
单次请求成本
# Price per million tokens, in USD.
_MODEL_PRICING = {
    "claude-opus-4-20250514": {"input": 15.0, "output": 75.0},
    "claude-sonnet-4-20250514": {"input": 3.0, "output": 15.0},
    "claude-3-5-haiku-20241022": {"input": 0.80, "output": 4.0},
    "claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},
    # Legacy key kept so existing callers keep working; it is not a real
    # model ID and carries the Claude 3.5 Haiku prices.
    "claude-haiku-3-20240307": {"input": 0.80, "output": 4.0},
}


def calculate_request_cost(input_tokens, output_tokens, model="claude-sonnet-4-20250514"):
    """Compute the USD cost of one request from its token counts.

    Args:
        input_tokens: Number of input tokens (must not be negative).
        output_tokens: Number of output tokens (must not be negative).
        model: Model identifier used to look up the price table.

    Returns:
        A dict with ``input_cost``, ``output_cost``, ``total_cost`` and
        ``currency`` (always ``"USD"``).

    Raises:
        ValueError: If the model is not in the price table or a token count
            is negative.
    """
    if model not in _MODEL_PRICING:
        raise ValueError(f"未知模型: {model}")
    if input_tokens < 0 or output_tokens < 0:
        raise ValueError(
            f"token数量不能为负数: input={input_tokens}, output={output_tokens}"
        )

    rates = _MODEL_PRICING[model]
    input_cost = (input_tokens / 1_000_000) * rates["input"]
    output_cost = (output_tokens / 1_000_000) * rates["output"]
    return {
        "input_cost": input_cost,
        "output_cost": output_cost,
        "total_cost": input_cost + output_cost,
        "currency": "USD",
    }
批量成本计算
def calculate_batch_cost(batch_results, model):
    """Compute the total cost of a batch from its per-result usage data.

    Results without a ``"usage"`` key are ignored. The summed input and
    output token counts are priced with ``calculate_request_cost``.

    Args:
        batch_results: Iterable of result dicts, each optionally holding a
            ``"usage"`` dict with ``input_tokens`` and ``output_tokens``.
        model: Model identifier used for pricing.

    Returns:
        The cost dict returned by ``calculate_request_cost``.
    """
    usages = [entry["usage"] for entry in batch_results if "usage" in entry]
    input_total = sum(usage["input_tokens"] for usage in usages)
    output_total = sum(usage["output_tokens"] for usage in usages)
    return calculate_request_cost(input_total, output_total, model)
成本预估
def estimate_cost_before_request(messages, model, estimated_output_tokens=1000):
    """Estimate the cost of a request before sending it.

    Input tokens are counted through the API; output tokens are taken from
    the caller's estimate.

    Args:
        messages: Message list that would be sent.
        model: Model identifier used for counting and pricing.
        estimated_output_tokens: Assumed number of output tokens.

    Returns:
        A dict with ``input_tokens``, ``estimated_output_tokens``,
        ``estimated_cost`` (the result of ``calculate_request_cost``) and
        ``model``.
    """
    counted = client.messages.count_tokens(model=model, messages=messages)
    prompt_tokens = counted.input_tokens
    return {
        "input_tokens": prompt_tokens,
        "estimated_output_tokens": estimated_output_tokens,
        "estimated_cost": calculate_request_cost(
            prompt_tokens, estimated_output_tokens, model
        ),
        "model": model,
    }
成本追踪系统
使用记录追踪
import datetime
import sqlite3
from contextlib import closing


class CostTracker:
    """SQLite-backed log of API usage and its computed cost.

    Each method opens its own connection to ``db_path`` and closes it again,
    including when a statement raises.
    """

    def __init__(self, db_path="api_usage.db"):
        self.db_path = db_path
        self.init_database()

    def _connect(self):
        """Return a context manager whose connection is closed on exit."""
        return closing(sqlite3.connect(self.db_path))

    @staticmethod
    def _bind(value):
        """Convert date/datetime values to ISO text before binding.

        Timestamps are stored as ``YYYY-MM-DD HH:MM:SS[.ffffff]`` text, so
        date bounds are bound as text too instead of relying on sqlite3's
        implicit date adapters.
        """
        if isinstance(value, datetime.datetime):
            return value.isoformat(sep=" ")
        if isinstance(value, datetime.date):
            return value.isoformat()
        return value

    @classmethod
    def _range_filter(cls, start_date, end_date):
        """Build the optional WHERE clause and its parameters."""
        clauses = []
        params = []
        if start_date:
            clauses.append("timestamp >= ?")
            params.append(cls._bind(start_date))
        if end_date:
            clauses.append("timestamp <= ?")
            params.append(cls._bind(end_date))
        where = " WHERE " + " AND ".join(clauses) if clauses else ""
        return where, params

    @staticmethod
    def _parse_timestamp(raw):
        """Turn a stored ISO timestamp into a datetime; return it as-is if that fails."""
        if isinstance(raw, str):
            try:
                return datetime.datetime.fromisoformat(raw)
            except ValueError:
                return raw
        return raw

    def init_database(self):
        """Create the ``api_usage`` table if it does not exist yet."""
        with self._connect() as conn:
            with conn:  # commits on success, rolls back on error
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS api_usage (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        timestamp DATETIME,
                        model TEXT,
                        input_tokens INTEGER,
                        output_tokens INTEGER,
                        input_cost REAL,
                        output_cost REAL,
                        total_cost REAL,
                        request_type TEXT,
                        user_id TEXT
                    )
                ''')

    def record_usage(self, model, input_tokens, output_tokens,
                     request_type="standard", user_id=None):
        """Insert one usage row, pricing it with ``calculate_request_cost``.

        Args:
            model: Model identifier of the request.
            input_tokens: Input tokens consumed.
            output_tokens: Output tokens produced.
            request_type: Free-form label stored with the row.
            user_id: Optional user identifier stored with the row.
        """
        cost_info = calculate_request_cost(input_tokens, output_tokens, model)
        row = (
            datetime.datetime.now().isoformat(sep=" "),
            model,
            input_tokens,
            output_tokens,
            cost_info["input_cost"],
            cost_info["output_cost"],
            cost_info["total_cost"],
            request_type,
            user_id,
        )
        with self._connect() as conn:
            with conn:
                conn.execute('''
                    INSERT INTO api_usage
                        (timestamp, model, input_tokens, output_tokens,
                         input_cost, output_cost, total_cost, request_type, user_id)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', row)

    def get_usage_summary(self, start_date=None, end_date=None):
        """Return per-model totals, optionally limited to a timestamp range.

        Args:
            start_date: Inclusive lower bound on ``timestamp`` (optional).
            end_date: Inclusive upper bound on ``timestamp`` (optional).

        Returns:
            A list of dicts with ``model``, ``request_count``,
            ``total_input_tokens``, ``total_output_tokens`` and ``total_cost``.
        """
        where, params = self._range_filter(start_date, end_date)
        query = (
            "SELECT model, COUNT(*) AS request_count, "
            "SUM(input_tokens) AS total_input_tokens, "
            "SUM(output_tokens) AS total_output_tokens, "
            "SUM(total_cost) AS total_cost "
            "FROM api_usage" + where + " GROUP BY model"
        )
        with self._connect() as conn:
            rows = conn.execute(query, params).fetchall()
        return [
            {
                "model": row[0],
                "request_count": row[1],
                "total_input_tokens": row[2],
                "total_output_tokens": row[3],
                "total_cost": row[4],
            }
            for row in rows
        ]

    def get_detailed_usage(self, start_date=None, end_date=None):
        """Return the individual usage rows in a timestamp range.

        Returns:
            A list of dicts, ordered by timestamp, with ``timestamp`` (parsed
            to a ``datetime`` when possible), ``model``, ``input_tokens``,
            ``output_tokens``, ``input_cost``, ``output_cost``,
            ``total_cost``, ``request_type`` and ``user_id``.
        """
        columns = (
            "timestamp", "model", "input_tokens", "output_tokens",
            "input_cost", "output_cost", "total_cost", "request_type", "user_id",
        )
        where, params = self._range_filter(start_date, end_date)
        query = (
            "SELECT " + ", ".join(columns) + " FROM api_usage"
            + where + " ORDER BY timestamp"
        )
        with self._connect() as conn:
            rows = conn.execute(query, params).fetchall()
        records = []
        for row in rows:
            record = dict(zip(columns, row))
            record["timestamp"] = self._parse_timestamp(record["timestamp"])
            records.append(record)
        return records

    def get_daily_usage(self, start_date=None, end_date=None):
        """Return per-day totals in a timestamp range.

        Returns:
            A list of dicts, ordered by day, with ``date`` (``YYYY-MM-DD``
            text), ``request_count``, ``total_input_tokens``,
            ``total_output_tokens`` and ``total_cost``.
        """
        where, params = self._range_filter(start_date, end_date)
        # The first ten characters of the stored ISO timestamp are the date.
        query = (
            "SELECT substr(timestamp, 1, 10) AS day, COUNT(*), "
            "SUM(input_tokens), SUM(output_tokens), SUM(total_cost) "
            "FROM api_usage" + where + " GROUP BY day ORDER BY day"
        )
        with self._connect() as conn:
            rows = conn.execute(query, params).fetchall()
        return [
            {
                "date": row[0],
                "request_count": row[1],
                "total_input_tokens": row[2],
                "total_output_tokens": row[3],
                "total_cost": row[4],
            }
            for row in rows
        ]
成本优化策略
提示词优化
长度优化
def optimize_prompt_length(original_prompt, max_tokens=4000, model="claude-sonnet-4-20250514"):
    """Shrink a prompt until it fits within ``max_tokens``.

    The strategies are applied one after another, each to the output of the
    previous one, and the loop stops as soon as the prompt fits.

    Args:
        original_prompt: Prompt text to optimise.
        max_tokens: Token budget for the prompt.
        model: Model identifier used for token counting.

    Returns:
        A ``(prompt, token_count)`` tuple. The prompt may still exceed the
        budget if no strategy reduced it enough.
    """

    def _count(prompt):
        result = client.messages.count_tokens(
            model=model,
            messages=[{"role": "user", "content": prompt}],
        )
        return result.input_tokens

    current_tokens = _count(original_prompt)
    if current_tokens <= max_tokens:
        return original_prompt, current_tokens

    optimization_strategies = (
        remove_redundant_examples,
        simplify_instructions,
        use_bullet_points,
        remove_verbose_explanations,
    )

    optimized_prompt = original_prompt
    for strategy in optimization_strategies:
        candidate = strategy(optimized_prompt)
        if candidate == optimized_prompt:
            # Nothing changed, so the previous count is still valid and a
            # new API call would be wasted.
            continue
        optimized_prompt = candidate
        current_tokens = _count(optimized_prompt)
        if current_tokens <= max_tokens:
            break
    return optimized_prompt, current_tokens


def remove_redundant_examples(text):
    """Remove redundant examples (placeholder: returns the text unchanged)."""
    # TODO: implement example de-duplication.
    return text


def simplify_instructions(text):
    """Simplify instruction wording (placeholder: returns the text unchanged)."""
    # TODO: implement instruction simplification.
    return text


def use_bullet_points(text):
    """Replace verbose descriptions with bullet points (placeholder: returns the text unchanged)."""
    # TODO: implement format optimisation.
    return text


def remove_verbose_explanations(text):
    """Remove verbose explanations (placeholder: returns the text unchanged)."""
    # TODO: implement explanation trimming.
    return text
模型选择优化
def choose_optimal_model(task_complexity, budget_constraint):
    """Pick the cheapest model that is capable enough and within budget.

    Args:
        task_complexity: One of ``"simple"``, ``"medium"`` or ``"complex"``.
        budget_constraint: Maximum acceptable ``cost_ratio`` (cost relative
            to the Haiku entry).

    Returns:
        The dict describing the chosen model. When no model is both capable
        and within budget, the first (most basic) entry is returned, even if
        it is below the requested capability.

    Raises:
        KeyError: If ``task_complexity`` is not a known level.
    """
    models = [
        {
            "name": "claude-haiku-3-20240307",
            "capability": "basic",
            "cost_ratio": 1.0,
            "suitable_for": ["简单问答", "文本分类", "基础翻译"],
        },
        {
            "name": "claude-sonnet-4-20250514",
            "capability": "advanced",
            "cost_ratio": 3.75,  # cost multiple relative to Haiku
            "suitable_for": ["复杂分析", "代码生成", "创意写作"],
        },
        {
            "name": "claude-opus-4-20250514",
            "capability": "expert",
            "cost_ratio": 18.75,  # cost multiple relative to Haiku
            "suitable_for": ["高级推理", "复杂问题解决", "专业分析"],
        },
    ]

    # Keep only models that fit the budget and are capable enough.
    suitable_models = [
        model
        for model in models
        if model["cost_ratio"] <= budget_constraint
        and is_capable_for_task(model["capability"], task_complexity)
    ]

    if suitable_models:
        return min(suitable_models, key=lambda candidate: candidate["cost_ratio"])
    # Nothing qualifies: fall back to the most basic model.
    return models[0]


def is_capable_for_task(model_capability, task_complexity):
    """Return True when the capability level is at least the complexity level.

    Raises:
        KeyError: If either argument is not a known level name.
    """
    capability_levels = {"basic": 1, "advanced": 2, "expert": 3}
    complexity_levels = {"simple": 1, "medium": 2, "complex": 3}
    return capability_levels[model_capability] >= complexity_levels[task_complexity]
批处理优化
批处理 vs 单独调用的成本分析
def compare_batch_vs_individual_cost(requests, model):
    """Compare the estimated cost of individual calls with batch processing.

    Each request is priced from its counted input tokens plus an assumed
    1000 output tokens; the batch cost is half of the individual total.

    Args:
        requests: Iterable of dicts, each with a ``"messages"`` list.
        model: Model identifier used for counting and pricing.

    Returns:
        A dict with ``individual_cost``, ``batch_cost``, ``savings``,
        ``savings_percentage`` and ``recommendation`` (``"batch"`` or
        ``"individual"``). With no requests every amount is zero.
    """
    individual_costs = []
    for request in requests:
        tokens = client.messages.count_tokens(
            model=model,
            messages=request["messages"],
        ).input_tokens
        estimated_cost = calculate_request_cost(tokens, 1000, model)
        individual_costs.append(estimated_cost["total_cost"])

    individual_total = sum(individual_costs)
    # Batch processing is priced at a 50% discount.
    batch_total = individual_total * 0.5
    savings = individual_total - batch_total

    # Avoid dividing by zero when there is nothing to price.
    if individual_total:
        savings_percentage = (savings / individual_total) * 100
    else:
        savings_percentage = 0.0

    return {
        "individual_cost": individual_total,
        "batch_cost": batch_total,
        "savings": savings,
        "savings_percentage": savings_percentage,
        "recommendation": "batch" if batch_total < individual_total else "individual",
    }
缓存策略
响应缓存
class ResponseCache:
    """Bounded in-memory response cache that evicts the least-used entry."""

    def __init__(self, max_cache_size=1000):
        self.cache = {}
        # Maps cache key -> number of times the entry was stored or read.
        self.access_count = {}
        self.max_cache_size = max_cache_size

    def get_cache_key(self, model, messages, max_tokens):
        """Return an MD5 hex digest of the request's JSON serialisation."""
        content = {
            "model": model,
            "messages": messages,
            "max_tokens": max_tokens,
        }
        serialized = json.dumps(content, sort_keys=True)
        return hashlib.md5(serialized.encode()).hexdigest()

    def get_cached_response(self, model, messages, max_tokens):
        """Return the cached response, or None on a miss.

        A hit increments the entry's access counter.
        """
        cache_key = self.get_cache_key(model, messages, max_tokens)
        if cache_key not in self.cache:
            return None
        self.access_count[cache_key] = self.access_count.get(cache_key, 0) + 1
        return self.cache[cache_key]

    def cache_response(self, model, messages, max_tokens, response):
        """Store a response, evicting the least-used entry if the cache is full.

        Re-caching an existing key replaces its response in place: nothing
        is evicted and its access counter is kept. A cache created with a
        non-positive ``max_cache_size`` stores nothing.
        """
        cache_key = self.get_cache_key(model, messages, max_tokens)

        if cache_key in self.cache:
            # An update does not grow the cache, so no eviction is needed.
            self.cache[cache_key] = response
            self.access_count.setdefault(cache_key, 1)
            return

        if self.max_cache_size <= 0:
            return

        if len(self.cache) >= self.max_cache_size:
            least_used_key = min(self.access_count, key=self.access_count.get)
            del self.cache[least_used_key]
            del self.access_count[least_used_key]

        self.cache[cache_key] = response
        self.access_count[cache_key] = 1

    def get_cache_stats(self):
        """Return the cache size, total accesses and mean accesses per entry."""
        total_accesses = sum(self.access_count.values())
        return {
            "cache_size": len(self.cache),
            "total_accesses": total_accesses,
            "average_access_per_item": total_accesses / len(self.cache) if self.cache else 0,
        }
监控和分析
实时监控
成本监控仪表板
class CostMonitor:
    """Derives today's and this month's cost metrics from a cost tracker."""

    def __init__(self, cost_tracker):
        # Must provide get_usage_summary(start_date=..., end_date=...).
        self.cost_tracker = cost_tracker

    @staticmethod
    def _summarize(usage_rows):
        """Collapse per-model rows into totals plus the original rows."""
        return {
            "total_cost": sum(row["total_cost"] or 0 for row in usage_rows),
            "total_requests": sum(row["request_count"] or 0 for row in usage_rows),
            "by_model": usage_rows,
        }

    @staticmethod
    def _budget_alert(alert_type, period_label, cost, budget):
        """Return an alert dict when cost exceeds 80% of budget, else None."""
        if not cost > budget * 0.8:
            return None
        # A non-positive budget has no meaningful percentage; report it as
        # infinitely exceeded instead of dividing by zero.
        percent = (cost / budget) * 100 if budget > 0 else float("inf")
        return {
            "type": alert_type,
            "message": f"{period_label}费用已达到预算的{percent:.1f}%",
            "severity": "warning" if cost < budget else "critical",
        }

    def get_realtime_metrics(self):
        """Return usage totals for today and for the current month.

        Returns:
            A dict with ``today`` and ``this_month`` entries, each holding
            ``total_cost``, ``total_requests`` and ``by_model``.
        """
        today = datetime.date.today()
        today_usage = self.cost_tracker.get_usage_summary(
            start_date=today,
            end_date=today + datetime.timedelta(days=1),
        )
        month_usage = self.cost_tracker.get_usage_summary(
            start_date=today.replace(day=1),
        )
        return {
            "today": self._summarize(today_usage),
            "this_month": self._summarize(month_usage),
        }

    def check_budget_alerts(self, daily_budget, monthly_budget):
        """Return alerts for budgets that are more than 80% consumed.

        Args:
            daily_budget: Budget for the current day.
            monthly_budget: Budget for the current month.

        Returns:
            A list of alert dicts (``type``, ``message``, ``severity``);
            the daily alert, if any, comes first. Severity is ``"critical"``
            once the cost reaches the budget, otherwise ``"warning"``.
        """
        metrics = self.get_realtime_metrics()
        candidates = (
            self._budget_alert(
                "daily_budget_warning", "今日",
                metrics["today"]["total_cost"], daily_budget,
            ),
            self._budget_alert(
                "monthly_budget_warning", "本月",
                metrics["this_month"]["total_cost"], monthly_budget,
            ),
        )
        return [alert for alert in candidates if alert is not None]
使用分析
使用模式分析
def analyze_usage_patterns(cost_tracker):
    """Analyse the last 30 days of usage.

    Args:
        cost_tracker: Object providing ``get_detailed_usage(start, end)``.

    Returns:
        A dict with ``peak_usage_hours``, ``model_preferences``,
        ``cost_trends`` and ``efficiency_metrics``.
    """
    # NOTE(review): analyze_cost_trends and analyze_efficiency are not defined
    # in the visible source; confirm they exist before calling this function.
    end_date = datetime.date.today()
    start_date = end_date - datetime.timedelta(days=30)
    usage_data = cost_tracker.get_detailed_usage(start_date, end_date)
    return {
        "peak_usage_hours": analyze_peak_hours(usage_data),
        "model_preferences": analyze_model_usage(usage_data),
        "cost_trends": analyze_cost_trends(usage_data),
        "efficiency_metrics": analyze_efficiency(usage_data),
    }


def analyze_peak_hours(usage_data):
    """Find the hour of day with the highest total cost.

    Args:
        usage_data: Iterable of records with a ``timestamp`` exposing
            ``.hour`` and a numeric ``total_cost``.

    Returns:
        A dict with ``peak_hour``, ``peak_cost`` and ``hourly_distribution``.
        With no records, ``peak_hour`` is None, ``peak_cost`` is 0 and the
        distribution is empty.
    """
    hourly_usage = {}
    for record in usage_data:
        hour = record["timestamp"].hour
        hourly_usage[hour] = hourly_usage.get(hour, 0) + record["total_cost"]

    if not hourly_usage:
        # max() would raise on an empty mapping.
        return {"peak_hour": None, "peak_cost": 0, "hourly_distribution": {}}

    peak_hour = max(hourly_usage, key=hourly_usage.get)
    return {
        "peak_hour": peak_hour,
        "peak_cost": hourly_usage[peak_hour],
        "hourly_distribution": hourly_usage,
    }


def analyze_model_usage(usage_data):
    """Aggregate request count, cost and mean tokens per request by model.

    Args:
        usage_data: Iterable of records with ``model``, ``total_cost``,
            ``input_tokens`` and ``output_tokens``.

    Returns:
        A dict mapping each model to ``usage_count``, ``total_cost`` and
        ``avg_tokens_per_request``.
    """
    model_stats = {}
    for record in usage_data:
        stats = model_stats.setdefault(
            record["model"],
            {"usage_count": 0, "total_cost": 0, "avg_tokens_per_request": 0},
        )
        stats["usage_count"] += 1
        stats["total_cost"] += record["total_cost"]
        # Holds the running token total until the averaging pass below.
        stats["avg_tokens_per_request"] += record["input_tokens"] + record["output_tokens"]

    for stats in model_stats.values():
        if stats["usage_count"] > 0:
            stats["avg_tokens_per_request"] /= stats["usage_count"]
    return model_stats
预算管理
预算设置和控制
动态预算管理
class BudgetManager:
    """Tracks named budgets and checks spending against them."""

    def __init__(self, cost_tracker):
        # Must provide get_usage_summary(start_date=...).
        self.cost_tracker = cost_tracker
        self.budgets = {}
        self.alerts_sent = set()

    def set_budget(self, budget_type, amount, period="monthly"):
        """Register or replace a budget.

        Args:
            budget_type: Name under which the budget is stored.
            amount: Budget amount.
            period: ``"daily"``, ``"weekly"``, ``"monthly"`` or ``"yearly"``.
        """
        self.budgets[budget_type] = {
            "amount": amount,
            "period": period,
            "created_date": datetime.date.today(),
        }

    def get_current_usage(self, period):
        """Return the total cost recorded since the start of the period.

        Args:
            period: ``"daily"``, ``"weekly"`` (week starting Monday),
                ``"monthly"`` or ``"yearly"``.

        Raises:
            ValueError: If the period name is not recognised.
        """
        today = datetime.date.today()
        if period == "daily":
            period_start = today
        elif period == "weekly":
            period_start = today - datetime.timedelta(days=today.weekday())
        elif period == "monthly":
            period_start = today.replace(day=1)
        elif period == "yearly":
            period_start = today.replace(month=1, day=1)
        else:
            raise ValueError(f"unknown budget period: {period!r}")

        rows = self.cost_tracker.get_usage_summary(start_date=period_start)
        return sum(row["total_cost"] or 0 for row in rows)

    def check_budget_status(self, budget_type):
        """Return the status of a budget, or None if it was never set.

        Returns:
            A dict with ``budget_amount``, ``current_usage``, ``remaining``,
            ``usage_percentage`` and ``status``. A non-positive budget is
            reported with an infinite usage percentage.
        """
        if budget_type not in self.budgets:
            return None

        budget = self.budgets[budget_type]
        amount = budget["amount"]
        current_usage = self.get_current_usage(budget["period"])
        # A zero or negative budget cannot be expressed as a percentage.
        usage_percentage = (current_usage / amount) * 100 if amount > 0 else float("inf")
        return {
            "budget_amount": amount,
            "current_usage": current_usage,
            "remaining": amount - current_usage,
            "usage_percentage": usage_percentage,
            "status": self.get_budget_status(usage_percentage),
        }

    def get_budget_status(self, usage_percentage):
        """Map a usage percentage to exceeded / critical / warning / normal."""
        if usage_percentage >= 100:
            return "exceeded"
        if usage_percentage >= 90:
            return "critical"
        if usage_percentage >= 75:
            return "warning"
        return "normal"

    def enforce_budget_limits(self, budget_type, requested_cost):
        """Return whether a request of the given cost fits the budget.

        Returns:
            True when no such budget is set or the projected usage stays
            within it; False when the request would exceed the budget.
        """
        status = self.check_budget_status(budget_type)
        if not status:
            return True  # no budget configured, allow the request
        projected_usage = status["current_usage"] + requested_cost
        return not projected_usage > status["budget_amount"]
成本预测
基于历史数据的成本预测
def predict_monthly_cost(cost_tracker):
    """Project the current month's total cost from the last 90 days.

    The mean of the daily costs returned by the tracker is multiplied by the
    number of days left in the month and added to the cost already incurred
    this month.

    Args:
        cost_tracker: Object providing ``get_daily_usage(start, end)`` and
            ``get_usage_summary(start)``.

    Returns:
        A dict with ``current_month_cost``, ``predicted_remaining_cost``,
        ``predicted_total_cost``, ``avg_daily_cost`` and ``days_remaining``.
        With no history the average daily cost is 0.
    """
    import calendar

    today = datetime.date.today()
    history_start = today - datetime.timedelta(days=90)
    historical_data = cost_tracker.get_daily_usage(history_start, today)

    daily_costs = [record["total_cost"] for record in historical_data]
    # Without history there is nothing to average; avoid dividing by zero.
    avg_daily_cost = sum(daily_costs) / len(daily_costs) if daily_costs else 0.0

    days_in_month = calendar.monthrange(today.year, today.month)[1]
    days_remaining = days_in_month - today.day

    month_usage = cost_tracker.get_usage_summary(today.replace(day=1))
    current_month_cost = sum(usage["total_cost"] for usage in month_usage)

    predicted_remaining_cost = avg_daily_cost * days_remaining
    return {
        "current_month_cost": current_month_cost,
        "predicted_remaining_cost": predicted_remaining_cost,
        "predicted_total_cost": current_month_cost + predicted_remaining_cost,
        "avg_daily_cost": avg_daily_cost,
        "days_remaining": days_remaining,
    }
最佳实践
成本控制策略
1. 请求前优化
def optimize_before_request(messages, model, max_budget_per_request=1.0):
    """Return the messages to send, optimised if the estimate is over budget.

    Args:
        messages: Message list that would be sent.
        model: Model identifier used for the cost estimate.
        max_budget_per_request: Maximum acceptable estimated cost.

    Returns:
        The original messages when the estimate fits the budget, otherwise
        the result of ``optimize_messages_for_cost``.
    """
    estimate = estimate_cost_before_request(messages, model)
    projected_cost = estimate["estimated_cost"]["total_cost"]
    if projected_cost > max_budget_per_request:
        # Too expensive as-is: try the cost-reduction strategies.
        return optimize_messages_for_cost(messages, max_budget_per_request, model)
    return messages


def optimize_messages_for_cost(messages, budget, model):
    """Try cost-reduction strategies in order until one fits the budget.

    Each strategy's result is re-estimated with ``model``; the first result
    whose estimated total cost is within ``budget`` is returned and its name
    printed. If none fits, ``create_minimal_message(messages)`` is returned.
    """
    # NOTE(review): the helpers referenced below are not defined in the
    # visible source. The first strategy presumably yields a model name, yet
    # its result is re-estimated and returned as if it were a message list;
    # confirm the intended behaviour.
    candidates = (
        ("使用更便宜的模型", lambda: suggest_cheaper_model(model)),
        ("缩短输入长度", lambda: shorten_input(messages)),
        ("减少输出长度", lambda: reduce_max_tokens(messages)),
        ("简化提示词", lambda: simplify_prompts(messages)),
    )

    for label, apply_strategy in candidates:
        attempt = apply_strategy()
        # Re-estimate the cost of the candidate.
        attempt_estimate = estimate_cost_before_request(attempt, model)
        attempt_cost = attempt_estimate["estimated_cost"]["total_cost"]
        if attempt_cost <= budget:
            print(f"应用策略: {label}")
            return attempt

    # No strategy met the budget: fall back to the most basic version.
    return create_minimal_message(messages)
2. 智能缓存策略
class IntelligentCache:
    """Decides whether a request should be served through the response cache."""

    def __init__(self):
        self.cache = ResponseCache()
        self.cost_tracker = CostTracker()

    def should_use_cache(self, messages, model):
        """Return True for expensive requests or ones similar to a cached key.

        A request counts as expensive when its estimated cost (counted input
        tokens plus an assumed 1000 output tokens) exceeds 0.1 USD.
        """
        tokens = client.messages.count_tokens(
            model=model, messages=messages
        ).input_tokens
        estimated_cost = calculate_request_cost(tokens, 1000, model)["total_cost"]

        # Expensive requests always go through the cache.
        if estimated_cost > 0.1:
            return True

        # NOTE(review): ResponseCache keys are MD5 digests of the request, so
        # comparing message text against them does not measure message
        # similarity; a real fix would need the cached message text.
        similarity_threshold = 0.8
        return any(
            self.calculate_similarity(messages, cached_key) > similarity_threshold
            for cached_key in self.cache.cache
        )

    def calculate_similarity(self, messages, cached_key):
        """Return a similarity score based on normalised edit distance.

        Both arguments are converted with ``str()``. The score is
        ``1 - distance / max(len)``; two empty strings score 1.0.
        """
        current_text = str(messages)
        cached_text = str(cached_key)
        longest = max(len(current_text), len(cached_text))
        if longest == 0:
            return 1.0
        return 1 - (self._edit_distance(current_text, cached_text) / longest)

    @staticmethod
    def _edit_distance(left, right):
        """Return the Levenshtein distance between two strings."""
        if len(left) < len(right):
            left, right = right, left
        # Only the previous row of the DP table is needed at any time.
        previous_row = list(range(len(right) + 1))
        for row_index, left_char in enumerate(left, 1):
            current_row = [row_index]
            for col_index, right_char in enumerate(right, 1):
                current_row.append(min(
                    previous_row[col_index] + 1,           # deletion
                    current_row[col_index - 1] + 1,        # insertion
                    previous_row[col_index - 1] + (left_char != right_char),  # substitution
                ))
            previous_row = current_row
        return previous_row[-1]
3. 使用模式优化
def optimize_usage_patterns():
    """Return the built-in list of usage-pattern recommendations.

    Returns:
        A new list of dicts, each with ``pattern``, ``suggestion`` and
        ``potential_savings``.
    """
    return [
        {
            "pattern": "高频简单任务",
            "suggestion": "使用Claude Haiku降低成本",
            "potential_savings": "80%",
        },
        {
            "pattern": "批量处理",
            "suggestion": "使用Batch API获得50%折扣",
            "potential_savings": "50%",
        },
        {
            "pattern": "重复查询",
            "suggestion": "实现智能缓存系统",
            "potential_savings": "60-90%",
        },
        {
            "pattern": "长文档处理",
            "suggestion": "分段处理减少单次token消耗",
            "potential_savings": "30-50%",
        },
    ]
通过系统化的token计数和成本管理,可以有效控制API使用成本,提高应用的经济效益和可持续性。