Skip to content

第16章:Agent Runtime —— 让 Agent 活在生产环境

很多团队第一次做 Agent 时会遇到同一个落差:demo 里 50 行代码就能跑起来,到了生产环境却卡在崩溃恢复、任务超时、工具副作用、沙箱隔离和并发控制上。这个落差不完全是模型问题,更是 Runtime 问题。

前 15 章讲的都是怎么构建 Agent——认知架构、工具调用、记忆、规划、多 Agent 协作、协议、护栏。这些是"造车"的问题。但一辆车能跑 0-100km/h 加速和在高速公路上连续跑 10 万公里不出事,是两件完全不同的事。

这一章讲的是 Agent 的"高速公路系统":持久化执行、沙箱隔离、状态管理、异步任务、流式交互、容错降级、幂等重试。理解了这些,你才知道为什么 demo 和 production 之间的 gap 不是你 prompt 写得不够好。

框架帮你构建 Agent,Runtime 帮你可靠地运行 Agent。

16.1 什么是 Agent Runtime —— 框架和 Runtime 的本质区别

核心直觉

框架是造车工厂,Runtime 是道路系统。框架决定 Agent 的行为逻辑(用什么模型、有什么工具、怎么思考),Runtime 决定 Agent 能不能在断网、崩溃、超时、并发激增的情况下还可靠运行。

框架做了什么

框架的核心职责是定义 Agent 的行为

  • 提供 Agent 循环(接收输入 → LLM 推理 → 解析动作 → 执行工具 → 观察结果 → 决定是否继续)
  • 管理工具注册和调用
  • 组织多 Agent 协作(handoff、supervisor 模式)
  • 提供 Guardrails(输入/输出/工具护栏)
  • 集成记忆和上下文管理

LangGraph、CrewAI、OpenAI Agents SDK 都在这一层。它们回答的问题是"这个 Agent 是什么、能做什么"。

Runtime 做了什么

Runtime 的核心职责是保证 Agent 可靠执行

  • 每一步后保存检查点,崩溃后从断点恢复
  • 在隔离的沙箱中执行代码和操作文件
  • 管理长任务的排队、调度、超时和重试
  • 处理 LLM API 不可用时的降级和备用方案
  • 持久化会话状态,让 Agent 跨进程重启后保持连续性
  • 提供流式事件总线,让前端实时感知 Agent 在做什么

为什么需要区分

一个最常见的错误认知:用了 LangGraph 就等于有了生产级 Runtime。LangGraph 提供了 checkpointer——但它是一个持久化接口,不是完整的 Runtime。真正的 Runtime 需要把检查点存储、沙箱生命周期、任务队列、熔断器、流式事件管线全部集成到一起。

工程上更准确的说法是:写出一个会调用工具的 Agent 循环很容易,让它连续运行几小时、处理中途重启、工具超时、LLM 限流和外部服务抖动,才是真正的产品化工作。

区分框架和 Runtime 不是学术分类,而是工程边界。框架的代码跑在你的进程里,Runtime 的职责跨进程、跨机器、跨重启。当你遇到"Agent 跑到一半崩了怎么恢复"、"怎么让 Agent 跑 2 小时的任务不丢状态"这类问题时,你需要的不是更好的框架,而是 Runtime。

16.2 Durable Execution —— 让 Agent 拥有"存档读档"能力

核心直觉

你玩 RPG 游戏时会在打 Boss 前存档。死了之后从存档点重来,不用从新手村重新跑。Durable Execution 对 Agent 做的就是这件事——每一步后自动存档,崩了从最后一步接着跑,而不是从头再来。

问题有多严重

一个运行 30 分钟的 Agent 任务,可能在任意时刻崩溃:

  • LLM API 返回 500(云服务故障,每几个月总有一次)
  • 工具调用超时(第三方 API 慢响应)
  • 进程被 OOM Killer 杀掉(Agent 上下文膨胀到几 GB)
  • 部署重启(你发了新版本,旧进程被杀)
  • 网络断开(Agent 跑在笔记本电脑上,合盖休眠)

没有持久化执行,上面任何一种情况都意味着:全部中间结果丢失,从头重跑。对于消耗了大量 token、调了十几次工具的任务,这个代价不只是时间——是钱。

检查点机制

Durable Execution 的核心原理简单到只有一句话:每完成一个步骤,把当前状态写入持久化存储

步骤 1 完成 → 写检查点
步骤 2 完成 → 写检查点
步骤 3 完成 → 写检查点
... 崩了!
重新启动 → 读最后一个检查点 → 从步骤 4 继续

这不是新概念。Temporal(前身是 Uber 的 Cadence)在 2019 年就把这个模式做成了微服务编排的基础设施。当时解决的问题是"如果订单处理中途崩了,怎么不重复扣款也不丢订单"。Agent 面临的是完全同构的问题——只是"单步操作"从调用微服务变成了调用 LLM + 执行工具。

一个最小实现

持久化检查点不需要重框架。一个 Python 装饰器就能说清楚核心逻辑:

python
import json
import hashlib
import redis
from functools import wraps
from typing import Any, Callable

r = redis.Redis(decode_responses=True)

def checkpoint(task_id: str, step_name: str):
    """一个简单的检查点装饰器——演示原理,生产环境需要更多工程化"""
    def decorator(fn: Callable):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            # 尝试读取已有检查点
            ckpt_key = f"agent:{task_id}:{step_name}"
            cached = r.get(ckpt_key)
            if cached:
                return json.loads(cached)

            # 执行步骤
            result = fn(*args, **kwargs)

            # 写入检查点
            r.set(ckpt_key, json.dumps(result), ex=86400)
            return result
        return wrapper
    return decorator

@checkpoint(task_id="task-42", step_name="fetch_issue")
def fetch_jira_issue(issue_id: str) -> dict:
    resp = requests.get(
        f"https://jira.company.com/api/issue/{issue_id}",
        timeout=30
    )
    resp.raise_for_status()
    return resp.json()

@checkpoint(task_id="task-42", step_name="summarize")
def summarize_issue(issue: dict) -> str:
    return llm_call(
        system="你是一个技术摘要助手",
        prompt=f"用 3 句话总结这个 Jira 工单:\n{json.dumps(issue)}"
    )

task_id 是幂等的关键——同一个 task_id + 同一个 step_name,无论执行多少次,结果来自同一个检查点。如果 LLM 调用成本 $0.01 而你已经为这一步付过费,从检查点读取就是纯利润。

当然,这个实现省略了很多生产必需的东西:检查点版本管理(步骤逻辑改了,旧检查点要失效)、部分失败标记(步骤失败和步骤没跑是两回事)、事务性写入(写检查点和实际执行应该是原子的)。但它们都是可解决的工程扩展,不影响核心概念。

持久化 vs 非持久化:一个对比

非持久化 Agent持久化 Agent
崩溃后恢复从头重跑(损失所有 token 成本 + 时间)从最后检查点继续
长任务(> 30min)几乎不可行原生支持
任务暂停/恢复不支持支持(检查点 = 可序列化的暂停点)
调试重跑整个任务从任意检查点回放
额外开销每步一次写操作(通常 < 10ms)

检查点应该存什么

这个问题看似简单,实则决定了你的恢复质量:

  • 最小方案:只存消息历史(对话记录)。能恢复 LLM 上下文,但工具调用的副作用(已发出的邮件、已创建的订单)无法回滚。
  • 标准方案:消息历史 + 工具调用结果 + 流程状态(当前处于哪个步骤)。大部分 Agent 任务用这个就够了。
  • 完整方案:以上 + 外部状态快照(文件系统状态、数据库行版本、外部 API 响应)。用于需要精确回放的场景(比如调试和审计)。

选择哪种方案取决于你的容错需求。如果工具操作是幂等的(16.8 节会展开),最小方案通常够用。如果工具的副作用不可逆(发邮件、扣款),至少要标准方案,并且你的恢复逻辑需要检查"这一步到底执行成功了没有"。

Agent 场景里的新问题

Durable Execution 不是 Agent 时代才出现的概念,但 2025-2026 年它重新变成热点,是因为长任务 Agent 把老问题放大了。Microsoft Durable Task for AI agents 文档明确把状态管理、检查点和分布式协调作为底层基础设施,让 Agent 代码不用自己处理这些细节 [1]。Inngest 也把 AI Agent 作为 durable execution 进入主流采用的重要驱动力之一 [2]。

也有人指出反向问题:Durable Execution 的设计假设每步是确定的——重放应该得到相同结果。但 LLM 调用天然非确定(除非 temperature=0 且所有参数固定)。在实践中,检查点重放 LLM 调用时你需要固定 seed 参数并确保模型版本一致,否则"恢复"后的推理可能与之前不同。

这里要避免一个过度推广:不是每个 Agent 都需要持久化,简单的一次性问答不值得引入这个复杂度。但对于任何运行超过几分钟、会调用多个工具、或有不可忽略 token/外部 API 成本的任务,Durable Execution 应该成为默认架构选项之一。

16.3 Sandbox —— 把 Agent 关在笼子里,而不是给它你家的钥匙

核心直觉

Agent 要执行代码、写文件、调 API、装依赖——这些操作你不敢让它在你的生产服务器上直接做。沙箱就是给 Agent 一个隔离的笼子:它可以随便折腾,但折腾的范围碰不到外面的东西。

为什么 Agent 需要沙箱

第 15 章讲了多起 Agent 删除生产数据库、外泄数据的事故。那些事故的根本原因不是护栏没做好,而是Agent 有接触生产环境的权限。护栏是软件层的防御,沙箱是基础设施层的隔离。

Agent 需要沙箱的核心理由不是"它会恶意行为"——大多数时候它只是犯错。但一个犯了错的 Agent 如果跑在你的生产环境里,它的错误就是你的灾难:

  • 一个 Coding Agent 执行了 rm -rf /(不一定是恶意的,可能是生成的清理脚本有 bug)
  • 一个数据分析 Agent 装了一个恶意 pip 包(供应链攻击)
  • 一个 Research Agent 抓取了一个恶意网页,网页里有浏览器 0-day(别笑,这是真实的安全顾虑)
  • 一个 Browser Agent 在填表单时误触了"删除所有数据"按钮

隔离度的光谱

不是所有任务都需要同等级别的隔离。从轻到重:

隔离级别实现冷启动安全边界适合场景
子进程subprocess.run()< 10ms几乎为零单次简单计算、内部脚本
Docker容器运行时1-5s中(共享内核)大多数代码执行场景
gVisor用户态内核1-5s高(系统调用拦截)多租户场景、不可信代码
microVMFirecracker50-200ms极高(硬件虚拟化)高风险操作、外部代码执行

不同路线的工程取舍

Docker 容器是最常见的方案。你启动一个临时容器,把 Agent 的代码和依赖装进去,跑完就销毁。问题是启动慢(镜像拉取 + 容器初始化需要几秒),而且共享宿主机内核——理论上存在容器逃逸风险。在实践中,这个风险对大多数企业内部场景是可以接受的。

microVM / 轻量 VM 是高隔离沙箱的一条常见路线。E2B 文档把 Sandbox 描述为按需创建的快速 Linux VM,适合让 Agent 安全执行代码和工具 [5]。Firecracker 是这类 microVM 架构中最有代表性的实现之一:每个执行环境有自己的内核,安全边界比共享宿主内核的容器更清晰。代价是资源开销和运维复杂度都更高。

gVisor 是 Google 的方案。它在用户空间实现了一个内核,拦截容器的系统调用。安全隔离比 Docker 强(系统调用被过滤),比 microVM 轻(启动更快)。如果你需要比 Docker 更安全的方案但不想引入虚拟化开销,这是一个务实的中间选择。

远程执行环境:Modal、E2B、Replit、Codesandbox 等平台提供了托管沙箱。你把代码发过去,它们在云端隔离执行,返回结果。好处是零运维负担,坏处是有外部依赖和成本。Modal 的 Sandboxes 文档明确把执行 LLM 生成代码、运行不可信代码、检出代码仓库并执行测试列为典型场景 [6]。

哪个方案适合你? 如果你是创业团队做内部工具,Docker 容器 + 合理的网络策略足够。如果你是做面向客户的 Coding Agent 产品,Firecracker microVM 是更安心的选择。如果你是金融/医疗行业,合规要求可能强制 microVM 级别的隔离。

沙箱不只是执行环境

一个完整的沙箱方案需要回答这几个问题:

  • 文件系统:Agent 能读写哪些目录?外部存储怎么挂载?
  • 网络权限:能访问哪些域名和 IP?能不能出外网?
  • 资源上限:CPU、内存、磁盘的时间/容量限制是多少?超过上限是杀进程还是限流?
  • 生命周期:沙箱创建时机(任务开始时?首次代码执行时?)、闲置多长时间后回收、任务完成后数据保留多久?
  • 凭据注入:Agent 需要的 API Key 怎么安全地注入沙箱而不暴露在日志里?

一个常见的设计模式:Ephemeral + External State

沙箱本身是临时的——任务结束就销毁。但 Agent 需要持久化的东西(代码仓库、生成的文档、数据库查询结果)存在沙箱外面。这个模式在 16.4 节展开。

python
# 伪代码:沙箱执行的典型流程
sandbox = create_sandbox(
    image="python:3.12",
    env={"API_KEY": runtime.get_secret("agent_api_key")},  # 运行时注入,不进日志
    network="isolated",  # 只能访问白名单域名
    max_runtime_seconds=300,
    max_memory_mb=512,
)

try:
    result = sandbox.execute(
        code=agent_generated_code,
        stdin="",  # 不传用户输入到沙箱
    )
    if result.exit_code != 0:
        raise SandboxExecutionError(result.stderr)
    return result.stdout
finally:
    sandbox.destroy()  # 确保沙箱被销毁,不管成功还是失败

16.4 状态管理 —— 临时的壳,持久的心

核心直觉

Agent 的执行环境(进程、沙箱、容器)是临时的——随时可能被销毁重建。但 Agent 的记忆和任务进度必须是永久的。状态管理要解决的就是一个矛盾:运行在临时环境里的 Agent,怎么在跨重启、跨环境重建后保持连续性。

三种状态

Agent 的状态分三层,每层的持久化策略不同:

状态类型内容生命周期持久化策略
Session State对话历史、工具调用日志、当前上下文单次会话需要持久化(崩溃恢复)
Execution State任务进度、中间结果、分支决策、检查点单次任务执行必须持久化(Durable Execution 的基础)
External State代码仓库、生成的文档、数据库写入跨会话、跨任务本身就是持久化的

Session State 就是你的消息数组(messages list),以及 LLM 的内部状态(如果模型支持)。它是 Agent "记得刚才发生了什么"的基础。

Execution State 是任务层面的——"我已经完成了步骤 3,下一步是步骤 4,步骤 2 的结果存在这里"。它比 Session State 更结构化,通常以一个字典或 JSON 对象存储。

External State 是 Agent 对世界产生的副作用——写了个文件、创建了个 PR、数据库里插了行数据。这些东西的持久化是业务系统本身的职责。

Ephemeral + External State 模式

分离临时的(ephemeral)和持久的(external),是 Agent 状态管理最核心的设计模式:

┌─────────────────────────────────────┐
│         Episode Runtime              │
│  ┌──────────┐  ┌──────────┐         │
│  │ Sandbox  │  │  Agent   │         │
│  │ (临时)   │  │ 进程 (临时)│        │
│  └────┬─────┘  └────┬─────┘         │
│       │              │               │
│       │   读写       │   读写        │
│       └──────┬───────┘               │
│              │                       │
└──────────────┼───────────────────────┘

       ┌───────▼────────┐
       │  External State │  ← 持久化存储
       │  (文件/DB/Blob)  │
       └────────────────┘

当 Agent 需要重启时,流程是:

  1. 创建新的沙箱(空的、全新的)
  2. 从 External State 中恢复 Agent 上次写入的数据
  3. 从检查点存储中恢复 Session State 和 Execution State
  4. Agent 继续从上次中断的地方执行

这个模式的价值在于:运行环境是牛马,用完就被杀掉。真正的状态永远在外面。 你不需要备份沙箱的磁盘镜像,不需要热迁移容器——直接把外部状态挂载到新环境,Agent 无缝继续。

一个具体的恢复流程

python
import json
from typing import Optional

class AgentRuntime:
    def __init__(self, state_store, checkpoint_store, sandbox_provider):
        self.state_store = state_store
        self.checkpoint_store = checkpoint_store
        self.sandbox_provider = sandbox_provider

    async def resume_or_start(self, task_id: str) -> dict:
        """恢复已有任务或启动新任务"""

        # 1. 尝试读取执行状态
        exec_state = await self.checkpoint_store.get(f"{task_id}:exec_state")

        if exec_state:
            # 恢复模式
            session_state = await self.checkpoint_store.get(f"{task_id}:session")
            messages = json.loads(session_state)

            # 重新挂载外部状态
            external_refs = json.loads(exec_state).get("external_refs", {})

            # 从上次中断的步骤继续
            last_step = json.loads(exec_state)["current_step"]

        else:
            # 全新任务
            messages = [{"role": "user", "content": task_input}]
            external_refs = {}
            last_step = 0

        # 2. 创建全新沙箱
        sandbox = await self.sandbox_provider.create()
        sandbox.mount_external_state(external_refs)

        # 3. 恢复执行
        return await self._execute_from(
            task_id=task_id,
            sandbox=sandbox,
            messages=messages,
            start_step=last_step,
        )

    async def _execute_from(self, task_id, sandbox, messages, start_step):
        """从指定步骤开始执行 Agent 循环"""
        for step_num, step in enumerate(self.plan[start_step:], start=start_step):
            # 执行步骤
            result = await step.execute(sandbox, messages)

            # 写入检查点
            await self.checkpoint_store.set(
                f"{task_id}:exec_state",
                json.dumps({"current_step": step_num + 1, "plan": self.plan})
            )
            await self.checkpoint_store.set(
                f"{task_id}:session",
                json.dumps(messages)
            )

        return {"status": "completed", "messages": messages}

这个流程图展示了恢复的全过程:

社区踩坑实录

坑 1:把 Session State 和 Execution State 混在一起存

「我们在消息历史里同时存了对话内容和任务步骤标记,结果一压缩上下文,步骤标记全乱了」——Reddit r/ai_agents 上的开发者反馈。

教训:Session State(对话)和 Execution State(任务进度)分开存储。上下文压缩只操作 Session State,不影响 Execution State 的判断逻辑。

坑 2:沙箱销毁前忘了持久化

「我们设了 30 分钟 idel timeout,Agent 跑到第 29 分钟时沙箱被回收了,中间结果全丢了」——开发者博客上的经验分享。

教训:不要依赖沙箱的持久性。每完成一个有副作用的步骤,立即写 External State。不要等到任务结束才一起存。

坑 3:检查点恢复后 LLM 行为不一致

「从检查点恢复后,同样的上下文,模型给出了和崩溃前不一样的下一步决策,导致后续工具调用进入了一个无效循环」——HN 讨论。

这是 Durable Execution 应用于 LLM Agent 的根本矛盾。传统 Durable Execution 假设重放是确定的——同样的输入 → 同样的代码路径。LLM 调用不满足这个假设。实际缓解方法:固定 seed + temperature=0 + 重放时检测偏差(如果恢复后的工具调用与之前记录的预期不同,立即标记人工审核)。

16.5 后台任务与异步生命周期

核心直觉

Agent 任务不是 HTTP 请求——它可能跑几分钟到几小时。你不能让用户一直盯着浏览器等。后台任务系统把 Agent 变成异步的:用户提交任务 → 系统排队执行 → 完成后通知用户。

任务生命周期

一个 Agent 后台任务的完整生命周期:

几个关键的工程点:

排队:任务提交后进入队列。队列解耦了"接收请求"和"执行任务"——即使所有 Worker 都在忙,新任务也不会丢。Celery、BullMQ、Google Cloud Tasks 都是成熟的队列方案。

调度:队列里的任务按优先级分配 Worker。高优先级队列(用户实时交互的任务)和低优先级队列(批量处理任务)分开。

回调:任务完成后,调用方如何知道?三种常见模式:

模式做法适合场景缺点
轮询客户端每隔 N 秒查一次状态简单场景、内部工具浪费请求、延迟不精确
Webhook任务完成后 POST 到回调 URL服务端集成需要处理回调失败(重试/死信)
消息推送通过 WebSocket/SSE 实时推送前端实时 UI需要客户端在线

一个后台任务的实现骨架

python
from celery import Celery
from typing import Optional
import json

app = Celery("agent_worker", broker="redis://localhost:6379/0")

class AgentTask:
    def __init__(self, task_id: str, input_data: dict, webhook_url: Optional[str] = None):
        self.task_id = task_id
        self.input = input_data
        self.webhook_url = webhook_url
        self.checkpoint_store = CheckpointStore()

    def run(self):
        """任务主循环——可以被 Celery worker 调用"""
        # 尝试恢复检查点
        state = self.checkpoint_store.get(f"{self.task_id}:state")
        step_num = state["current_step"] if state else 0

        agent = AgentLoop(checkpoint_store=self.checkpoint_store)
        try:
            result = agent.run_from_step(
                task_id=self.task_id,
                start_step=step_num,
                input=self.input,
                max_steps=100,
                timeout_seconds=3600,
            )
            self._on_complete(result)
        except RetryableError as e:
            self._on_retryable_error(e)
        except FatalError as e:
            self._on_fatal_error(e)

    def _on_complete(self, result):
        if self.webhook_url:
            requests.post(self.webhook_url, json={
                "task_id": self.task_id,
                "status": "completed",
                "result": result,
            })

    def _on_retryable_error(self, error):
        # 指数退避重试:第 1 次等 10s,第 2 次等 40s,第 3 次等 90s
        retry_count = error.context.get("retry_count", 0)
        delay = min(10 * (2 ** retry_count), 300)  # 最多等 5 分钟
        raise self.retry(countdown=delay)

@app.task(bind=True, max_retries=3, acks_late=True)
def run_agent_task(self, task_id: str, input_data: dict, webhook_url: str = None):
    """Celery 任务包装器"""
    task = AgentTask(task_id, input_data, webhook_url)
    task.run()

关键设计点:

  • acks_late=True:Celery 在任务完成后再确认消费。如果 Worker 在执行中途崩溃,消息回到队列,另一个 Worker 重新执行——配合检查点,不会重复已完成的工作。
  • max_retries=3:最多重试 3 次,防止无限重试消耗资源。
  • webhook_url:调用方注册的回调地址,任务完成时通知。这是解耦的关键——调用方不需要轮询。

多 Worker 与并发

一个生产级的 Agent Runtime 需要支持多个 Worker 并行处理任务。关键考量:

  • Worker 数量:受 LLM API 速率限制约束。如果你每分钟 API 额度是 500 次,单个 Agent 任务平均每分钟调 10 次 LLM,那最多跑 50 个并发任务(实际上要留 buffer,建议不超过额度的 70%)。
  • 粘性路由:同一个 task 的重试(检查点恢复后)不需要回到同一个 Worker——这也是为什么检查点必须存在外部存储(Redis、Postgres)而不是 Worker 本地内存。
  • 优雅关闭:收到 SIGTERM 时,Worker 应该等待当前任务完成检查点写入再退出,避免丢失进度。

16.6 Agent 流式交互架构

核心直觉

一个运行中的 Agent 会产生三种不同性质的流:模型逐 token 吐字、工具调用开始/完成/失败事件、后台任务的进度更新。这三种流服务于不同的消费者,不应该被混在一起。

三种流

流类型内容消费者传输方式关键要求
Token StreamLLM 推理的逐 token 输出前端渲染文字SSE低延迟(< 100ms 首 token)
Tool Event Stream工具调用开始/进度/结果前端展示"Agent 在做什么"SSE结构化、可折叠、敏感信息过滤
Task Progress Stream任务状态变更、检查点写入、剩余步骤估计前端进度条 + 状态栏SSE / WebSocket低频、可靠、断线重连

把这三者分开的原因很简单:不要让 token stream 的速度被 tool event 阻塞。LLM 正在流畅吐 token 的时候,一个工具调用完成不应该干扰文字渲染。反过来,工具调用的详细信息(含参数、返回值)不应该被塞进 token stream 里让前端解析——它应该是独立的结构化事件。

SSE vs WebSocket vs Webhook 的选择

这个问题在社区被反复讨论,但其实决策没那么难:

SSE(Server-Sent Events):服务端单向推送流。HTTP 原生支持,天然支持断线重连(Last-Event-ID),负载均衡器友好。用于 Token Stream 和 Tool Event Stream——信息流是单向的(服务端 → 客户端),SSE 是比 WebSocket 更简单、更符合 HTTP 语义的选择。

WebSocket:双向通信。适合需要客户端主动发消息的场景——比如用户在 Agent 中途插入新指令、点击"暂停"或"纠正方向"。WebSocket 的缺点是连接管理比 SSE 复杂(没有原生重连、负载均衡器配置更麻烦)。

Webhook:任务完成时的回调(HTTP POST)。不是"流"而是"事件回调",适合服务端集成——另一个服务在等 Agent 任务完成。

一个务实的组合:Token Stream 和 Tool Events 用 SSE,用户交互用 WebSocket,任务完成通知用 Webhook。 不要把一切需求都往 WebSocket 里塞——它为双向设计,但大部分场景下你是单向推送。

前端展示策略

Agent 的输出和传统 Chatbot 有本质区别——用户看到的不是单纯的对话,而是一个在执行任务的自主系统。UX 设计几个关键点:

  • 折叠工具日志:用户不需要看到 search_issues("project=PROJ max_stock=50") 这种内部调用。默认折叠,展开时也过滤掉敏感信息(API Key、内部 IP)。
  • 进度条但不承诺准确时间:Agent 自己也不知道还要跑几步。进度条应该是"已完成步骤数 / 预计最大步骤数"或"已消耗时间 / 超时时间"——给出锚点,但不虚报。
  • 可中断:取消按钮应该在任何时候都可用。取消不等于立即停止——Agent 在写入当前检查点后再停止,而不是直接杀进程。
  • 中间产物可查看:Agent 跑到一半产生的文件、数据、分析结果,应该可以让用户预览。这不仅建立信任,也让用户在 Agent 跑偏时能提前干预。

16.7 容错与降级策略

核心直觉

在生产环境,东西一定会坏。容错不是"怎么不出错",而是"出错了之后怎么让系统继续运行"。Agent Runtime 要处理三类故障:LLM API 故障、工具故障、自身进程故障。

LLM API 不可用

这是最常见的故障。LLM API 不可用的原因很多:供应商故障、速率限制、你的账户欠费、网络分区。故障模式不是"它坏了",而是"它有概率坏"——你需要在工程上假设它随时可能不可用。

降级策略分层

策略做法成本质量损失
备用模型主模型不可用时切换到备用模型无额外成本(按需切换)取决于备用模型能力
缓存响应相同/相似 prompt 返回缓存结果低(缓存存储成本)可能不适用(任务特异性)
排队稍后遇到 rate limit 时入队列延迟执行延迟增加
降级到简单模式跳过复杂推理,用规则引擎完成质量下降,但任务能完成
人工接管兜底,路由到人工处理高(人力成本)质量最高但不可扩展

一个实用的熔断器实现:

python
import time
import threading
from dataclasses import dataclass, field

@dataclass
class CircuitBreaker:
    failure_threshold: int = 5
    recovery_timeout: float = 30.0  # 30 秒后尝试半开
    half_open_max: int = 1  # 半开状态允许的试探请求数

    _failure_count: int = field(default=0, init=False)
    _last_failure_time: float = field(default=0, init=False)
    _state: str = field(default="closed", init=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, init=False)

    def call(self, fn, *args, **kwargs):
        with self._lock:
            if self._state == "open":
                if time.monotonic() - self._last_failure_time > self.recovery_timeout:
                    self._state = "half_open"
                else:
                    raise CircuitBreakerOpenError("熔断器打开,拒绝调用")

        try:
            result = fn(*args, **kwargs)
            with self._lock:
                if self._state == "half_open":
                    self._state = "closed"
                    self._failure_count = 0
            return result
        except Exception:
            with self._lock:
                self._failure_count += 1
                self._last_failure_time = time.monotonic()
                if self._failure_count >= self.failure_threshold:
                    self._state = "open"
            raise

# 使用:用熔断器包装 LLM API 调用
llm_cb = CircuitBreaker(failure_threshold=3, recovery_timeout=60)

try:
    response = llm_cb.call(openai.chat.completions.create, model="gpt-4.1", messages=messages)
except CircuitBreakerOpenError:
    # 熔断器打开——用备用模型
    response = anthropic.messages.create(model="claude-sonnet-4-20250514", messages=messages)

熔断器有三个状态:closed(正常,请求通过)、open(连续失败超过阈值,直接拒绝)、half-open(open 一段时间后允许少次试探,成功则关闭)。

工具级容错

工具调用有比 LLM API 更复杂的故障模式:

  • 超时:外部 API 慢响应
  • 部分失败:批量操作中部分成功部分失败
  • 非确定性错误:400(你的请求有问题)vs 429(对方限流)vs 500(对方坏了)——不同的错误码应该有不同的重试策略
  • 工具间影响:工具 A 的失败不应该让整个任务崩溃,应该让 Agent 知道 A 失败了并自己决定下一步
python
from enum import Enum

class ToolErrorType(Enum):
    RETRYABLE = "retryable"      # 429, 503——等一下再试
    FATAL = "fatal"              # 400, 401——不要重试,重试也没用
    DEGRADED = "degraded"        # 工具仍然能用但质量下降

def handle_tool_error(error: Exception, tool_name: str, attempt: int) -> ToolErrorType:
    if isinstance(error, RateLimitError):
        return ToolErrorType.RETRYABLE
    if isinstance(error, AuthenticationError):
        return ToolErrorType.FATAL
    if isinstance(error, TimeoutError) and attempt < 3:
        return ToolErrorType.RETRYABLE
    if isinstance(error, TimeoutError):
        return ToolErrorType.DEGRADED
    return ToolErrorType.FATAL

死信队列(DLQ)与任务重放

有些任务重试了 N 次仍然失败——它们就是"死了"。但"死了"不等于"丢了"。死信队列的作用是:把失败的任务保存到可诊断的存储中,而不是直接丢弃。

进入 DLQ 的任务带完整上下文:输入、所有检查点、最后一次错误、重试历史。运维人员可以从 DLQ 中取出任务,诊断根因,修复后重新提回任务队列。

running → failed → retry (3 次) → 仍失败 → DLQ

                                    运维诊断

                                    修复根因

                                    重新入队列 → running → completed

部分失败处理

Agent 跑到第 7 步失败了,前 6 步的结果怎么处理?这个问题的答案不是技术性的,是业务性的:

  • 如果前 6 步都是可逆的(读操作、分析)——保留这些结果,从第 7 步重试
  • 如果前 6 步包含不可逆操作(发了邮件、扣了款)——你需要决定是回滚还是继续。回滚也不是技术问题——"回滚一封已发出的邮件"不存在这个操作,你只能发一封更正邮件
  • 如果步骤之间有依赖——第 3 步的输出被第 5 步用到了,重试第 7 步可能需要重新计算上游步骤

社区最佳实践:不可逆操作(发邮件、写数据库、HTTP POST)要显式标记,Agent 计划里这些操作之前必须有审批检查点。 这不是 Runtime 能自动搞定的——它需要在 Agent 设计时就考虑进去。

16.8 幂等性与重试语义

核心直觉

重试机制最大的陷阱:你不知道上一次操作到底成功了没有。HTTP 超时不等于"没收到"——请求可能在网络上被成功处理了,只是响应没回来。幂等性的核心思想是:重复执行和只执行一次效果一样。

三种交付语义

语义含义实现方式代价
at-most-once最多执行一次,丢了就丢了无重试简单,但不可靠
at-least-once至少执行一次,可能重复重试直到确认成功需要幂等处理
exactly-once精确执行一次幂等键 + 去重 + 事务理论上不可实现(分布式系统定理),工程上可以近似

at-most-once 适合纯读操作——查数据库没查到,重查一次没有任何副作用。

at-least-once 是 Agent Runtime 的默认模式。检查点 + 重试 = 至少一次语义。你需要确保所有有副作用的工具操作是幂等的。

exactly-once 在实践中,通过幂等键可以实现近似 exactly-once。

幂等键模式

python
import uuid

def safe_send_email(recipient: str, subject: str, body: str) -> str:
    """幂等的邮件发送——同一个 idempotency_key 不会发两次"""

    # 生成或使用已有的幂等键
    idempotency_key = f"email:{hashlib.sha256(f'{recipient}:{subject}'.encode()).hexdigest()[:16]}"

    # 检查去重表
    existing = db.query("SELECT status FROM dedup_table WHERE key = ?", idempotency_key)
    if existing and existing["status"] == "sent":
        return {"status": "already_sent", "idempotency_key": idempotency_key}

    # 实际发送
    result = sendgrid.send(recipient=recipient, subject=subject, body=body)

    # 记录到去重表
    db.execute(
        "INSERT INTO dedup_table (key, status, result, created_at) VALUES (?, 'sent', ?, NOW())",
        idempotency_key, json.dumps(result)
    )

    return {"status": "sent", "idempotency_key": idempotency_key}

幂等键不一定要随业务逻辑生成。更简单的做法是:在代理循环中,每个工具调用都带上一个基于 task_id + step_number 的幂等键。同一步骤重试多少次,幂等键不变。

重试前检查状态

一个常被忽视的模式:工具调用超时时,不要无脑重试——先检查外部状态。

工具调用超时

不立即重试

检查外部状态:这条数据写进去了吗?
   ├── 写进去了 → 视作成功,继续下一步
   ├── 没写进去 → 重试工具调用
   └── 不确定 → 人工审核

"不确定"是分布式中最难处理的状态。如果你的系统频繁进入这个分支,说明工具设计有问题——每个工具应该支持幂等查询。(调用者能通过 GET 查询操作结果)

16.9 Runtime 能力清单

至此,你可以用下面这个清单评估一个 Agent 系统是否具备生产级 Runtime:

能力具体要求没有它的后果
持久化执行每步检查点,崩溃恢复崩溃 = 全丢,长任务跑不了
任务队列排队、调度、优先级、并发控制高负载雪崩
状态外置Session/Execution State 存外部存储进程重启丢状态
沙箱隔离代码执行隔离、网络限制、资源上限Agent 行为影响生产环境
流式事件Token stream + Tool events + Progress用户不知道 Agent 在干嘛
熔断器LLM API / 工具级故障隔离级联失败
DLQ失败任务保存、诊断、重放失败任务丢失
幂等机制幂等键、去重表重复执行造成重复副作用
超时恢复任务/步骤超时后进入重试或 DLQ任务永远挂起
优雅关闭SIGTERM 后等检查点写完再退出部署重启丢进度
凭据管理Agent 不持有原始凭据,通过 Runtime 注入凭据泄露

16.10 主要 Runtime 平台对比

截至 2026 年 4 月,几个值得关注的 Runtime 路线:

平台/方案执行模型核心优势核心劣势适合场景
自建(Celery + Redis + Docker)自管理 Worker + 容器沙箱完全可控、无供应商锁定工程量大,需自行实现检查点和 DLQ有平台工程团队的组织
Temporal + 自建沙箱Durable Execution 引擎 + 自定义 Activity工业级持久化执行、多语言 SDK学习曲线陡峭,沙箱需自建需要强持久化保证的企业级 Agent
Microsoft Durable Task / Foundry Agent ServiceDurable Task + 托管 Agent Runtime与 Azure Monitor、Entra、私网和企业治理集成深Azure 生态绑定,部分能力仍有预览边界Azure 企业、需要治理/观测一体化
InngestServerless Durable Functions零运维、内置重试和长流程编排平台锁定,复杂沙箱仍需外接快速上线的中小团队
RestateDurable async services / workflows适合把 Agent 循环、人工审批、工具调用做成可恢复服务 [3]需要按 Restate 模型改造服务边界事件驱动、微服务团队
DBOS数据库驱动的 durable execution把工作流状态持久化到数据库,适合 crashproof Agent [4]生态较新,团队需要接受 DBOS 编程模型Python/TypeScript 后端、想少运维队列
Cloudflare Workflows / AgentsWorkers 上的 durable workflow + Agent 通信边缘部署、长流程、低运维,适合轻量 Agent [7]云厂商绑定,复杂原生依赖/系统调用受限Web/边缘 Agent、长等待任务
ModalServerless 容器 + GPU + Sandboxes极快冷启动(官方称 sub-second)、弹性伸缩、GPU 友好主要是容器级隔离,持久化执行需组合其他机制需要 GPU 或弹性代码执行的 Agent
E2B托管 Linux VM 沙箱SDK 简洁、适合 Coding Agent 和代码解释器外部依赖、成本,编排/检查点要另配Coding Agent、不可信代码执行
LangGraph PlatformLangGraph 的托管部署平台与 LangGraph 深度集成平台绑定 LangGraph 生态LangGraph 用户的生产部署
OpenAI Agents SDK + 自建基础设施官方 SDK + 自建 Runtime最低抽象层、原生 Guardrails需要自行补齐 Runtime 能力想完全控制 Runtime 的团队

选型没有标准答案,关键在于你愿意在 Runtime 上投入多少工程精力。一个务实判断是:durable execution 和 sandbox 通常不是同一个产品能力。Temporal、Inngest、Restate、DBOS、Cloudflare Workflows 更偏"把长流程可靠跑完";E2B、Modal、Dynamic Workers 这类方案更偏"把代码隔离跑起来"。生产系统往往需要把两类能力组合起来。

16.11 Runtime 与治理的边界

本章最后要画清一条线:Runtime 的职责到哪为止,治理的职责从哪开始。

Runtime(本章)的职责:

  • 确保任务可靠完成(持久化、重试、容错)
  • 在隔离环境中执行(沙箱)
  • 管理任务的生命周期(排队、调度、超时)
  • 提供流式事件和进度反馈
  • 管理 Agent 不直接持有的凭据

治理(第 20 章)的职责:

  • 定义 Agent 能调用哪些工具、访问哪些数据
  • 控制操作的权限等级和审批流程
  • 审计日志(谁、什么时候、做了什么)
  • 合规和法规对齐
  • RBAC/ABAC 权限模型

两者的关系用一句话概括:Runtime 负责把任务可靠跑完,治理负责定义它在什么权限下跑、留下什么审计记录、出了问题怎么追责。 Runtime 提供能力(能跑),治理提供约束(怎么跑)。

        [Runtime 层]
        - 持久化检查点
        - 沙箱隔离
        - 任务队列
        - 熔断器
        - 凭据注入(不暴露给 Agent)

        [Agent 执行]
               ↓ 调用工具
        [治理层拦截]
        - 权限检查(能调用吗?)
        - 审批(需要人工确认吗?)
        - 审计(记录这条调用)
               ↓ 允许
        [工具执行]

一个实际例子:Agent 要发一封邮件给客户。Runtime 确保 Agent 在沙箱中运行、当前步骤在执行前写好了检查点、postmaster API 有熔断器保护。治理层检查:Agent 有没有发邮件的权限?邮件内容是否需要审批(高风险操作:发邮件给外部收件人)?不管发没发出去,这条操作都要进审计日志。

两者协作,不是替代。Runtime 和治理一样都搞不定的,是第 20 章的红队测试、组织演练和事故响应。

常见故障模式

坑 1:把检查点存在 Worker 本地磁盘

典型症状:本地文件系统里明明有检查点,但 Worker 被 Kubernetes 调度到另一台机器后,恢复流程找不到任何状态。

检查点必须存在外部、共享、持久化的存储中。Redis(适合热数据、高性能)、PostgreSQL(适合需要复杂查询的检查点状态)、S3(适合大文件检查点)都可以,但不能是本地磁盘。

坑 2:没有状态恢复测试

典型症状:代码里确实写了检查点,但第一次生产崩溃后才发现检查点缺字段、版本不兼容,或者恢复后无法重建沙箱挂载。

测试持久化执行不能只测"写检查点了没",要测"崩了之后能不能从检查点恢复并产生相同的结果"。写一个 chaos test:在 Agent 跑的时候随机杀进程,然后验证恢复结果。

坑 3:把所有东西都包装成 try/except 然后吞掉

典型症状:Agent 看似没有报错,但主模型、主工具或主检索链路已经持续失败,只是被宽泛的异常处理悄悄降级了。

错误要分类处理:重试的(record + retry)、降级的(record + fallback + log warning)、致命的(record + escalate to human)。不能所有错误都用同一个 except 处理。

面试高频题

问题:解释 Durable Execution 的原理。一个运行 30 分钟的 Agent 任务中途崩溃了,你的系统如何恢复?

参考答案框架:

  1. 先讲核心原理:每完成一个步骤后写入检查点(消息历史 + 工具调用结果 + 执行状态)。崩溃后从最后检查点读取,继续执行而非从头开始。

  2. 恢复流程:重启进程 → 读检查点存储(外部存储,不是本地磁盘)→ 重建沙箱(新的、空的)→ 从外部状态存储重新挂载数据 → 从最后检查点的步骤继续执行。

  3. 关键工程点:

    • 检查点必须在外部存储,不能在本机
    • 沙箱是临时的,销毁后重建,外部状态单独持久化
    • LLM 重放的非确定性问题:需要固定 seed + temperature=0 + 重放时检测偏差
    • 不可逆操作(发邮件、扣款)之前必须有审批检查点

加分点:

  • 提到状态需要区分 Session State、Execution State、External State,分开管理
  • 提到重试语义:at-least-once + 幂等键。重试前检查外部状态,不无脑重试
  • 提到熔断器在 LLM API 不可用时的作用:切换到备用模型而不是无限重试
  • 提到测试:持久化执行需要 chaos test——随机杀进程验证恢复

参考资料

[1] Microsoft Learn, "Durable Task for AI agents", 2026. https://learn.microsoft.com/en-us/azure/durable-task/sdks/durable-task-for-ai-agents

[2] Inngest, "Durable Execution: The Key to Harnessing AI Agents in Production", Inngest Blog, 2026. https://www.inngest.com/blog/durable-execution-key-to-harnessing-ai-agents

[3] Restate, "AI Agents", Restate Documentation. https://docs.restate.dev/use-cases/ai-agents

[4] DBOS, "Durable Execution for Building Crashproof AI Agents". https://www.dbos.dev/blog/durable-execution-crashproof-ai-agents

[5] E2B, "Documentation", E2B Documentation. https://e2b.dev/docs

[6] Modal, "Sandboxes", Modal Documentation. https://modal.com/docs/guide/sandboxes

[7] Cloudflare, "Build a Durable AI Agent", Cloudflare Workflows Documentation. https://developers.cloudflare.com/workflows/get-started/durable-agents/