第16章:Agent Runtime —— 让 Agent 活在生产环境
很多团队第一次做 Agent 时会遇到同一个落差:demo 里 50 行代码就能跑起来,到了生产环境却卡在崩溃恢复、任务超时、工具副作用、沙箱隔离和并发控制上。这个落差不完全是模型问题,更是 Runtime 问题。
前 15 章讲的都是怎么构建 Agent——认知架构、工具调用、记忆、规划、多 Agent 协作、协议、护栏。这些是"造车"的问题。但一辆车能跑 0-100km/h 加速和在高速公路上连续跑 10 万公里不出事,是两件完全不同的事。
这一章讲的是 Agent 的"高速公路系统":持久化执行、沙箱隔离、状态管理、异步任务、流式交互、容错降级、幂等重试。理解了这些,你才知道为什么 demo 和 production 之间的 gap 不是你 prompt 写得不够好。
框架帮你构建 Agent,Runtime 帮你可靠地运行 Agent。
16.1 什么是 Agent Runtime —— 框架和 Runtime 的本质区别
核心直觉
框架是造车工厂,Runtime 是道路系统。框架决定 Agent 的行为逻辑(用什么模型、有什么工具、怎么思考),Runtime 决定 Agent 能不能在断网、崩溃、超时、并发激增的情况下还可靠运行。
框架做了什么
框架的核心职责是定义 Agent 的行为:
- 提供 Agent 循环(接收输入 → LLM 推理 → 解析动作 → 执行工具 → 观察结果 → 决定是否继续)
- 管理工具注册和调用
- 组织多 Agent 协作(handoff、supervisor 模式)
- 提供 Guardrails(输入/输出/工具护栏)
- 集成记忆和上下文管理
LangGraph、CrewAI、OpenAI Agents SDK 都在这一层。它们回答的问题是"这个 Agent 是什么、能做什么"。
Runtime 做了什么
Runtime 的核心职责是保证 Agent 可靠执行:
- 每一步后保存检查点,崩溃后从断点恢复
- 在隔离的沙箱中执行代码和操作文件
- 管理长任务的排队、调度、超时和重试
- 处理 LLM API 不可用时的降级和备用方案
- 持久化会话状态,让 Agent 跨进程重启后保持连续性
- 提供流式事件总线,让前端实时感知 Agent 在做什么
为什么需要区分
一个最常见的错误认知:用了 LangGraph 就等于有了生产级 Runtime。LangGraph 提供了 checkpointer——但它是一个持久化接口,不是完整的 Runtime。真正的 Runtime 需要把检查点存储、沙箱生命周期、任务队列、熔断器、流式事件管线全部集成到一起。
工程上更准确的说法是:写出一个会调用工具的 Agent 循环很容易,让它连续运行几小时、处理中途重启、工具超时、LLM 限流和外部服务抖动,才是真正的产品化工作。
区分框架和 Runtime 不是学术分类,而是工程边界。框架的代码跑在你的进程里,Runtime 的职责跨进程、跨机器、跨重启。当你遇到"Agent 跑到一半崩了怎么恢复"、"怎么让 Agent 跑 2 小时的任务不丢状态"这类问题时,你需要的不是更好的框架,而是 Runtime。
16.2 Durable Execution —— 让 Agent 拥有"存档读档"能力
核心直觉
你玩 RPG 游戏时会在打 Boss 前存档。死了之后从存档点重来,不用从新手村重新跑。Durable Execution 对 Agent 做的就是这件事——每一步后自动存档,崩了从最后一步接着跑,而不是从头再来。
问题有多严重
一个运行 30 分钟的 Agent 任务,可能在任意时刻崩溃:
- LLM API 返回 500(云服务故障,每几个月总有一次)
- 工具调用超时(第三方 API 慢响应)
- 进程被 OOM Killer 杀掉(Agent 上下文膨胀到几 GB)
- 部署重启(你发了新版本,旧进程被杀)
- 网络断开(Agent 跑在笔记本电脑上,合盖休眠)
没有持久化执行,上面任何一种情况都意味着:全部中间结果丢失,从头重跑。对于消耗了大量 token、调了十几次工具的任务,这个代价不只是时间——是钱。
检查点机制
Durable Execution 的核心原理简单到只有一句话:每完成一个步骤,把当前状态写入持久化存储。
步骤 1 完成 → 写检查点
步骤 2 完成 → 写检查点
步骤 3 完成 → 写检查点
... 崩了!
重新启动 → 读最后一个检查点 → 从步骤 4 继续这不是新概念。Temporal(前身是 Uber 的 Cadence)在 2019 年就把这个模式做成了微服务编排的基础设施。当时解决的问题是"如果订单处理中途崩了,怎么不重复扣款也不丢订单"。Agent 面临的是完全同构的问题——只是"单步操作"从调用微服务变成了调用 LLM + 执行工具。
一个最小实现
持久化检查点不需要重框架。一个 Python 装饰器就能说清楚核心逻辑:
import json
import hashlib
import redis
from functools import wraps
from typing import Any, Callable
r = redis.Redis(decode_responses=True)
def checkpoint(task_id: str, step_name: str):
"""一个简单的检查点装饰器——演示原理,生产环境需要更多工程化"""
def decorator(fn: Callable):
@wraps(fn)
def wrapper(*args, **kwargs):
# 尝试读取已有检查点
ckpt_key = f"agent:{task_id}:{step_name}"
cached = r.get(ckpt_key)
if cached:
return json.loads(cached)
# 执行步骤
result = fn(*args, **kwargs)
# 写入检查点
r.set(ckpt_key, json.dumps(result), ex=86400)
return result
return wrapper
return decorator
@checkpoint(task_id="task-42", step_name="fetch_issue")
def fetch_jira_issue(issue_id: str) -> dict:
resp = requests.get(
f"https://jira.company.com/api/issue/{issue_id}",
timeout=30
)
resp.raise_for_status()
return resp.json()
@checkpoint(task_id="task-42", step_name="summarize")
def summarize_issue(issue: dict) -> str:
return llm_call(
system="你是一个技术摘要助手",
prompt=f"用 3 句话总结这个 Jira 工单:\n{json.dumps(issue)}"
)task_id 是幂等的关键——同一个 task_id + 同一个 step_name,无论执行多少次,结果来自同一个检查点。如果 LLM 调用成本 $0.01 而你已经为这一步付过费,从检查点读取就是纯利润。
当然,这个实现省略了很多生产必需的东西:检查点版本管理(步骤逻辑改了,旧检查点要失效)、部分失败标记(步骤失败和步骤没跑是两回事)、事务性写入(写检查点和实际执行应该是原子的)。但它们都是可解决的工程扩展,不影响核心概念。
持久化 vs 非持久化:一个对比
| 非持久化 Agent | 持久化 Agent | |
|---|---|---|
| 崩溃后恢复 | 从头重跑(损失所有 token 成本 + 时间) | 从最后检查点继续 |
| 长任务(> 30min) | 几乎不可行 | 原生支持 |
| 任务暂停/恢复 | 不支持 | 支持(检查点 = 可序列化的暂停点) |
| 调试 | 重跑整个任务 | 从任意检查点回放 |
| 额外开销 | 零 | 每步一次写操作(通常 < 10ms) |
检查点应该存什么
这个问题看似简单,实则决定了你的恢复质量:
- 最小方案:只存消息历史(对话记录)。能恢复 LLM 上下文,但工具调用的副作用(已发出的邮件、已创建的订单)无法回滚。
- 标准方案:消息历史 + 工具调用结果 + 流程状态(当前处于哪个步骤)。大部分 Agent 任务用这个就够了。
- 完整方案:以上 + 外部状态快照(文件系统状态、数据库行版本、外部 API 响应)。用于需要精确回放的场景(比如调试和审计)。
选择哪种方案取决于你的容错需求。如果工具操作是幂等的(16.8 节会展开),最小方案通常够用。如果工具的副作用不可逆(发邮件、扣款),至少要标准方案,并且你的恢复逻辑需要检查"这一步到底执行成功了没有"。
Agent 场景里的新问题
Durable Execution 不是 Agent 时代才出现的概念,但 2025-2026 年它重新变成热点,是因为长任务 Agent 把老问题放大了。Microsoft Durable Task for AI agents 文档明确把状态管理、检查点和分布式协调作为底层基础设施,让 Agent 代码不用自己处理这些细节 [1]。Inngest 也把 AI Agent 作为 durable execution 进入主流采用的重要驱动力之一 [2]。
也有人指出反向问题:Durable Execution 的设计假设每步是确定的——重放应该得到相同结果。但 LLM 调用天然非确定(除非 temperature=0 且所有参数固定)。在实践中,检查点重放 LLM 调用时你需要固定 seed 参数并确保模型版本一致,否则"恢复"后的推理可能与之前不同。
这里要避免一个过度推广:不是每个 Agent 都需要持久化,简单的一次性问答不值得引入这个复杂度。但对于任何运行超过几分钟、会调用多个工具、或有不可忽略 token/外部 API 成本的任务,Durable Execution 应该成为默认架构选项之一。
16.3 Sandbox —— 把 Agent 关在笼子里,而不是给它你家的钥匙
核心直觉
Agent 要执行代码、写文件、调 API、装依赖——这些操作你不敢让它在你的生产服务器上直接做。沙箱就是给 Agent 一个隔离的笼子:它可以随便折腾,但折腾的范围碰不到外面的东西。
为什么 Agent 需要沙箱
第 15 章讲了多起 Agent 删除生产数据库、外泄数据的事故。那些事故的根本原因不是护栏没做好,而是Agent 有接触生产环境的权限。护栏是软件层的防御,沙箱是基础设施层的隔离。
Agent 需要沙箱的核心理由不是"它会恶意行为"——大多数时候它只是犯错。但一个犯了错的 Agent 如果跑在你的生产环境里,它的错误就是你的灾难:
- 一个 Coding Agent 执行了
rm -rf /(不一定是恶意的,可能是生成的清理脚本有 bug) - 一个数据分析 Agent 装了一个恶意 pip 包(供应链攻击)
- 一个 Research Agent 抓取了一个恶意网页,网页里有浏览器 0-day(别笑,这是真实的安全顾虑)
- 一个 Browser Agent 在填表单时误触了"删除所有数据"按钮
隔离度的光谱
不是所有任务都需要同等级别的隔离。从轻到重:
| 隔离级别 | 实现 | 冷启动 | 安全边界 | 适合场景 |
|---|---|---|---|---|
| 子进程 | subprocess.run() | < 10ms | 几乎为零 | 单次简单计算、内部脚本 |
| Docker | 容器运行时 | 1-5s | 中(共享内核) | 大多数代码执行场景 |
| gVisor | 用户态内核 | 1-5s | 高(系统调用拦截) | 多租户场景、不可信代码 |
| microVM | Firecracker | 50-200ms | 极高(硬件虚拟化) | 高风险操作、外部代码执行 |
不同路线的工程取舍
Docker 容器是最常见的方案。你启动一个临时容器,把 Agent 的代码和依赖装进去,跑完就销毁。问题是启动慢(镜像拉取 + 容器初始化需要几秒),而且共享宿主机内核——理论上存在容器逃逸风险。在实践中,这个风险对大多数企业内部场景是可以接受的。
microVM / 轻量 VM 是高隔离沙箱的一条常见路线。E2B 文档把 Sandbox 描述为按需创建的快速 Linux VM,适合让 Agent 安全执行代码和工具 [5]。Firecracker 是这类 microVM 架构中最有代表性的实现之一:每个执行环境有自己的内核,安全边界比共享宿主内核的容器更清晰。代价是资源开销和运维复杂度都更高。
gVisor 是 Google 的方案。它在用户空间实现了一个内核,拦截容器的系统调用。安全隔离比 Docker 强(系统调用被过滤),比 microVM 轻(启动更快)。如果你需要比 Docker 更安全的方案但不想引入虚拟化开销,这是一个务实的中间选择。
远程执行环境:Modal、E2B、Replit、Codesandbox 等平台提供了托管沙箱。你把代码发过去,它们在云端隔离执行,返回结果。好处是零运维负担,坏处是有外部依赖和成本。Modal 的 Sandboxes 文档明确把执行 LLM 生成代码、运行不可信代码、检出代码仓库并执行测试列为典型场景 [6]。
哪个方案适合你? 如果你是创业团队做内部工具,Docker 容器 + 合理的网络策略足够。如果你是做面向客户的 Coding Agent 产品,Firecracker microVM 是更安心的选择。如果你是金融/医疗行业,合规要求可能强制 microVM 级别的隔离。
沙箱不只是执行环境
一个完整的沙箱方案需要回答这几个问题:
- 文件系统:Agent 能读写哪些目录?外部存储怎么挂载?
- 网络权限:能访问哪些域名和 IP?能不能出外网?
- 资源上限:CPU、内存、磁盘的时间/容量限制是多少?超过上限是杀进程还是限流?
- 生命周期:沙箱创建时机(任务开始时?首次代码执行时?)、闲置多长时间后回收、任务完成后数据保留多久?
- 凭据注入:Agent 需要的 API Key 怎么安全地注入沙箱而不暴露在日志里?
一个常见的设计模式:Ephemeral + External State
沙箱本身是临时的——任务结束就销毁。但 Agent 需要持久化的东西(代码仓库、生成的文档、数据库查询结果)存在沙箱外面。这个模式在 16.4 节展开。
# 伪代码:沙箱执行的典型流程
sandbox = create_sandbox(
image="python:3.12",
env={"API_KEY": runtime.get_secret("agent_api_key")}, # 运行时注入,不进日志
network="isolated", # 只能访问白名单域名
max_runtime_seconds=300,
max_memory_mb=512,
)
try:
result = sandbox.execute(
code=agent_generated_code,
stdin="", # 不传用户输入到沙箱
)
if result.exit_code != 0:
raise SandboxExecutionError(result.stderr)
return result.stdout
finally:
sandbox.destroy() # 确保沙箱被销毁,不管成功还是失败16.4 状态管理 —— 临时的壳,持久的心
核心直觉
Agent 的执行环境(进程、沙箱、容器)是临时的——随时可能被销毁重建。但 Agent 的记忆和任务进度必须是永久的。状态管理要解决的就是一个矛盾:运行在临时环境里的 Agent,怎么在跨重启、跨环境重建后保持连续性。
三种状态
Agent 的状态分三层,每层的持久化策略不同:
| 状态类型 | 内容 | 生命周期 | 持久化策略 |
|---|---|---|---|
| Session State | 对话历史、工具调用日志、当前上下文 | 单次会话 | 需要持久化(崩溃恢复) |
| Execution State | 任务进度、中间结果、分支决策、检查点 | 单次任务执行 | 必须持久化(Durable Execution 的基础) |
| External State | 代码仓库、生成的文档、数据库写入 | 跨会话、跨任务 | 本身就是持久化的 |
Session State 就是你的消息数组(messages list),以及 LLM 的内部状态(如果模型支持)。它是 Agent "记得刚才发生了什么"的基础。
Execution State 是任务层面的——"我已经完成了步骤 3,下一步是步骤 4,步骤 2 的结果存在这里"。它比 Session State 更结构化,通常以一个字典或 JSON 对象存储。
External State 是 Agent 对世界产生的副作用——写了个文件、创建了个 PR、数据库里插了行数据。这些东西的持久化是业务系统本身的职责。
Ephemeral + External State 模式
分离临时的(ephemeral)和持久的(external),是 Agent 状态管理最核心的设计模式:
┌─────────────────────────────────────┐
│ Episode Runtime │
│ ┌──────────┐ ┌──────────┐ │
│ │ Sandbox │ │ Agent │ │
│ │ (临时) │ │ 进程 (临时)│ │
│ └────┬─────┘ └────┬─────┘ │
│ │ │ │
│ │ 读写 │ 读写 │
│ └──────┬───────┘ │
│ │ │
└──────────────┼───────────────────────┘
│
┌───────▼────────┐
│ External State │ ← 持久化存储
│ (文件/DB/Blob) │
└────────────────┘当 Agent 需要重启时,流程是:
- 创建新的沙箱(空的、全新的)
- 从 External State 中恢复 Agent 上次写入的数据
- 从检查点存储中恢复 Session State 和 Execution State
- Agent 继续从上次中断的地方执行
这个模式的价值在于:运行环境是牛马,用完就被杀掉。真正的状态永远在外面。 你不需要备份沙箱的磁盘镜像,不需要热迁移容器——直接把外部状态挂载到新环境,Agent 无缝继续。
一个具体的恢复流程
import json
from typing import Optional
class AgentRuntime:
def __init__(self, state_store, checkpoint_store, sandbox_provider):
self.state_store = state_store
self.checkpoint_store = checkpoint_store
self.sandbox_provider = sandbox_provider
async def resume_or_start(self, task_id: str) -> dict:
"""恢复已有任务或启动新任务"""
# 1. 尝试读取执行状态
exec_state = await self.checkpoint_store.get(f"{task_id}:exec_state")
if exec_state:
# 恢复模式
session_state = await self.checkpoint_store.get(f"{task_id}:session")
messages = json.loads(session_state)
# 重新挂载外部状态
external_refs = json.loads(exec_state).get("external_refs", {})
# 从上次中断的步骤继续
last_step = json.loads(exec_state)["current_step"]
else:
# 全新任务
messages = [{"role": "user", "content": task_input}]
external_refs = {}
last_step = 0
# 2. 创建全新沙箱
sandbox = await self.sandbox_provider.create()
sandbox.mount_external_state(external_refs)
# 3. 恢复执行
return await self._execute_from(
task_id=task_id,
sandbox=sandbox,
messages=messages,
start_step=last_step,
)
async def _execute_from(self, task_id, sandbox, messages, start_step):
"""从指定步骤开始执行 Agent 循环"""
for step_num, step in enumerate(self.plan[start_step:], start=start_step):
# 执行步骤
result = await step.execute(sandbox, messages)
# 写入检查点
await self.checkpoint_store.set(
f"{task_id}:exec_state",
json.dumps({"current_step": step_num + 1, "plan": self.plan})
)
await self.checkpoint_store.set(
f"{task_id}:session",
json.dumps(messages)
)
return {"status": "completed", "messages": messages}这个流程图展示了恢复的全过程:
社区踩坑实录
坑 1:把 Session State 和 Execution State 混在一起存
「我们在消息历史里同时存了对话内容和任务步骤标记,结果一压缩上下文,步骤标记全乱了」——Reddit r/ai_agents 上的开发者反馈。
教训:Session State(对话)和 Execution State(任务进度)分开存储。上下文压缩只操作 Session State,不影响 Execution State 的判断逻辑。
坑 2:沙箱销毁前忘了持久化
「我们设了 30 分钟 idel timeout,Agent 跑到第 29 分钟时沙箱被回收了,中间结果全丢了」——开发者博客上的经验分享。
教训:不要依赖沙箱的持久性。每完成一个有副作用的步骤,立即写 External State。不要等到任务结束才一起存。
坑 3:检查点恢复后 LLM 行为不一致
「从检查点恢复后,同样的上下文,模型给出了和崩溃前不一样的下一步决策,导致后续工具调用进入了一个无效循环」——HN 讨论。
这是 Durable Execution 应用于 LLM Agent 的根本矛盾。传统 Durable Execution 假设重放是确定的——同样的输入 → 同样的代码路径。LLM 调用不满足这个假设。实际缓解方法:固定 seed + temperature=0 + 重放时检测偏差(如果恢复后的工具调用与之前记录的预期不同,立即标记人工审核)。
16.5 后台任务与异步生命周期
核心直觉
Agent 任务不是 HTTP 请求——它可能跑几分钟到几小时。你不能让用户一直盯着浏览器等。后台任务系统把 Agent 变成异步的:用户提交任务 → 系统排队执行 → 完成后通知用户。
任务生命周期
一个 Agent 后台任务的完整生命周期:
几个关键的工程点:
排队:任务提交后进入队列。队列解耦了"接收请求"和"执行任务"——即使所有 Worker 都在忙,新任务也不会丢。Celery、BullMQ、Google Cloud Tasks 都是成熟的队列方案。
调度:队列里的任务按优先级分配 Worker。高优先级队列(用户实时交互的任务)和低优先级队列(批量处理任务)分开。
回调:任务完成后,调用方如何知道?三种常见模式:
| 模式 | 做法 | 适合场景 | 缺点 |
|---|---|---|---|
| 轮询 | 客户端每隔 N 秒查一次状态 | 简单场景、内部工具 | 浪费请求、延迟不精确 |
| Webhook | 任务完成后 POST 到回调 URL | 服务端集成 | 需要处理回调失败(重试/死信) |
| 消息推送 | 通过 WebSocket/SSE 实时推送 | 前端实时 UI | 需要客户端在线 |
一个后台任务的实现骨架
from celery import Celery
from typing import Optional
import json
app = Celery("agent_worker", broker="redis://localhost:6379/0")
class AgentTask:
def __init__(self, task_id: str, input_data: dict, webhook_url: Optional[str] = None):
self.task_id = task_id
self.input = input_data
self.webhook_url = webhook_url
self.checkpoint_store = CheckpointStore()
def run(self):
"""任务主循环——可以被 Celery worker 调用"""
# 尝试恢复检查点
state = self.checkpoint_store.get(f"{self.task_id}:state")
step_num = state["current_step"] if state else 0
agent = AgentLoop(checkpoint_store=self.checkpoint_store)
try:
result = agent.run_from_step(
task_id=self.task_id,
start_step=step_num,
input=self.input,
max_steps=100,
timeout_seconds=3600,
)
self._on_complete(result)
except RetryableError as e:
self._on_retryable_error(e)
except FatalError as e:
self._on_fatal_error(e)
def _on_complete(self, result):
if self.webhook_url:
requests.post(self.webhook_url, json={
"task_id": self.task_id,
"status": "completed",
"result": result,
})
def _on_retryable_error(self, error):
# 指数退避重试:第 1 次等 10s,第 2 次等 40s,第 3 次等 90s
retry_count = error.context.get("retry_count", 0)
delay = min(10 * (2 ** retry_count), 300) # 最多等 5 分钟
raise self.retry(countdown=delay)
@app.task(bind=True, max_retries=3, acks_late=True)
def run_agent_task(self, task_id: str, input_data: dict, webhook_url: str = None):
"""Celery 任务包装器"""
task = AgentTask(task_id, input_data, webhook_url)
task.run()关键设计点:
acks_late=True:Celery 在任务完成后再确认消费。如果 Worker 在执行中途崩溃,消息回到队列,另一个 Worker 重新执行——配合检查点,不会重复已完成的工作。max_retries=3:最多重试 3 次,防止无限重试消耗资源。webhook_url:调用方注册的回调地址,任务完成时通知。这是解耦的关键——调用方不需要轮询。
多 Worker 与并发
一个生产级的 Agent Runtime 需要支持多个 Worker 并行处理任务。关键考量:
- Worker 数量:受 LLM API 速率限制约束。如果你每分钟 API 额度是 500 次,单个 Agent 任务平均每分钟调 10 次 LLM,那最多跑 50 个并发任务(实际上要留 buffer,建议不超过额度的 70%)。
- 粘性路由:同一个 task 的重试(检查点恢复后)不需要回到同一个 Worker——这也是为什么检查点必须存在外部存储(Redis、Postgres)而不是 Worker 本地内存。
- 优雅关闭:收到 SIGTERM 时,Worker 应该等待当前任务完成检查点写入再退出,避免丢失进度。
16.6 Agent 流式交互架构
核心直觉
一个运行中的 Agent 会产生三种不同性质的流:模型逐 token 吐字、工具调用开始/完成/失败事件、后台任务的进度更新。这三种流服务于不同的消费者,不应该被混在一起。
三种流
| 流类型 | 内容 | 消费者 | 传输方式 | 关键要求 |
|---|---|---|---|---|
| Token Stream | LLM 推理的逐 token 输出 | 前端渲染文字 | SSE | 低延迟(< 100ms 首 token) |
| Tool Event Stream | 工具调用开始/进度/结果 | 前端展示"Agent 在做什么" | SSE | 结构化、可折叠、敏感信息过滤 |
| Task Progress Stream | 任务状态变更、检查点写入、剩余步骤估计 | 前端进度条 + 状态栏 | SSE / WebSocket | 低频、可靠、断线重连 |
把这三者分开的原因很简单:不要让 token stream 的速度被 tool event 阻塞。LLM 正在流畅吐 token 的时候,一个工具调用完成不应该干扰文字渲染。反过来,工具调用的详细信息(含参数、返回值)不应该被塞进 token stream 里让前端解析——它应该是独立的结构化事件。
SSE vs WebSocket vs Webhook 的选择
这个问题在社区被反复讨论,但其实决策没那么难:
SSE(Server-Sent Events):服务端单向推送流。HTTP 原生支持,天然支持断线重连(Last-Event-ID),负载均衡器友好。用于 Token Stream 和 Tool Event Stream——信息流是单向的(服务端 → 客户端),SSE 是比 WebSocket 更简单、更符合 HTTP 语义的选择。
WebSocket:双向通信。适合需要客户端主动发消息的场景——比如用户在 Agent 中途插入新指令、点击"暂停"或"纠正方向"。WebSocket 的缺点是连接管理比 SSE 复杂(没有原生重连、负载均衡器配置更麻烦)。
Webhook:任务完成时的回调(HTTP POST)。不是"流"而是"事件回调",适合服务端集成——另一个服务在等 Agent 任务完成。
一个务实的组合:Token Stream 和 Tool Events 用 SSE,用户交互用 WebSocket,任务完成通知用 Webhook。 不要把一切需求都往 WebSocket 里塞——它为双向设计,但大部分场景下你是单向推送。
前端展示策略
Agent 的输出和传统 Chatbot 有本质区别——用户看到的不是单纯的对话,而是一个在执行任务的自主系统。UX 设计几个关键点:
- 折叠工具日志:用户不需要看到
search_issues("project=PROJ max_stock=50")这种内部调用。默认折叠,展开时也过滤掉敏感信息(API Key、内部 IP)。 - 进度条但不承诺准确时间:Agent 自己也不知道还要跑几步。进度条应该是"已完成步骤数 / 预计最大步骤数"或"已消耗时间 / 超时时间"——给出锚点,但不虚报。
- 可中断:取消按钮应该在任何时候都可用。取消不等于立即停止——Agent 在写入当前检查点后再停止,而不是直接杀进程。
- 中间产物可查看:Agent 跑到一半产生的文件、数据、分析结果,应该可以让用户预览。这不仅建立信任,也让用户在 Agent 跑偏时能提前干预。
16.7 容错与降级策略
核心直觉
在生产环境,东西一定会坏。容错不是"怎么不出错",而是"出错了之后怎么让系统继续运行"。Agent Runtime 要处理三类故障:LLM API 故障、工具故障、自身进程故障。
LLM API 不可用
这是最常见的故障。LLM API 不可用的原因很多:供应商故障、速率限制、你的账户欠费、网络分区。故障模式不是"它坏了",而是"它有概率坏"——你需要在工程上假设它随时可能不可用。
降级策略分层:
| 策略 | 做法 | 成本 | 质量损失 |
|---|---|---|---|
| 备用模型 | 主模型不可用时切换到备用模型 | 无额外成本(按需切换) | 取决于备用模型能力 |
| 缓存响应 | 相同/相似 prompt 返回缓存结果 | 低(缓存存储成本) | 可能不适用(任务特异性) |
| 排队稍后 | 遇到 rate limit 时入队列延迟执行 | 低 | 延迟增加 |
| 降级到简单模式 | 跳过复杂推理,用规则引擎完成 | 低 | 质量下降,但任务能完成 |
| 人工接管 | 兜底,路由到人工处理 | 高(人力成本) | 质量最高但不可扩展 |
一个实用的熔断器实现:
import time
import threading
from dataclasses import dataclass, field
@dataclass
class CircuitBreaker:
failure_threshold: int = 5
recovery_timeout: float = 30.0 # 30 秒后尝试半开
half_open_max: int = 1 # 半开状态允许的试探请求数
_failure_count: int = field(default=0, init=False)
_last_failure_time: float = field(default=0, init=False)
_state: str = field(default="closed", init=False)
_lock: threading.Lock = field(default_factory=threading.Lock, init=False)
def call(self, fn, *args, **kwargs):
with self._lock:
if self._state == "open":
if time.monotonic() - self._last_failure_time > self.recovery_timeout:
self._state = "half_open"
else:
raise CircuitBreakerOpenError("熔断器打开,拒绝调用")
try:
result = fn(*args, **kwargs)
with self._lock:
if self._state == "half_open":
self._state = "closed"
self._failure_count = 0
return result
except Exception:
with self._lock:
self._failure_count += 1
self._last_failure_time = time.monotonic()
if self._failure_count >= self.failure_threshold:
self._state = "open"
raise
# 使用:用熔断器包装 LLM API 调用
llm_cb = CircuitBreaker(failure_threshold=3, recovery_timeout=60)
try:
response = llm_cb.call(openai.chat.completions.create, model="gpt-4.1", messages=messages)
except CircuitBreakerOpenError:
# 熔断器打开——用备用模型
response = anthropic.messages.create(model="claude-sonnet-4-20250514", messages=messages)熔断器有三个状态:closed(正常,请求通过)、open(连续失败超过阈值,直接拒绝)、half-open(open 一段时间后允许少次试探,成功则关闭)。
工具级容错
工具调用有比 LLM API 更复杂的故障模式:
- 超时:外部 API 慢响应
- 部分失败:批量操作中部分成功部分失败
- 非确定性错误:400(你的请求有问题)vs 429(对方限流)vs 500(对方坏了)——不同的错误码应该有不同的重试策略
- 工具间影响:工具 A 的失败不应该让整个任务崩溃,应该让 Agent 知道 A 失败了并自己决定下一步
from enum import Enum
class ToolErrorType(Enum):
RETRYABLE = "retryable" # 429, 503——等一下再试
FATAL = "fatal" # 400, 401——不要重试,重试也没用
DEGRADED = "degraded" # 工具仍然能用但质量下降
def handle_tool_error(error: Exception, tool_name: str, attempt: int) -> ToolErrorType:
if isinstance(error, RateLimitError):
return ToolErrorType.RETRYABLE
if isinstance(error, AuthenticationError):
return ToolErrorType.FATAL
if isinstance(error, TimeoutError) and attempt < 3:
return ToolErrorType.RETRYABLE
if isinstance(error, TimeoutError):
return ToolErrorType.DEGRADED
return ToolErrorType.FATAL死信队列(DLQ)与任务重放
有些任务重试了 N 次仍然失败——它们就是"死了"。但"死了"不等于"丢了"。死信队列的作用是:把失败的任务保存到可诊断的存储中,而不是直接丢弃。
进入 DLQ 的任务带完整上下文:输入、所有检查点、最后一次错误、重试历史。运维人员可以从 DLQ 中取出任务,诊断根因,修复后重新提回任务队列。
running → failed → retry (3 次) → 仍失败 → DLQ
↓
运维诊断
↓
修复根因
↓
重新入队列 → running → completed部分失败处理
Agent 跑到第 7 步失败了,前 6 步的结果怎么处理?这个问题的答案不是技术性的,是业务性的:
- 如果前 6 步都是可逆的(读操作、分析)——保留这些结果,从第 7 步重试
- 如果前 6 步包含不可逆操作(发了邮件、扣了款)——你需要决定是回滚还是继续。回滚也不是技术问题——"回滚一封已发出的邮件"不存在这个操作,你只能发一封更正邮件
- 如果步骤之间有依赖——第 3 步的输出被第 5 步用到了,重试第 7 步可能需要重新计算上游步骤
社区最佳实践:不可逆操作(发邮件、写数据库、HTTP POST)要显式标记,Agent 计划里这些操作之前必须有审批检查点。 这不是 Runtime 能自动搞定的——它需要在 Agent 设计时就考虑进去。
16.8 幂等性与重试语义
核心直觉
重试机制最大的陷阱:你不知道上一次操作到底成功了没有。HTTP 超时不等于"没收到"——请求可能在网络上被成功处理了,只是响应没回来。幂等性的核心思想是:重复执行和只执行一次效果一样。
三种交付语义
| 语义 | 含义 | 实现方式 | 代价 |
|---|---|---|---|
| at-most-once | 最多执行一次,丢了就丢了 | 无重试 | 简单,但不可靠 |
| at-least-once | 至少执行一次,可能重复 | 重试直到确认成功 | 需要幂等处理 |
| exactly-once | 精确执行一次 | 幂等键 + 去重 + 事务 | 理论上不可实现(分布式系统定理),工程上可以近似 |
at-most-once 适合纯读操作——查数据库没查到,重查一次没有任何副作用。
at-least-once 是 Agent Runtime 的默认模式。检查点 + 重试 = 至少一次语义。你需要确保所有有副作用的工具操作是幂等的。
exactly-once 在实践中,通过幂等键可以实现近似 exactly-once。
幂等键模式
import uuid
def safe_send_email(recipient: str, subject: str, body: str) -> str:
"""幂等的邮件发送——同一个 idempotency_key 不会发两次"""
# 生成或使用已有的幂等键
idempotency_key = f"email:{hashlib.sha256(f'{recipient}:{subject}'.encode()).hexdigest()[:16]}"
# 检查去重表
existing = db.query("SELECT status FROM dedup_table WHERE key = ?", idempotency_key)
if existing and existing["status"] == "sent":
return {"status": "already_sent", "idempotency_key": idempotency_key}
# 实际发送
result = sendgrid.send(recipient=recipient, subject=subject, body=body)
# 记录到去重表
db.execute(
"INSERT INTO dedup_table (key, status, result, created_at) VALUES (?, 'sent', ?, NOW())",
idempotency_key, json.dumps(result)
)
return {"status": "sent", "idempotency_key": idempotency_key}幂等键不一定要随业务逻辑生成。更简单的做法是:在代理循环中,每个工具调用都带上一个基于 task_id + step_number 的幂等键。同一步骤重试多少次,幂等键不变。
重试前检查状态
一个常被忽视的模式:工具调用超时时,不要无脑重试——先检查外部状态。
工具调用超时
↓
不立即重试
↓
检查外部状态:这条数据写进去了吗?
├── 写进去了 → 视作成功,继续下一步
├── 没写进去 → 重试工具调用
└── 不确定 → 人工审核"不确定"是分布式中最难处理的状态。如果你的系统频繁进入这个分支,说明工具设计有问题——每个工具应该支持幂等查询。(调用者能通过 GET 查询操作结果)
16.9 Runtime 能力清单
至此,你可以用下面这个清单评估一个 Agent 系统是否具备生产级 Runtime:
| 能力 | 具体要求 | 没有它的后果 |
|---|---|---|
| 持久化执行 | 每步检查点,崩溃恢复 | 崩溃 = 全丢,长任务跑不了 |
| 任务队列 | 排队、调度、优先级、并发控制 | 高负载雪崩 |
| 状态外置 | Session/Execution State 存外部存储 | 进程重启丢状态 |
| 沙箱隔离 | 代码执行隔离、网络限制、资源上限 | Agent 行为影响生产环境 |
| 流式事件 | Token stream + Tool events + Progress | 用户不知道 Agent 在干嘛 |
| 熔断器 | LLM API / 工具级故障隔离 | 级联失败 |
| DLQ | 失败任务保存、诊断、重放 | 失败任务丢失 |
| 幂等机制 | 幂等键、去重表 | 重复执行造成重复副作用 |
| 超时恢复 | 任务/步骤超时后进入重试或 DLQ | 任务永远挂起 |
| 优雅关闭 | SIGTERM 后等检查点写完再退出 | 部署重启丢进度 |
| 凭据管理 | Agent 不持有原始凭据,通过 Runtime 注入 | 凭据泄露 |
16.10 主要 Runtime 平台对比
截至 2026 年 4 月,几个值得关注的 Runtime 路线:
| 平台/方案 | 执行模型 | 核心优势 | 核心劣势 | 适合场景 |
|---|---|---|---|---|
| 自建(Celery + Redis + Docker) | 自管理 Worker + 容器沙箱 | 完全可控、无供应商锁定 | 工程量大,需自行实现检查点和 DLQ | 有平台工程团队的组织 |
| Temporal + 自建沙箱 | Durable Execution 引擎 + 自定义 Activity | 工业级持久化执行、多语言 SDK | 学习曲线陡峭,沙箱需自建 | 需要强持久化保证的企业级 Agent |
| Microsoft Durable Task / Foundry Agent Service | Durable Task + 托管 Agent Runtime | 与 Azure Monitor、Entra、私网和企业治理集成深 | Azure 生态绑定,部分能力仍有预览边界 | Azure 企业、需要治理/观测一体化 |
| Inngest | Serverless Durable Functions | 零运维、内置重试和长流程编排 | 平台锁定,复杂沙箱仍需外接 | 快速上线的中小团队 |
| Restate | Durable async services / workflows | 适合把 Agent 循环、人工审批、工具调用做成可恢复服务 [3] | 需要按 Restate 模型改造服务边界 | 事件驱动、微服务团队 |
| DBOS | 数据库驱动的 durable execution | 把工作流状态持久化到数据库,适合 crashproof Agent [4] | 生态较新,团队需要接受 DBOS 编程模型 | Python/TypeScript 后端、想少运维队列 |
| Cloudflare Workflows / Agents | Workers 上的 durable workflow + Agent 通信 | 边缘部署、长流程、低运维,适合轻量 Agent [7] | 云厂商绑定,复杂原生依赖/系统调用受限 | Web/边缘 Agent、长等待任务 |
| Modal | Serverless 容器 + GPU + Sandboxes | 极快冷启动(官方称 sub-second)、弹性伸缩、GPU 友好 | 主要是容器级隔离,持久化执行需组合其他机制 | 需要 GPU 或弹性代码执行的 Agent |
| E2B | 托管 Linux VM 沙箱 | SDK 简洁、适合 Coding Agent 和代码解释器 | 外部依赖、成本,编排/检查点要另配 | Coding Agent、不可信代码执行 |
| LangGraph Platform | LangGraph 的托管部署平台 | 与 LangGraph 深度集成 | 平台绑定 LangGraph 生态 | LangGraph 用户的生产部署 |
| OpenAI Agents SDK + 自建基础设施 | 官方 SDK + 自建 Runtime | 最低抽象层、原生 Guardrails | 需要自行补齐 Runtime 能力 | 想完全控制 Runtime 的团队 |
选型没有标准答案,关键在于你愿意在 Runtime 上投入多少工程精力。一个务实判断是:durable execution 和 sandbox 通常不是同一个产品能力。Temporal、Inngest、Restate、DBOS、Cloudflare Workflows 更偏"把长流程可靠跑完";E2B、Modal、Dynamic Workers 这类方案更偏"把代码隔离跑起来"。生产系统往往需要把两类能力组合起来。
16.11 Runtime 与治理的边界
本章最后要画清一条线:Runtime 的职责到哪为止,治理的职责从哪开始。
Runtime(本章)的职责:
- 确保任务可靠完成(持久化、重试、容错)
- 在隔离环境中执行(沙箱)
- 管理任务的生命周期(排队、调度、超时)
- 提供流式事件和进度反馈
- 管理 Agent 不直接持有的凭据
治理(第 20 章)的职责:
- 定义 Agent 能调用哪些工具、访问哪些数据
- 控制操作的权限等级和审批流程
- 审计日志(谁、什么时候、做了什么)
- 合规和法规对齐
- RBAC/ABAC 权限模型
两者的关系用一句话概括:Runtime 负责把任务可靠跑完,治理负责定义它在什么权限下跑、留下什么审计记录、出了问题怎么追责。 Runtime 提供能力(能跑),治理提供约束(怎么跑)。
[Runtime 层]
- 持久化检查点
- 沙箱隔离
- 任务队列
- 熔断器
- 凭据注入(不暴露给 Agent)
↓
[Agent 执行]
↓ 调用工具
[治理层拦截]
- 权限检查(能调用吗?)
- 审批(需要人工确认吗?)
- 审计(记录这条调用)
↓ 允许
[工具执行]一个实际例子:Agent 要发一封邮件给客户。Runtime 确保 Agent 在沙箱中运行、当前步骤在执行前写好了检查点、postmaster API 有熔断器保护。治理层检查:Agent 有没有发邮件的权限?邮件内容是否需要审批(高风险操作:发邮件给外部收件人)?不管发没发出去,这条操作都要进审计日志。
两者协作,不是替代。Runtime 和治理一样都搞不定的,是第 20 章的红队测试、组织演练和事故响应。
常见故障模式
坑 1:把检查点存在 Worker 本地磁盘
典型症状:本地文件系统里明明有检查点,但 Worker 被 Kubernetes 调度到另一台机器后,恢复流程找不到任何状态。
检查点必须存在外部、共享、持久化的存储中。Redis(适合热数据、高性能)、PostgreSQL(适合需要复杂查询的检查点状态)、S3(适合大文件检查点)都可以,但不能是本地磁盘。
坑 2:没有状态恢复测试
典型症状:代码里确实写了检查点,但第一次生产崩溃后才发现检查点缺字段、版本不兼容,或者恢复后无法重建沙箱挂载。
测试持久化执行不能只测"写检查点了没",要测"崩了之后能不能从检查点恢复并产生相同的结果"。写一个 chaos test:在 Agent 跑的时候随机杀进程,然后验证恢复结果。
坑 3:把所有东西都包装成 try/except 然后吞掉
典型症状:Agent 看似没有报错,但主模型、主工具或主检索链路已经持续失败,只是被宽泛的异常处理悄悄降级了。
错误要分类处理:重试的(record + retry)、降级的(record + fallback + log warning)、致命的(record + escalate to human)。不能所有错误都用同一个 except 处理。
面试高频题
问题:解释 Durable Execution 的原理。一个运行 30 分钟的 Agent 任务中途崩溃了,你的系统如何恢复?
参考答案框架:
先讲核心原理:每完成一个步骤后写入检查点(消息历史 + 工具调用结果 + 执行状态)。崩溃后从最后检查点读取,继续执行而非从头开始。
恢复流程:重启进程 → 读检查点存储(外部存储,不是本地磁盘)→ 重建沙箱(新的、空的)→ 从外部状态存储重新挂载数据 → 从最后检查点的步骤继续执行。
关键工程点:
- 检查点必须在外部存储,不能在本机
- 沙箱是临时的,销毁后重建,外部状态单独持久化
- LLM 重放的非确定性问题:需要固定 seed + temperature=0 + 重放时检测偏差
- 不可逆操作(发邮件、扣款)之前必须有审批检查点
加分点:
- 提到状态需要区分 Session State、Execution State、External State,分开管理
- 提到重试语义:at-least-once + 幂等键。重试前检查外部状态,不无脑重试
- 提到熔断器在 LLM API 不可用时的作用:切换到备用模型而不是无限重试
- 提到测试:持久化执行需要 chaos test——随机杀进程验证恢复
参考资料
[1] Microsoft Learn, "Durable Task for AI agents", 2026. https://learn.microsoft.com/en-us/azure/durable-task/sdks/durable-task-for-ai-agents
[2] Inngest, "Durable Execution: The Key to Harnessing AI Agents in Production", Inngest Blog, 2026. https://www.inngest.com/blog/durable-execution-key-to-harnessing-ai-agents
[3] Restate, "AI Agents", Restate Documentation. https://docs.restate.dev/use-cases/ai-agents
[4] DBOS, "Durable Execution for Building Crashproof AI Agents". https://www.dbos.dev/blog/durable-execution-crashproof-ai-agents
[5] E2B, "Documentation", E2B Documentation. https://e2b.dev/docs
[6] Modal, "Sandboxes", Modal Documentation. https://modal.com/docs/guide/sandboxes
[7] Cloudflare, "Build a Durable AI Agent", Cloudflare Workflows Documentation. https://developers.cloudflare.com/workflows/get-started/durable-agents/