Skip to content

第19章:成本优化与性能调优

一个 Agent 系统在 demo 阶段可能每天消耗几十美元,到了日活万级的生产环境,月账单轻松突破数万美元。不是模型变贵了,是架构没变。

这不是夸张。一个典型的生产级 Agent——系统提示 4000 token、工具定义 2000 token、对话历史 6000 token、每次响应 800 token——如果每次调用都从零开始计算,每次请求的成本比实际必要的高出好几倍。规模一旦上来,这个差距就变成了真金白银。

成本优化和性能调优不是两个独立话题。在 LLM 场景里,它们深度耦合:更快的响应通常意味着更少的等待时间,更少的 token 消耗通常意味着更快的处理速度。减少不必要的工作,是同时提升速度和降低成本的核心思路。

这一章从 Token 经济学讲起,系统介绍 Agent 系统里最有效的几类优化手段:Prompt Caching、模型路由、工具调用优化、并发控制,以及如何用配置管理让这些优化可迭代、可回滚。

19.1 Token 经济学 —— 钱是怎么烧掉的

核心直觉

每次调用 LLM API,你都在为两件事付钱:你发出去的 token(输入)和模型生成的 token(输出)。这两类 token 的价格不一样,而且差距相当大——输出 token 通常比输入贵得多。理解这个不对称,是成本优化的起点。

但生产级 Agent 的账单不只来自这两项。完整的单位任务成本应该按 unit economics 来看:

text
单任务成本 =
  模型输入 token
+ 模型输出 token
+ 推理 / thinking token
+ 内置工具费用(web search、代码执行、文件检索等)
+ 外部基础设施费用(向量库、数据库、容器、队列、存储)
+ 重试与失败请求成本
+ 评估 / 回归测试摊销成本
+ 优先级队列、批处理、缓存等计费系数调整

这也是为什么 Agent 的成本监控不能只看"总 token 数"。更有用的指标是:每个成功任务的成本(cost per successful task)、每轮平均 token、重试成本占比、缓存命中率、工具调用成本、模型路由升级率。只有把成本和任务成功率绑定起来,才不会把"省钱"优化成"便宜但没用"。

输入 vs 输出的价格差异

以截至 2026.05 的主流模型定价为参考:

模型输入 ($/MTok)缓存读取 ($/MTok)输出 ($/MTok)输出/输入倍数
GPT-5.5$5.00$0.50$30.006x
GPT-5.4$2.50$0.25$15.006x
GPT-5.4 mini$0.75$0.075$4.506x
Claude Sonnet 4.6$3.00$0.30$15.005x
Claude Haiku 4.5$1.00$0.10$5.005x
Claude Haiku 3$0.25$0.03$1.255x

来源:OpenAI API Pricing(2026.05)[1]、Anthropic Prompt Caching 文档(2026.05)[2]

几个值得注意的结论:

输出 token 是大头。 输出比输入贵 5-6 倍,这意味着如果你的 Agent 产生大量冗长输出,成本会快速膨胀。控制输出长度——明确要求模型简洁输出、使用结构化格式替代叙述性文本——往往比其他优化更有效。

缓存读取是最便宜的输入。 表中"缓存读取"那列是正常输入的 10%,也就是 90% 的折扣。如果你有大量重复的前缀内容,缓存命中能直接把这部分成本打到原来的十分之一。

模型之间差距巨大。 GPT-5.4 mini 的输出 token 价格是 GPT-5.5 的 15%。如果某些子任务不需要最强模型,换一个便宜 10 倍的模型做,这不是降级,是工程上的正确决策。

Agent 系统的成本结构

一个实际的 Agent 在单次任务中可能有多次 LLM 调用——每次规划、每次工具结果处理、每次生成输出,都是一次计费。

任务执行中的 LLM 调用次数(以一个中等复杂度 Coding Agent 为例):
- 初始规划:1 次
- 每次工具调用后的推理:3-8 次
- 代码审查:1-2 次
- 结果汇总:1 次
─────────────────────────────────
总计:约 6-12 次 LLM 调用/任务

如果每次调用发送的 context 是 12000 token,加上 800 token 的输出,用 GPT-5.4($2.50/MTok 输入 + $15.00/MTok 输出):

单次调用成本 = 12000 × $2.50/10^6 + 800 × $15.00/10^6
            = $0.030 + $0.012
            = $0.042

一个 10 步任务 ≈ $0.42
日活 1000 次任务 ≈ $420/天 ≈ $12,600/月

这还没考虑 context 随任务进行而增长——越到后面的步骤,输入 token 越多。

Streaming:降低感知延迟,不降低成本

Streaming(流式输出)让用户看到模型"正在打字",而不是等待全部完成再展示。这对用户体验很重要,但需要明确:Streaming 不降低 token 消耗,它只是改变了结果的交付方式。

对于长任务 Agent,Streaming 更多的意义在于:让用户知道 Agent 还活着、在进展,而不是面对一个空屏等待几分钟。这本身是一种信任建立机制,不是成本优化手段。

19.2 Prompt Caching —— 让重复的 token 只算一次

核心直觉

每次 API 调用,你发给模型的 token 里,有很大一部分是完全一样的——系统提示、工具定义、背景文档。如果每次都让模型从头"读"一遍,这些重复的工作既慢又贵。Prompt Caching 把这些稳定的前缀存下来,后续请求直接复用,跳过重新计算这部分的过程。

类比:你每天早上给助理发邮件,邮件开头都是一段 500 字的背景说明。如果助理每次都从头读,效率很低;如果你说"背景和昨天一样,直接看今天的新内容",就快多了。Prompt Caching 就是这个"背景和昨天一样"的机制。

两家主要供应商的实现方式

OpenAI:自动缓存,无需代码改动

OpenAI 的 Prompt Caching 是全自动的,对所有支持的模型默认开启,不需要在请求里加任何参数 [3]。条件是:prompt 总长度 ≥ 1024 token。

系统会根据 prompt 前缀做哈希路由,把请求发到最近处理过相同前缀的机器上。如果命中缓存,输入 token 成本降至原来的 10%(即 90% 折扣),同时延迟也显著下降。

缓存保留时间:

  • 大多数模型:5-10 分钟无活动后失效,最长 1 小时(内存缓存)
  • gpt-5.5 等较新模型:默认使用最长 24 小时的扩展缓存,不支持内存缓存模式
  • 其他支持扩展缓存的模型:可以通过 prompt_cache_retention 显式选择缓存保留策略

如果应用有严格的数据隔离或命中率治理需求,可以设置 prompt_cache_key,把不同租户、用户或工作区的缓存空间隔开。需要注意两点:同一个 cache key 的请求速率过高时,缓存命中率可能下降,经验上应避免同一个 key 超过约 15 RPM;cached tokens 虽然按折扣计费,但仍会计入速率限制。响应的 usage 字段会包含 cached_tokens 指标,可以用来监控缓存命中情况:

python
import openai

client = openai.OpenAI()
response = client.chat.completions.create(
    model="gpt-5.4",
    prompt_cache_retention="24h",
    prompt_cache_key=f"tenant:{tenant_id}:agent:{agent_id}",
    messages=[
        {"role": "system", "content": LONG_SYSTEM_PROMPT},  # 稳定前缀
        {"role": "user", "content": user_message},           # 变化部分
    ]
)

# 查看缓存命中
cached = response.usage.prompt_tokens_details.cached_tokens
total_input = response.usage.prompt_tokens
print(f"缓存命中率: {cached / total_input * 100:.1f}%")

Anthropic:显式标记,更精细的控制

Anthropic 的方案需要在请求里显式打上 cache_control 标记,标明哪些内容应该被缓存 [2]。这比 OpenAI 的自动方案多一步配置,但也提供了更精细的控制。

截至 2026.05,Anthropic 提供两种方式:

  1. 自动缓存(Automatic Caching):在请求顶层加 cache_control,系统自动在最后一个可缓存块上打标记,适合多轮对话
  2. 显式断点(Explicit Breakpoints):在具体 content block 上加 cache_control,适合需要细粒度控制的场景

价格结构:

  • 缓存写入(5 分钟 TTL):基础输入价格的 1.25 倍(额外 25% 用于写缓存)
  • 缓存写入(1 小时 TTL):基础输入价格的 2 倍
  • 缓存读取:基础输入价格的 0.1 倍(90% 折扣)

最小缓存长度因模型而异:Claude Sonnet 4.6 要求 2048 token,Claude Haiku 4.5 要求 4096 token(截至 2026.05 以官方文档为准 [2])。

python
import anthropic

client = anthropic.Anthropic()

# 方式一:自动缓存(推荐用于多轮对话)
response = client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    cache_control={"type": "ephemeral"},  # 顶层自动缓存
    system="你是一个专业的代码审查助手...",  # 长系统提示
    messages=[
        {"role": "user", "content": "请审查这段代码"},
    ]
)

# 方式二:显式断点(精细控制)
response = client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": TOOLS_AND_INSTRUCTIONS,  # 工具定义 + 指令(稳定,缓存)
            "cache_control": {"type": "ephemeral"},
        }
    ],
    messages=[
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": LONG_DOCUMENT,  # 文档内容(稳定,缓存)
                    "cache_control": {"type": "ephemeral"},
                },
                {"type": "text", "text": user_question},  # 用户问题(变化,不缓存)
            ],
        }
    ],
)

# 查看缓存状况
usage = response.usage
print(f"缓存写入: {usage.cache_creation_input_tokens}")
print(f"缓存命中: {usage.cache_read_input_tokens}")

什么内容该缓存,什么不该

缓存带来最大收益的内容:

  • System Prompt:每次调用都发送,几乎不变,是最优先的缓存对象
  • 工具定义:工具列表在一个 Agent 的生命周期内基本稳定
  • 背景文档:RAG 检索到的文档、代码库快照、长篇参考材料
  • Few-shot 示例:示例不变,每次都发送,适合缓存

不适合缓存的内容:

  • 每次都不同的用户输入
  • 包含时间戳、请求 ID 的动态字段
  • 频繁变化的状态信息

一个容易犯的错误:cache_control 打在包含时间戳或每次变化字段的 block 上。缓存命中要求前缀完全一致,如果标记位置上的内容每次都变,永远不会命中。应该把标记打在最后一个稳定不变的 block 上。

提升缓存命中率的实践

结构原则:稳定内容前置,变化内容后置

┌─────────────────────────────────────┐
│  工具定义(稳定)        ← 打缓存标记 │
│  System Prompt(稳定)  ← 打缓存标记 │
│  背景文档(较稳定)     ← 打缓存标记 │
│  对话历史(增长)                     │
│  当前用户消息(每次不同)             │
└─────────────────────────────────────┘

对于使用 Anthropic API 的 Agent,还可以做 Cache Pre-warming(缓存预热):在用户请求到达之前,先发一个极短输出请求来触发缓存写入,这样第一个真实用户请求就能命中缓存,减少首次请求的延迟 [2]。预热不是免费的,它仍会产生一次请求和少量输出 token 成本,因此只适合稳定前缀足够长、后续复用概率足够高的场景。

python
# 预热缓存:应用启动时调用,用户请求到达前完成
def prewarm_cache():
    client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1,          # Messages API 最小值为 1,用极短输出触发缓存写入
        system=[
            {
                "type": "text",
                "text": SYSTEM_PROMPT,
                "cache_control": {"type": "ephemeral"},
            }
        ],
        messages=[{"role": "user", "content": "warmup"}],
    )
    # stop_reason 通常会是 "max_tokens",但仍会产生少量输出 token 成本

实际效果:在内容稳定、请求量大的场景(比如处理同一批文档的多个问题),缓存命中率可以达到 70-90%,对应输入成本降低接近同等比例。

19.3 模型路由 —— 用对的工具做对的事

核心直觉

不同任务对智能的需求不一样。让 GPT-5.5 帮你做意图分类,就像用航空发动机开除草机——能干,但浪费。模型路由(Model Routing)的核心思想是:根据任务的复杂度,自动选择性价比最合适的模型。

这不是降低质量,而是精确匹配。用小模型做它擅长且足够的事,把大模型的算力留给真正需要深度推理的地方。

任务-模型匹配矩阵

不同任务对模型能力的要求是不同的:

任务类型典型例子建议模型档位理由
意图分类"这是查询还是操作?"小模型 / 微调模型分类任务,答案空间小
参数提取从文本提取日期、金额小模型结构化提取,规则明确
简单问答FAQ 回答、政策查询小模型单次检索+生成
工具选择从 5 个工具里选一个中型模型需要推理但不复杂
多步规划分解一个复杂任务大模型 / 推理模型需要深度推理
代码生成写一个完整功能大模型需要理解复杂约束
结果审查检查输出是否符合要求中/大模型需要理解质量标准
摘要压缩压缩对话历史小/中模型提取信息,不需要创造

Router 设计模式

一个简单的基于规则的路由器:

python
from enum import Enum
from dataclasses import dataclass

class ModelTier(Enum):
    SMALL = "small"
    MEDIUM = "medium"
    LARGE = "large"
    REASONING = "reasoning"

# 模型映射(截至2026.05,实际使用时请查阅最新官方文档确认可用性)
MODEL_MAP = {
    ModelTier.SMALL: "gpt-5.4-mini",       # 低成本,高吞吐
    ModelTier.MEDIUM: "gpt-5.4",            # 均衡
    ModelTier.LARGE: "gpt-5.5",             # 高能力
    ModelTier.REASONING: "o4-mini",          # 推理任务
}

@dataclass
class TaskContext:
    task_type: str
    estimated_complexity: int  # 1-5
    requires_reasoning: bool
    context_length: int

def route_model(task: TaskContext) -> str:
    """基于任务特征选择合适的模型"""
    # 需要深度推理的任务
    if task.requires_reasoning and task.estimated_complexity >= 4:
        return MODEL_MAP[ModelTier.REASONING]
    
    # 高复杂度任务
    if task.estimated_complexity >= 4:
        return MODEL_MAP[ModelTier.LARGE]
    
    # 简单分类/提取任务
    if task.task_type in ("intent_classification", "parameter_extraction") \
       and task.estimated_complexity <= 2:
        return MODEL_MAP[ModelTier.SMALL]
    
    # 其余情况用中型模型
    return MODEL_MAP[ModelTier.MEDIUM]

更复杂的做法是用一个轻量模型做任务复杂度估算,然后把请求路由到合适的模型。不过这会增加一次额外调用,需要权衡路由成本和收益:只有当被路由任务的预期节省明显大于路由本身的开销时,才值得。

级联路由(Cascade Routing)

一种务实的路由策略:先用小模型尝试,如果结果不满足质量要求,再升级到大模型:

python
async def cascade_completion(prompt: str, quality_checker) -> str:
    """先用小模型,不满足质量要求再升级"""
    # 第一次:小模型
    response = await call_model("gpt-5.4-mini", prompt)
    
    if quality_checker(response):
        return response  # 小模型够用,直接返回
    
    # 升级到大模型
    return await call_model("gpt-5.5", prompt)

这个模式对于那些"大部分情况小模型够用,偶尔需要大模型"的场景特别有效——比如客服回复的草稿生成。社区里有开发者反馈,在实际产品中对 FAQ 类问题使用级联路由后,约 80% 的请求落在小模型上,总成本降低了约 60%(来自 HN 上 "Ask HN: LLM cost optimization" 线程,2024 年,具体数字因业务场景差异很大,仅供参考)。

级联路由必须有质量闭环,否则它会变成新的黑盒。至少要记录:

  • 每类任务的小模型首轮通过率
  • 升级到大模型的比例和原因
  • 小模型误放行率(看似通过但后续失败)
  • 每条路径的 cost per successful task
  • 路由决策、模型版本、quality_checker 结果和最终 Outcome

真正上线前,还要用第 18 章的回归 eval 套件验证:模型降档之后,核心任务成功率没有显著下降。路由器不是省钱开关,而是一个需要被评估、监控和回滚的生产组件。

常见误区:盲目堆最强模型

一个在社区中反复出现的教训是:在 Agent 系统里,用最强模型做所有步骤通常不是最优策略。原因有两个:

  1. 最强模型的输出 token 最贵,但某些步骤只需要简单决策
  2. 推理模型(如 o 系列)在简单任务上有不必要的 thinking overhead,反而更慢更贵

实践上,推理模型更适合用于规划阶段(复杂任务分解、策略制定),而执行阶段的工具调用则用普通模型更高效。

19.4 工具调用优化 —— 别让工具等工具

并行工具调用

最容易被忽视的优化之一:无依赖关系的工具调用,应该并行执行而不是顺序执行。

举一个具体例子:Agent 需要同时查询天气、查询日历、查询用户偏好,这三个操作没有依赖关系。顺序执行需要 300ms × 3 = 900ms;并行执行只需要 max(300ms, 280ms, 250ms) ≈ 300ms。

python
import asyncio

async def parallel_tool_calls(tool_calls: list[dict]) -> list[dict]:
    """并行执行无依赖关系的工具调用"""
    tasks = [execute_tool(call) for call in tool_calls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return [
        {"tool_call_id": call["id"], "result": result}
        for call, result in zip(tool_calls, results)
    ]

# LLM 返回多个工具调用时:
# tool_calls = [
#     {"id": "1", "name": "get_weather", "args": {"city": "Shanghai"}},
#     {"id": "2", "name": "get_calendar", "args": {"date": "today"}},
#     {"id": "3", "name": "get_preferences", "args": {"user_id": "u123"}},
# ]
# 并行执行后把所有结果一起发给 LLM

主流 LLM API(OpenAI、Anthropic)在返回 tool_calls 时如果有多个工具,明确表示它们可以并行执行——这不是实现细节,是 API 设计语义。但很多 Agent 框架的默认实现是顺序执行,需要手动开启并行。

工具结果缓存

对于相同查询不会随时间快速变化的工具(数据库查询、文档检索、配置读取),可以在工具层做结果缓存:

python
import hashlib
import time
from functools import wraps

def tool_cache(ttl_seconds: int = 300):
    """工具结果缓存装饰器"""
    cache = {}
    
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # 生成缓存键:生产环境必须包含租户、权限和数据版本边界
            cache_scope = {
                "tenant_id": kwargs.get("tenant_id"),
                "user_scope": kwargs.get("user_scope"),
                "permission_version": kwargs.get("permission_version"),
                "data_version": kwargs.get("data_version"),
                "args": args,
                "kwargs": sorted(kwargs.items()),
            }
            key = hashlib.sha256(str(cache_scope).encode()).hexdigest()
            
            # 检查缓存
            if key in cache:
                result, timestamp = cache[key]
                if time.time() - timestamp < ttl_seconds:
                    return result
            
            # 执行工具,存入缓存
            result = await func(*args, **kwargs)
            cache[key] = (result, time.time())
            return result
        
        return wrapper
    return decorator

@tool_cache(ttl_seconds=60)  # 1 分钟内相同查询复用结果
async def search_knowledge_base(
    query: str,
    tenant_id: str,
    user_scope: str,
    permission_version: str,
    data_version: str,
) -> list[str]:
    """知识库检索工具"""
    ...

注意:工具缓存需要考虑缓存键的设计和失效策略。对于实时性要求高的工具(股价、实时状态),不应该缓存或 TTL 应该很短;对于包含敏感数据、用户私有数据、权限过滤结果的工具,默认不缓存,除非缓存 key 明确包含租户、用户/角色范围、权限版本和数据版本。高并发场景还要防止 cache stampede:同一个 key 失效时,只允许一个请求回源,其他请求等待或读取短暂过期的旧值。

批量处理

有些场景下,Agent 需要对大量相似项目做同样的操作(比如批量分类 100 篇文章的主题)。这时候可以用批处理 API 替代逐个调用:

OpenAI Batch API:提交 .jsonl 文件,24 小时内完成,成本降低 50%,非常适合非实时的大批量任务 [4]。

Anthropic Message Batches:类似机制,同样提供成本折扣。

对于 Agent 里的评估步骤、后台数据处理、离线报告生成,批量处理是成本效益最高的方案之一。

python
# OpenAI Batch API 示例:批量对文章做主题分类
import json

requests = [
    {
        "custom_id": f"article-{i}",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-5.4-mini",
            "messages": [
                {"role": "system", "content": "请对以下文章进行主题分类,返回 JSON"},
                {"role": "user", "content": article}
            ],
            "max_tokens": 100,
        }
    }
    for i, article in enumerate(articles)
]

# 写入 jsonl 文件,上传,创建 batch
with open("batch_input.jsonl", "w") as f:
    for req in requests:
        f.write(json.dumps(req) + "\n")

19.5 并发控制与速率限制

为什么需要主动做速率控制

LLM API 有 Token Per Minute(TPM)和 Request Per Minute(RPM)限制。当 Agent 系统在高并发下运行时,如果不加控制,很容易触发限流(HTTP 429),然后大量请求失败或重试,这既浪费成本,又可能造成雪崩效应。

主动管理速率,比被动等待 API 返回 429 再处理,要优雅得多。

令牌桶限流

令牌桶算法(Token Bucket)是控制请求速率的经典方案:桶里有一定数量的令牌,每次请求消耗令牌,令牌以固定速率补充,桶满为止。

python
import asyncio
import time

class TokenBucket:
    """基于 token 消耗的速率限制器"""
    
    def __init__(self, rate: int, capacity: int):
        """
        rate: 每秒补充的 token 数(对应 API 的 TPM/60)
        capacity: 桶容量(最大突发量)
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()
    
    async def acquire(self, tokens: int = 1) -> None:
        """等待直到有足够的令牌"""
        while True:
            async with self._lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return
                # 计算需要等待的时间
                wait_time = (tokens - self.tokens) / self.rate
            await asyncio.sleep(wait_time)
    
    def _refill(self):
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.rate
        )
        self.last_refill = now

# 使用:对于 TPM 限制为 100,000 的 API
# rate = 100000 / 60 ≈ 1666 tokens/second
limiter = TokenBucket(rate=1666, capacity=10000)

async def call_llm_with_rate_limit(prompt: str, estimated_tokens: int):
    await limiter.acquire(estimated_tokens)
    return await call_llm(prompt)

这个例子只演示 TPM 维度。生产环境通常还需要同时限制 RPM、并发请求数和单租户预算,并在请求完成后用真实 usage 回填修正估算值。cached tokens 虽然更便宜,但仍可能计入供应商的速率限制;输出 token 也经常是限流和成本失控的主要来源。

优雅降级

在限流或 API 不可用时,Agent 系统需要有明确的降级策略,而不是直接报错或无限重试:

降级时有几个可选策略,按优先级排序:

  1. 切换到备用模型:同等能力的不同供应商
  2. 返回缓存的旧结果(如果业务允许轻微过期数据)
  3. 降级到规则系统(如果任务有非 LLM 的替代方案)
  4. 进入等待队列,稍后重试
  5. 返回人工处理,通知操作人员

19.6 延迟优化策略 —— 让用户感觉快

首 token 延迟 vs 总延迟

对 Agent 系统来说,延迟有两个不同的含义:

  • 首 token 延迟(TTFT,Time To First Token):从发出请求到收到第一个 token 的时间。对于有流式展示的场景,这决定了用户感知到"它开始回复了"的时间。
  • 总延迟(Total Latency):从发出请求到收到完整响应的时间。对于需要完整结果才能继续的步骤,这才是真正的瓶颈。

在多步骤 Agent 中,这两者的优化策略不同:

  • 减少 TTFT:Prompt Caching(减少预填充时间)、选更快的模型
  • 减少总延迟:并行化(工具并发、子任务并发)、减少输出 token 数

思维链 vs 直接输出的延迟权衡

推理模型(如 o 系列)在回答前会做内部推理,这会增加延迟。对于需要深度思考的复杂规划任务,这个延迟是值得的;但对于简单的路由决策或参数提取,用推理模型反而是负优化。

判断依据:如果一个任务你认为"有经验的工程师看一眼就知道答案",就不需要推理模型。如果是"需要思考几步才能确定策略",推理模型才有价值。

预取策略

在用户操作的空档期,预先准备可能需要的上下文,减少实际需要时的等待:

python
import asyncio

async def agent_with_prefetch(user_input: str):
    """在处理用户输入的同时,预取可能需要的上下文"""
    
    # 同时启动:意图分析 + 预取可能需要的数据
    intent_task = asyncio.create_task(analyze_intent(user_input))
    
    # 根据历史模式,预取大概率会用到的数据
    prefetch_tasks = [
        asyncio.create_task(prefetch_user_profile()),
        asyncio.create_task(prefetch_recent_context()),
    ]
    
    # 先等意图分析完成
    intent = await intent_task
    
    # 根据意图决定是否需要额外的工具调用
    if intent == "calendar_query":
        calendar_task = asyncio.create_task(fetch_calendar_data())
    
    # 合并预取结果(此时大概率已完成)
    profile, context = await asyncio.gather(*prefetch_tasks)
    
    # 最终生成响应
    return await generate_response(intent, profile, context)

这个模式在 UX 上的效果很明显:用户感知到的等待时间是"意图分析时间"而不是"意图分析 + 数据获取时间",两者并行执行了。

19.7 Agent 配置管理与迭代发布

为什么 Agent 的配置管理比传统软件更复杂

传统软件发布,代码变了才算发布。Agent 有一个特殊性:改一个 prompt,等于改了一段"代码"。System prompt 调整、工具描述变化、模型版本切换、路由规则修改——这些都会影响 Agent 的行为,但都没有代码 diff。

如果不把这些变化当成软件发布来管理,你会遇到以下困境:

  • 在 prompt 里直接改了两行,上线后行为变了,但无法快速回滚
  • 多个工程师同时修改 prompt,互相覆盖
  • 不知道哪次"优化"导致了性能退化

解决方案是把 Agent 配置版本化:用版本控制管理配置,用发布流程管理变更。

Agent 配置的组成

一个完整的 Agent 配置应该包含:

yaml
# agent_config.yaml
version: "2.3.1"
created_at: "2026-04-15"

model:
  primary: "gpt-5.4"
  fallback: "gpt-5.4-mini"
  reasoning_tasks: "o4-mini"

system_prompt: |
  你是一个...(完整的 system prompt)

tools:
  - name: "search_knowledge_base"
    enabled: true
    max_results: 5
  - name: "execute_code"
    enabled: true
    timeout_seconds: 30

routing:
  complexity_threshold: 3
  use_cascade: true

guardrails:
  max_turns: 20
  max_tokens_per_turn: 4000
  sensitive_topics: ["financial_advice", "medical_diagnosis"]

把这个 YAML 文件放进 Git,每次修改都有 PR、都有 review、都有 commit 记录。这不是形式主义,是在说"谁在什么时候做了什么改动,以及为什么"。

A/B 测试

当你有两个 prompt 版本、两个模型选项,不确定哪个更好时,A/B 测试是最直接的验证方式:

python
import hashlib
import random

class AgentConfigRouter:
    """在不同配置版本之间分流请求"""
    
    def __init__(self, experiments: list[dict]):
        """
        experiments: [
            {"config": config_v1, "weight": 0.5, "name": "control"},
            {"config": config_v2, "weight": 0.5, "name": "treatment"},
        ]
        """
        self.experiments = experiments
        self._validate_weights()
    
    def _validate_weights(self):
        total = sum(e["weight"] for e in self.experiments)
        assert abs(total - 1.0) < 1e-6, f"权重之和必须为 1,实际为 {total}"
    
    def get_config(self, user_id: str = None) -> tuple[dict, str]:
        """
        返回 (config, experiment_name)
        如果提供 user_id,同一用户每次分到同一组(粘性分流)
        """
        if user_id:
            # 基于用户 ID 的稳定哈希分流,避免 Python hash 随进程重启变化
            digest = hashlib.sha256(user_id.encode()).hexdigest()
            seed = int(digest[:8], 16) / 0xFFFFFFFF
        else:
            seed = random.random()
        
        cumulative = 0
        for exp in self.experiments:
            cumulative += exp["weight"]
            if seed < cumulative:
                return exp["config"], exp["name"]
        
        return self.experiments[-1]["config"], self.experiments[-1]["name"]

# 记录每次请求使用的实验组,用于后续统计分析
router = AgentConfigRouter([
    {"config": config_v1, "weight": 0.5, "name": "control"},
    {"config": config_v2, "weight": 0.5, "name": "treatment"},
])

config, experiment = router.get_config(user_id=user_id)
# 在日志/追踪系统中记录 experiment,关联任务成功率和成本

A/B 测试的结论需要足够的样本量才有统计意义。对于低频任务,可能需要跑几周才能得到置信度足够的结论。

灰度发布与回滚

不要把新版本直接推给全量用户。灰度发布(Canary Release)让你能在出问题时快速控制影响范围:

发布流程:
1% 流量 → 观察 24h → 无异常
5% 流量 → 观察 24h → 无异常
20% 流量 → 观察 48h → 无异常
100% 流量

定义明确的回滚触发条件,而不是靠人工判断:

python
class DeploymentMonitor:
    """监控新版本指标,自动触发回滚"""
    
    ROLLBACK_THRESHOLDS = {
        "error_rate": 0.05,          # 错误率超过 5% 触发回滚
        "task_success_rate_drop": 0.1, # 任务成功率下降超过 10%
        "p95_latency_increase": 2.0,   # P95 延迟增加超过 2 倍
        "cost_per_task_increase": 1.5, # 每任务成本增加超过 50%
    }
    
    def should_rollback(self, current_metrics: dict, baseline_metrics: dict) -> bool:
        error_rate = current_metrics.get("error_rate", 0)
        if error_rate > self.ROLLBACK_THRESHOLDS["error_rate"]:
            return True
        
        success_drop = (
            baseline_metrics["task_success_rate"] - current_metrics["task_success_rate"]
        )
        if success_drop > self.ROLLBACK_THRESHOLDS["task_success_rate_drop"]:
            return True
        
        return False

Feature Flag:把配置变更当成可控的软件发布

Feature Flag 让你在不修改代码的情况下,动态控制 Agent 的行为:

python
from typing import Any

class AgentFeatureFlags:
    """Agent 行为的动态开关"""
    
    def __init__(self, flag_service):
        self.flags = flag_service
    
    def get_model(self, context: dict) -> str:
        """根据 Feature Flag 决定使用哪个模型"""
        if self.flags.is_enabled("use_gpt5_5", context):
            return "gpt-5.5"
        return "gpt-5.4"
    
    def get_max_reasoning_depth(self) -> int:
        """ReAct 最大推理步骤数"""
        return self.flags.get_int("max_reasoning_steps", default=10)
    
    def is_tool_enabled(self, tool_name: str, user_id: str) -> bool:
        """某个工具是否对特定用户开启"""
        return self.flags.is_enabled(f"tool_{tool_name}", {"user_id": user_id})

Feature Flag 和 A/B 测试的区别:A/B 测试目的是收集数据做决策,Flag 是已经决策后的渐进式开关。实践中两者经常配合使用:先 A/B 测试确认新行为更好,再通过 Flag 控制逐步放量。

完整的发布工作流

这套流程的关键是:把 prompt 变更和代码变更用同等严肃的流程对待。Eval Gate 是最重要的护栏——在上一章(第18章)里讲到的回归评估套件,就是在这里发挥作用的。一次 prompt 修改让某类任务成功率下降 5%,在 CI 里就应该被发现,而不是等到生产环境出问题。

面试高频题

Q:如何将 Agent 的 API 调用成本降低 50% 同时保持质量?

参考回答框架:

成本降低 50% 不是一个单一手段能做到的,而是多个优化叠加的结果。优先级从高到低:

第一优先:Prompt Caching(收益最大)

分析你的 Agent 每次请求中有多少 token 是重复发送的——system prompt、工具定义、背景文档。这部分通常占总输入的 60-80%。配置好 Prompt Caching 后,这些 token 的读取成本降至原来的 10%。如果这些内容占到输入 token 的 70%,仅此一项就能降低总输入成本约 63%。

第二优先:模型路由

评估每一个 Agent 步骤真正需要什么能力。意图分类、参数提取、简单路由这类任务,用小模型(价格通常是顶级模型的 1/10 甚至更低)完全够用。只在需要复杂推理或高质量生成的步骤上用大模型。

第三优先:并行工具调用

确认你的框架是否在并行执行无依赖的工具调用。这不降成本,但降低延迟,从而可以设置更短的超时,减少因超时重试带来的额外成本。

第四优先:批处理 API

对于非实时任务(后台评估、数据处理、报告生成),使用 Batch API 获得 50% 的折扣。

加分点(展现深度):

  • 要验证优化效果,需要先建立成本监控——按任务类型追踪 input/output token 分布,找到成本最高的任务类型优先优化
  • 成本优化和质量之间存在真实的权衡,不能只看成本数字。需要配合 Eval 套件,确认模型降档或 prompt 精简后,核心任务成功率没有下降
  • 告警很重要:当某个优化手段(比如缓存命中率下降)失效时,成本会反弹,需要及时发现

参考资料

[1] OpenAI API Pricing - OpenAI (https://openai.com/api/pricing/)

[2] Prompt Caching - Anthropic API Documentation (https://platform.claude.com/docs/en/docs/build-with-claude/prompt-caching)

[3] Prompt Caching - OpenAI Developer Documentation (https://developers.openai.com/api/docs/guides/prompt-caching)

[4] Batch API - OpenAI Developer Documentation (https://developers.openai.com/api/docs/guides/batch)