第19章:成本优化与性能调优
一个 Agent 系统在 demo 阶段可能每天消耗几十美元,到了日活万级的生产环境,月账单轻松突破数万美元。不是模型变贵了,是架构没变。
这不是夸张。一个典型的生产级 Agent——系统提示 4000 token、工具定义 2000 token、对话历史 6000 token、每次响应 800 token——如果每次调用都从零开始计算,每次请求的成本比实际必要的高出好几倍。规模一旦上来,这个差距就变成了真金白银。
成本优化和性能调优不是两个独立话题。在 LLM 场景里,它们深度耦合:更快的响应通常意味着更少的等待时间,更少的 token 消耗通常意味着更快的处理速度。减少不必要的工作,是同时提升速度和降低成本的核心思路。
这一章从 Token 经济学讲起,系统介绍 Agent 系统里最有效的几类优化手段:Prompt Caching、模型路由、工具调用优化、并发控制,以及如何用配置管理让这些优化可迭代、可回滚。
19.1 Token 经济学 —— 钱是怎么烧掉的
核心直觉
每次调用 LLM API,你都在为两件事付钱:你发出去的 token(输入)和模型生成的 token(输出)。这两类 token 的价格不一样,而且差距相当大——输出 token 通常比输入贵得多。理解这个不对称,是成本优化的起点。
但生产级 Agent 的账单不只来自这两项。完整的单位任务成本应该按 unit economics 来看:
单任务成本 =
模型输入 token
+ 模型输出 token
+ 推理 / thinking token
+ 内置工具费用(web search、代码执行、文件检索等)
+ 外部基础设施费用(向量库、数据库、容器、队列、存储)
+ 重试与失败请求成本
+ 评估 / 回归测试摊销成本
+ 优先级队列、批处理、缓存等计费系数调整这也是为什么 Agent 的成本监控不能只看"总 token 数"。更有用的指标是:每个成功任务的成本(cost per successful task)、每轮平均 token、重试成本占比、缓存命中率、工具调用成本、模型路由升级率。只有把成本和任务成功率绑定起来,才不会把"省钱"优化成"便宜但没用"。
输入 vs 输出的价格差异
以截至 2026.05 的主流模型定价为参考:
| 模型 | 输入 ($/MTok) | 缓存读取 ($/MTok) | 输出 ($/MTok) | 输出/输入倍数 |
|---|---|---|---|---|
| GPT-5.5 | $5.00 | $0.50 | $30.00 | 6x |
| GPT-5.4 | $2.50 | $0.25 | $15.00 | 6x |
| GPT-5.4 mini | $0.75 | $0.075 | $4.50 | 6x |
| Claude Sonnet 4.6 | $3.00 | $0.30 | $15.00 | 5x |
| Claude Haiku 4.5 | $1.00 | $0.10 | $5.00 | 5x |
| Claude Haiku 3 | $0.25 | $0.03 | $1.25 | 5x |
来源:OpenAI API Pricing(2026.05)[1]、Anthropic Prompt Caching 文档(2026.05)[2]
几个值得注意的结论:
输出 token 是大头。 输出比输入贵 5-6 倍,这意味着如果你的 Agent 产生大量冗长输出,成本会快速膨胀。控制输出长度——明确要求模型简洁输出、使用结构化格式替代叙述性文本——往往比其他优化更有效。
缓存读取是最便宜的输入。 表中"缓存读取"那列是正常输入的 10%,也就是 90% 的折扣。如果你有大量重复的前缀内容,缓存命中能直接把这部分成本打到原来的十分之一。
模型之间差距巨大。 GPT-5.4 mini 的输出 token 价格是 GPT-5.5 的 15%。如果某些子任务不需要最强模型,换一个便宜 10 倍的模型做,这不是降级,是工程上的正确决策。
Agent 系统的成本结构
一个实际的 Agent 在单次任务中可能有多次 LLM 调用——每次规划、每次工具结果处理、每次生成输出,都是一次计费。
任务执行中的 LLM 调用次数(以一个中等复杂度 Coding Agent 为例):
- 初始规划:1 次
- 每次工具调用后的推理:3-8 次
- 代码审查:1-2 次
- 结果汇总:1 次
─────────────────────────────────
总计:约 6-12 次 LLM 调用/任务如果每次调用发送的 context 是 12000 token,加上 800 token 的输出,用 GPT-5.4($2.50/MTok 输入 + $15.00/MTok 输出):
单次调用成本 = 12000 × $2.50/10^6 + 800 × $15.00/10^6
= $0.030 + $0.012
= $0.042
一个 10 步任务 ≈ $0.42
日活 1000 次任务 ≈ $420/天 ≈ $12,600/月这还没考虑 context 随任务进行而增长——越到后面的步骤,输入 token 越多。
Streaming:降低感知延迟,不降低成本
Streaming(流式输出)让用户看到模型"正在打字",而不是等待全部完成再展示。这对用户体验很重要,但需要明确:Streaming 不降低 token 消耗,它只是改变了结果的交付方式。
对于长任务 Agent,Streaming 更多的意义在于:让用户知道 Agent 还活着、在进展,而不是面对一个空屏等待几分钟。这本身是一种信任建立机制,不是成本优化手段。
19.2 Prompt Caching —— 让重复的 token 只算一次
核心直觉
每次 API 调用,你发给模型的 token 里,有很大一部分是完全一样的——系统提示、工具定义、背景文档。如果每次都让模型从头"读"一遍,这些重复的工作既慢又贵。Prompt Caching 把这些稳定的前缀存下来,后续请求直接复用,跳过重新计算这部分的过程。
类比:你每天早上给助理发邮件,邮件开头都是一段 500 字的背景说明。如果助理每次都从头读,效率很低;如果你说"背景和昨天一样,直接看今天的新内容",就快多了。Prompt Caching 就是这个"背景和昨天一样"的机制。
两家主要供应商的实现方式
OpenAI:自动缓存,无需代码改动
OpenAI 的 Prompt Caching 是全自动的,对所有支持的模型默认开启,不需要在请求里加任何参数 [3]。条件是:prompt 总长度 ≥ 1024 token。
系统会根据 prompt 前缀做哈希路由,把请求发到最近处理过相同前缀的机器上。如果命中缓存,输入 token 成本降至原来的 10%(即 90% 折扣),同时延迟也显著下降。
缓存保留时间:
- 大多数模型:5-10 分钟无活动后失效,最长 1 小时(内存缓存)
- gpt-5.5 等较新模型:默认使用最长 24 小时的扩展缓存,不支持内存缓存模式
- 其他支持扩展缓存的模型:可以通过
prompt_cache_retention显式选择缓存保留策略
如果应用有严格的数据隔离或命中率治理需求,可以设置 prompt_cache_key,把不同租户、用户或工作区的缓存空间隔开。需要注意两点:同一个 cache key 的请求速率过高时,缓存命中率可能下降,经验上应避免同一个 key 超过约 15 RPM;cached tokens 虽然按折扣计费,但仍会计入速率限制。响应的 usage 字段会包含 cached_tokens 指标,可以用来监控缓存命中情况:
import openai
client = openai.OpenAI()
response = client.chat.completions.create(
model="gpt-5.4",
prompt_cache_retention="24h",
prompt_cache_key=f"tenant:{tenant_id}:agent:{agent_id}",
messages=[
{"role": "system", "content": LONG_SYSTEM_PROMPT}, # 稳定前缀
{"role": "user", "content": user_message}, # 变化部分
]
)
# 查看缓存命中
cached = response.usage.prompt_tokens_details.cached_tokens
total_input = response.usage.prompt_tokens
print(f"缓存命中率: {cached / total_input * 100:.1f}%")Anthropic:显式标记,更精细的控制
Anthropic 的方案需要在请求里显式打上 cache_control 标记,标明哪些内容应该被缓存 [2]。这比 OpenAI 的自动方案多一步配置,但也提供了更精细的控制。
截至 2026.05,Anthropic 提供两种方式:
- 自动缓存(Automatic Caching):在请求顶层加
cache_control,系统自动在最后一个可缓存块上打标记,适合多轮对话 - 显式断点(Explicit Breakpoints):在具体 content block 上加
cache_control,适合需要细粒度控制的场景
价格结构:
- 缓存写入(5 分钟 TTL):基础输入价格的 1.25 倍(额外 25% 用于写缓存)
- 缓存写入(1 小时 TTL):基础输入价格的 2 倍
- 缓存读取:基础输入价格的 0.1 倍(90% 折扣)
最小缓存长度因模型而异:Claude Sonnet 4.6 要求 2048 token,Claude Haiku 4.5 要求 4096 token(截至 2026.05 以官方文档为准 [2])。
import anthropic
client = anthropic.Anthropic()
# 方式一:自动缓存(推荐用于多轮对话)
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=1024,
cache_control={"type": "ephemeral"}, # 顶层自动缓存
system="你是一个专业的代码审查助手...", # 长系统提示
messages=[
{"role": "user", "content": "请审查这段代码"},
]
)
# 方式二:显式断点(精细控制)
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=1024,
system=[
{
"type": "text",
"text": TOOLS_AND_INSTRUCTIONS, # 工具定义 + 指令(稳定,缓存)
"cache_control": {"type": "ephemeral"},
}
],
messages=[
{
"role": "user",
"content": [
{
"type": "text",
"text": LONG_DOCUMENT, # 文档内容(稳定,缓存)
"cache_control": {"type": "ephemeral"},
},
{"type": "text", "text": user_question}, # 用户问题(变化,不缓存)
],
}
],
)
# 查看缓存状况
usage = response.usage
print(f"缓存写入: {usage.cache_creation_input_tokens}")
print(f"缓存命中: {usage.cache_read_input_tokens}")什么内容该缓存,什么不该
缓存带来最大收益的内容:
- System Prompt:每次调用都发送,几乎不变,是最优先的缓存对象
- 工具定义:工具列表在一个 Agent 的生命周期内基本稳定
- 背景文档:RAG 检索到的文档、代码库快照、长篇参考材料
- Few-shot 示例:示例不变,每次都发送,适合缓存
不适合缓存的内容:
- 每次都不同的用户输入
- 包含时间戳、请求 ID 的动态字段
- 频繁变化的状态信息
一个容易犯的错误: 把 cache_control 打在包含时间戳或每次变化字段的 block 上。缓存命中要求前缀完全一致,如果标记位置上的内容每次都变,永远不会命中。应该把标记打在最后一个稳定不变的 block 上。
提升缓存命中率的实践
结构原则:稳定内容前置,变化内容后置
┌─────────────────────────────────────┐
│ 工具定义(稳定) ← 打缓存标记 │
│ System Prompt(稳定) ← 打缓存标记 │
│ 背景文档(较稳定) ← 打缓存标记 │
│ 对话历史(增长) │
│ 当前用户消息(每次不同) │
└─────────────────────────────────────┘对于使用 Anthropic API 的 Agent,还可以做 Cache Pre-warming(缓存预热):在用户请求到达之前,先发一个极短输出请求来触发缓存写入,这样第一个真实用户请求就能命中缓存,减少首次请求的延迟 [2]。预热不是免费的,它仍会产生一次请求和少量输出 token 成本,因此只适合稳定前缀足够长、后续复用概率足够高的场景。
# 预热缓存:应用启动时调用,用户请求到达前完成
def prewarm_cache():
client.messages.create(
model="claude-sonnet-4-6",
max_tokens=1, # Messages API 最小值为 1,用极短输出触发缓存写入
system=[
{
"type": "text",
"text": SYSTEM_PROMPT,
"cache_control": {"type": "ephemeral"},
}
],
messages=[{"role": "user", "content": "warmup"}],
)
# stop_reason 通常会是 "max_tokens",但仍会产生少量输出 token 成本实际效果:在内容稳定、请求量大的场景(比如处理同一批文档的多个问题),缓存命中率可以达到 70-90%,对应输入成本降低接近同等比例。
19.3 模型路由 —— 用对的工具做对的事
核心直觉
不同任务对智能的需求不一样。让 GPT-5.5 帮你做意图分类,就像用航空发动机开除草机——能干,但浪费。模型路由(Model Routing)的核心思想是:根据任务的复杂度,自动选择性价比最合适的模型。
这不是降低质量,而是精确匹配。用小模型做它擅长且足够的事,把大模型的算力留给真正需要深度推理的地方。
任务-模型匹配矩阵
不同任务对模型能力的要求是不同的:
| 任务类型 | 典型例子 | 建议模型档位 | 理由 |
|---|---|---|---|
| 意图分类 | "这是查询还是操作?" | 小模型 / 微调模型 | 分类任务,答案空间小 |
| 参数提取 | 从文本提取日期、金额 | 小模型 | 结构化提取,规则明确 |
| 简单问答 | FAQ 回答、政策查询 | 小模型 | 单次检索+生成 |
| 工具选择 | 从 5 个工具里选一个 | 中型模型 | 需要推理但不复杂 |
| 多步规划 | 分解一个复杂任务 | 大模型 / 推理模型 | 需要深度推理 |
| 代码生成 | 写一个完整功能 | 大模型 | 需要理解复杂约束 |
| 结果审查 | 检查输出是否符合要求 | 中/大模型 | 需要理解质量标准 |
| 摘要压缩 | 压缩对话历史 | 小/中模型 | 提取信息,不需要创造 |
Router 设计模式
一个简单的基于规则的路由器:
from enum import Enum
from dataclasses import dataclass
class ModelTier(Enum):
SMALL = "small"
MEDIUM = "medium"
LARGE = "large"
REASONING = "reasoning"
# 模型映射(截至2026.05,实际使用时请查阅最新官方文档确认可用性)
MODEL_MAP = {
ModelTier.SMALL: "gpt-5.4-mini", # 低成本,高吞吐
ModelTier.MEDIUM: "gpt-5.4", # 均衡
ModelTier.LARGE: "gpt-5.5", # 高能力
ModelTier.REASONING: "o4-mini", # 推理任务
}
@dataclass
class TaskContext:
task_type: str
estimated_complexity: int # 1-5
requires_reasoning: bool
context_length: int
def route_model(task: TaskContext) -> str:
"""基于任务特征选择合适的模型"""
# 需要深度推理的任务
if task.requires_reasoning and task.estimated_complexity >= 4:
return MODEL_MAP[ModelTier.REASONING]
# 高复杂度任务
if task.estimated_complexity >= 4:
return MODEL_MAP[ModelTier.LARGE]
# 简单分类/提取任务
if task.task_type in ("intent_classification", "parameter_extraction") \
and task.estimated_complexity <= 2:
return MODEL_MAP[ModelTier.SMALL]
# 其余情况用中型模型
return MODEL_MAP[ModelTier.MEDIUM]更复杂的做法是用一个轻量模型做任务复杂度估算,然后把请求路由到合适的模型。不过这会增加一次额外调用,需要权衡路由成本和收益:只有当被路由任务的预期节省明显大于路由本身的开销时,才值得。
级联路由(Cascade Routing)
一种务实的路由策略:先用小模型尝试,如果结果不满足质量要求,再升级到大模型:
async def cascade_completion(prompt: str, quality_checker) -> str:
"""先用小模型,不满足质量要求再升级"""
# 第一次:小模型
response = await call_model("gpt-5.4-mini", prompt)
if quality_checker(response):
return response # 小模型够用,直接返回
# 升级到大模型
return await call_model("gpt-5.5", prompt)这个模式对于那些"大部分情况小模型够用,偶尔需要大模型"的场景特别有效——比如客服回复的草稿生成。社区里有开发者反馈,在实际产品中对 FAQ 类问题使用级联路由后,约 80% 的请求落在小模型上,总成本降低了约 60%(来自 HN 上 "Ask HN: LLM cost optimization" 线程,2024 年,具体数字因业务场景差异很大,仅供参考)。
级联路由必须有质量闭环,否则它会变成新的黑盒。至少要记录:
- 每类任务的小模型首轮通过率
- 升级到大模型的比例和原因
- 小模型误放行率(看似通过但后续失败)
- 每条路径的 cost per successful task
- 路由决策、模型版本、quality_checker 结果和最终 Outcome
真正上线前,还要用第 18 章的回归 eval 套件验证:模型降档之后,核心任务成功率没有显著下降。路由器不是省钱开关,而是一个需要被评估、监控和回滚的生产组件。
常见误区:盲目堆最强模型
一个在社区中反复出现的教训是:在 Agent 系统里,用最强模型做所有步骤通常不是最优策略。原因有两个:
- 最强模型的输出 token 最贵,但某些步骤只需要简单决策
- 推理模型(如 o 系列)在简单任务上有不必要的 thinking overhead,反而更慢更贵
实践上,推理模型更适合用于规划阶段(复杂任务分解、策略制定),而执行阶段的工具调用则用普通模型更高效。
19.4 工具调用优化 —— 别让工具等工具
并行工具调用
最容易被忽视的优化之一:无依赖关系的工具调用,应该并行执行而不是顺序执行。
举一个具体例子:Agent 需要同时查询天气、查询日历、查询用户偏好,这三个操作没有依赖关系。顺序执行需要 300ms × 3 = 900ms;并行执行只需要 max(300ms, 280ms, 250ms) ≈ 300ms。
import asyncio
async def parallel_tool_calls(tool_calls: list[dict]) -> list[dict]:
"""并行执行无依赖关系的工具调用"""
tasks = [execute_tool(call) for call in tool_calls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [
{"tool_call_id": call["id"], "result": result}
for call, result in zip(tool_calls, results)
]
# LLM 返回多个工具调用时:
# tool_calls = [
# {"id": "1", "name": "get_weather", "args": {"city": "Shanghai"}},
# {"id": "2", "name": "get_calendar", "args": {"date": "today"}},
# {"id": "3", "name": "get_preferences", "args": {"user_id": "u123"}},
# ]
# 并行执行后把所有结果一起发给 LLM主流 LLM API(OpenAI、Anthropic)在返回 tool_calls 时如果有多个工具,明确表示它们可以并行执行——这不是实现细节,是 API 设计语义。但很多 Agent 框架的默认实现是顺序执行,需要手动开启并行。
工具结果缓存
对于相同查询不会随时间快速变化的工具(数据库查询、文档检索、配置读取),可以在工具层做结果缓存:
import hashlib
import time
from functools import wraps
def tool_cache(ttl_seconds: int = 300):
"""工具结果缓存装饰器"""
cache = {}
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# 生成缓存键:生产环境必须包含租户、权限和数据版本边界
cache_scope = {
"tenant_id": kwargs.get("tenant_id"),
"user_scope": kwargs.get("user_scope"),
"permission_version": kwargs.get("permission_version"),
"data_version": kwargs.get("data_version"),
"args": args,
"kwargs": sorted(kwargs.items()),
}
key = hashlib.sha256(str(cache_scope).encode()).hexdigest()
# 检查缓存
if key in cache:
result, timestamp = cache[key]
if time.time() - timestamp < ttl_seconds:
return result
# 执行工具,存入缓存
result = await func(*args, **kwargs)
cache[key] = (result, time.time())
return result
return wrapper
return decorator
@tool_cache(ttl_seconds=60) # 1 分钟内相同查询复用结果
async def search_knowledge_base(
query: str,
tenant_id: str,
user_scope: str,
permission_version: str,
data_version: str,
) -> list[str]:
"""知识库检索工具"""
...注意:工具缓存需要考虑缓存键的设计和失效策略。对于实时性要求高的工具(股价、实时状态),不应该缓存或 TTL 应该很短;对于包含敏感数据、用户私有数据、权限过滤结果的工具,默认不缓存,除非缓存 key 明确包含租户、用户/角色范围、权限版本和数据版本。高并发场景还要防止 cache stampede:同一个 key 失效时,只允许一个请求回源,其他请求等待或读取短暂过期的旧值。
批量处理
有些场景下,Agent 需要对大量相似项目做同样的操作(比如批量分类 100 篇文章的主题)。这时候可以用批处理 API 替代逐个调用:
OpenAI Batch API:提交 .jsonl 文件,24 小时内完成,成本降低 50%,非常适合非实时的大批量任务 [4]。
Anthropic Message Batches:类似机制,同样提供成本折扣。
对于 Agent 里的评估步骤、后台数据处理、离线报告生成,批量处理是成本效益最高的方案之一。
# OpenAI Batch API 示例:批量对文章做主题分类
import json
requests = [
{
"custom_id": f"article-{i}",
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": "gpt-5.4-mini",
"messages": [
{"role": "system", "content": "请对以下文章进行主题分类,返回 JSON"},
{"role": "user", "content": article}
],
"max_tokens": 100,
}
}
for i, article in enumerate(articles)
]
# 写入 jsonl 文件,上传,创建 batch
with open("batch_input.jsonl", "w") as f:
for req in requests:
f.write(json.dumps(req) + "\n")19.5 并发控制与速率限制
为什么需要主动做速率控制
LLM API 有 Token Per Minute(TPM)和 Request Per Minute(RPM)限制。当 Agent 系统在高并发下运行时,如果不加控制,很容易触发限流(HTTP 429),然后大量请求失败或重试,这既浪费成本,又可能造成雪崩效应。
主动管理速率,比被动等待 API 返回 429 再处理,要优雅得多。
令牌桶限流
令牌桶算法(Token Bucket)是控制请求速率的经典方案:桶里有一定数量的令牌,每次请求消耗令牌,令牌以固定速率补充,桶满为止。
import asyncio
import time
class TokenBucket:
"""基于 token 消耗的速率限制器"""
def __init__(self, rate: int, capacity: int):
"""
rate: 每秒补充的 token 数(对应 API 的 TPM/60)
capacity: 桶容量(最大突发量)
"""
self.rate = rate
self.capacity = capacity
self.tokens = capacity
self.last_refill = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self, tokens: int = 1) -> None:
"""等待直到有足够的令牌"""
while True:
async with self._lock:
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return
# 计算需要等待的时间
wait_time = (tokens - self.tokens) / self.rate
await asyncio.sleep(wait_time)
def _refill(self):
now = time.monotonic()
elapsed = now - self.last_refill
self.tokens = min(
self.capacity,
self.tokens + elapsed * self.rate
)
self.last_refill = now
# 使用:对于 TPM 限制为 100,000 的 API
# rate = 100000 / 60 ≈ 1666 tokens/second
limiter = TokenBucket(rate=1666, capacity=10000)
async def call_llm_with_rate_limit(prompt: str, estimated_tokens: int):
await limiter.acquire(estimated_tokens)
return await call_llm(prompt)这个例子只演示 TPM 维度。生产环境通常还需要同时限制 RPM、并发请求数和单租户预算,并在请求完成后用真实 usage 回填修正估算值。cached tokens 虽然更便宜,但仍可能计入供应商的速率限制;输出 token 也经常是限流和成本失控的主要来源。
优雅降级
在限流或 API 不可用时,Agent 系统需要有明确的降级策略,而不是直接报错或无限重试:
降级时有几个可选策略,按优先级排序:
- 切换到备用模型:同等能力的不同供应商
- 返回缓存的旧结果(如果业务允许轻微过期数据)
- 降级到规则系统(如果任务有非 LLM 的替代方案)
- 进入等待队列,稍后重试
- 返回人工处理,通知操作人员
19.6 延迟优化策略 —— 让用户感觉快
首 token 延迟 vs 总延迟
对 Agent 系统来说,延迟有两个不同的含义:
- 首 token 延迟(TTFT,Time To First Token):从发出请求到收到第一个 token 的时间。对于有流式展示的场景,这决定了用户感知到"它开始回复了"的时间。
- 总延迟(Total Latency):从发出请求到收到完整响应的时间。对于需要完整结果才能继续的步骤,这才是真正的瓶颈。
在多步骤 Agent 中,这两者的优化策略不同:
- 减少 TTFT:Prompt Caching(减少预填充时间)、选更快的模型
- 减少总延迟:并行化(工具并发、子任务并发)、减少输出 token 数
思维链 vs 直接输出的延迟权衡
推理模型(如 o 系列)在回答前会做内部推理,这会增加延迟。对于需要深度思考的复杂规划任务,这个延迟是值得的;但对于简单的路由决策或参数提取,用推理模型反而是负优化。
判断依据:如果一个任务你认为"有经验的工程师看一眼就知道答案",就不需要推理模型。如果是"需要思考几步才能确定策略",推理模型才有价值。
预取策略
在用户操作的空档期,预先准备可能需要的上下文,减少实际需要时的等待:
import asyncio
async def agent_with_prefetch(user_input: str):
"""在处理用户输入的同时,预取可能需要的上下文"""
# 同时启动:意图分析 + 预取可能需要的数据
intent_task = asyncio.create_task(analyze_intent(user_input))
# 根据历史模式,预取大概率会用到的数据
prefetch_tasks = [
asyncio.create_task(prefetch_user_profile()),
asyncio.create_task(prefetch_recent_context()),
]
# 先等意图分析完成
intent = await intent_task
# 根据意图决定是否需要额外的工具调用
if intent == "calendar_query":
calendar_task = asyncio.create_task(fetch_calendar_data())
# 合并预取结果(此时大概率已完成)
profile, context = await asyncio.gather(*prefetch_tasks)
# 最终生成响应
return await generate_response(intent, profile, context)这个模式在 UX 上的效果很明显:用户感知到的等待时间是"意图分析时间"而不是"意图分析 + 数据获取时间",两者并行执行了。
19.7 Agent 配置管理与迭代发布
为什么 Agent 的配置管理比传统软件更复杂
传统软件发布,代码变了才算发布。Agent 有一个特殊性:改一个 prompt,等于改了一段"代码"。System prompt 调整、工具描述变化、模型版本切换、路由规则修改——这些都会影响 Agent 的行为,但都没有代码 diff。
如果不把这些变化当成软件发布来管理,你会遇到以下困境:
- 在 prompt 里直接改了两行,上线后行为变了,但无法快速回滚
- 多个工程师同时修改 prompt,互相覆盖
- 不知道哪次"优化"导致了性能退化
解决方案是把 Agent 配置版本化:用版本控制管理配置,用发布流程管理变更。
Agent 配置的组成
一个完整的 Agent 配置应该包含:
# agent_config.yaml
version: "2.3.1"
created_at: "2026-04-15"
model:
primary: "gpt-5.4"
fallback: "gpt-5.4-mini"
reasoning_tasks: "o4-mini"
system_prompt: |
你是一个...(完整的 system prompt)
tools:
- name: "search_knowledge_base"
enabled: true
max_results: 5
- name: "execute_code"
enabled: true
timeout_seconds: 30
routing:
complexity_threshold: 3
use_cascade: true
guardrails:
max_turns: 20
max_tokens_per_turn: 4000
sensitive_topics: ["financial_advice", "medical_diagnosis"]把这个 YAML 文件放进 Git,每次修改都有 PR、都有 review、都有 commit 记录。这不是形式主义,是在说"谁在什么时候做了什么改动,以及为什么"。
A/B 测试
当你有两个 prompt 版本、两个模型选项,不确定哪个更好时,A/B 测试是最直接的验证方式:
import hashlib
import random
class AgentConfigRouter:
"""在不同配置版本之间分流请求"""
def __init__(self, experiments: list[dict]):
"""
experiments: [
{"config": config_v1, "weight": 0.5, "name": "control"},
{"config": config_v2, "weight": 0.5, "name": "treatment"},
]
"""
self.experiments = experiments
self._validate_weights()
def _validate_weights(self):
total = sum(e["weight"] for e in self.experiments)
assert abs(total - 1.0) < 1e-6, f"权重之和必须为 1,实际为 {total}"
def get_config(self, user_id: str = None) -> tuple[dict, str]:
"""
返回 (config, experiment_name)
如果提供 user_id,同一用户每次分到同一组(粘性分流)
"""
if user_id:
# 基于用户 ID 的稳定哈希分流,避免 Python hash 随进程重启变化
digest = hashlib.sha256(user_id.encode()).hexdigest()
seed = int(digest[:8], 16) / 0xFFFFFFFF
else:
seed = random.random()
cumulative = 0
for exp in self.experiments:
cumulative += exp["weight"]
if seed < cumulative:
return exp["config"], exp["name"]
return self.experiments[-1]["config"], self.experiments[-1]["name"]
# 记录每次请求使用的实验组,用于后续统计分析
router = AgentConfigRouter([
{"config": config_v1, "weight": 0.5, "name": "control"},
{"config": config_v2, "weight": 0.5, "name": "treatment"},
])
config, experiment = router.get_config(user_id=user_id)
# 在日志/追踪系统中记录 experiment,关联任务成功率和成本A/B 测试的结论需要足够的样本量才有统计意义。对于低频任务,可能需要跑几周才能得到置信度足够的结论。
灰度发布与回滚
不要把新版本直接推给全量用户。灰度发布(Canary Release)让你能在出问题时快速控制影响范围:
发布流程:
1% 流量 → 观察 24h → 无异常
5% 流量 → 观察 24h → 无异常
20% 流量 → 观察 48h → 无异常
100% 流量定义明确的回滚触发条件,而不是靠人工判断:
class DeploymentMonitor:
"""监控新版本指标,自动触发回滚"""
ROLLBACK_THRESHOLDS = {
"error_rate": 0.05, # 错误率超过 5% 触发回滚
"task_success_rate_drop": 0.1, # 任务成功率下降超过 10%
"p95_latency_increase": 2.0, # P95 延迟增加超过 2 倍
"cost_per_task_increase": 1.5, # 每任务成本增加超过 50%
}
def should_rollback(self, current_metrics: dict, baseline_metrics: dict) -> bool:
error_rate = current_metrics.get("error_rate", 0)
if error_rate > self.ROLLBACK_THRESHOLDS["error_rate"]:
return True
success_drop = (
baseline_metrics["task_success_rate"] - current_metrics["task_success_rate"]
)
if success_drop > self.ROLLBACK_THRESHOLDS["task_success_rate_drop"]:
return True
return FalseFeature Flag:把配置变更当成可控的软件发布
Feature Flag 让你在不修改代码的情况下,动态控制 Agent 的行为:
from typing import Any
class AgentFeatureFlags:
"""Agent 行为的动态开关"""
def __init__(self, flag_service):
self.flags = flag_service
def get_model(self, context: dict) -> str:
"""根据 Feature Flag 决定使用哪个模型"""
if self.flags.is_enabled("use_gpt5_5", context):
return "gpt-5.5"
return "gpt-5.4"
def get_max_reasoning_depth(self) -> int:
"""ReAct 最大推理步骤数"""
return self.flags.get_int("max_reasoning_steps", default=10)
def is_tool_enabled(self, tool_name: str, user_id: str) -> bool:
"""某个工具是否对特定用户开启"""
return self.flags.is_enabled(f"tool_{tool_name}", {"user_id": user_id})Feature Flag 和 A/B 测试的区别:A/B 测试目的是收集数据做决策,Flag 是已经决策后的渐进式开关。实践中两者经常配合使用:先 A/B 测试确认新行为更好,再通过 Flag 控制逐步放量。
完整的发布工作流
这套流程的关键是:把 prompt 变更和代码变更用同等严肃的流程对待。Eval Gate 是最重要的护栏——在上一章(第18章)里讲到的回归评估套件,就是在这里发挥作用的。一次 prompt 修改让某类任务成功率下降 5%,在 CI 里就应该被发现,而不是等到生产环境出问题。
面试高频题
Q:如何将 Agent 的 API 调用成本降低 50% 同时保持质量?
参考回答框架:
成本降低 50% 不是一个单一手段能做到的,而是多个优化叠加的结果。优先级从高到低:
第一优先:Prompt Caching(收益最大)
分析你的 Agent 每次请求中有多少 token 是重复发送的——system prompt、工具定义、背景文档。这部分通常占总输入的 60-80%。配置好 Prompt Caching 后,这些 token 的读取成本降至原来的 10%。如果这些内容占到输入 token 的 70%,仅此一项就能降低总输入成本约 63%。
第二优先:模型路由
评估每一个 Agent 步骤真正需要什么能力。意图分类、参数提取、简单路由这类任务,用小模型(价格通常是顶级模型的 1/10 甚至更低)完全够用。只在需要复杂推理或高质量生成的步骤上用大模型。
第三优先:并行工具调用
确认你的框架是否在并行执行无依赖的工具调用。这不降成本,但降低延迟,从而可以设置更短的超时,减少因超时重试带来的额外成本。
第四优先:批处理 API
对于非实时任务(后台评估、数据处理、报告生成),使用 Batch API 获得 50% 的折扣。
加分点(展现深度):
- 要验证优化效果,需要先建立成本监控——按任务类型追踪 input/output token 分布,找到成本最高的任务类型优先优化
- 成本优化和质量之间存在真实的权衡,不能只看成本数字。需要配合 Eval 套件,确认模型降档或 prompt 精简后,核心任务成功率没有下降
- 告警很重要:当某个优化手段(比如缓存命中率下降)失效时,成本会反弹,需要及时发现
参考资料
[1] OpenAI API Pricing - OpenAI (https://openai.com/api/pricing/)
[2] Prompt Caching - Anthropic API Documentation (https://platform.claude.com/docs/en/docs/build-with-claude/prompt-caching)
[3] Prompt Caching - OpenAI Developer Documentation (https://developers.openai.com/api/docs/guides/prompt-caching)
[4] Batch API - OpenAI Developer Documentation (https://developers.openai.com/api/docs/guides/batch)