Skip to content

第25章:从零构建一个生产级 Agent(不依赖框架)

你已经读完了前 24 章,手边有 LangGraph、CrewAI、OpenAI SDK、smolagents 可以选。那么为什么还要从零写一个 Agent?

因为框架封装了复杂性,也隐藏了复杂性。当框架表现异常时——工具没有被调用、记忆没有被更新、某个步骤莫名循环——你需要知道框架底层在做什么,才能诊断问题。更关键的是,很多生产级 Agent 需要定制检查点粒度、自定义流式协议、嵌入已有的服务架构,这些需求用框架实现往往比自己写更麻烦。

Anthropic 的官方文档有一句话值得引用 [1]:

For many LLM interactions, the overhead of a framework isn't worth it. Start with direct API calls, add complexity only when needed.

这不是反对使用框架,而是说直接 API 调用是理解底层机制的最短路径。你搞清楚了没有框架时要做什么,框架才能真正成为你的工具,而不是你依赖的黑盒。

这一章从最小 ReAct 循环开始,逐步叠加记忆、工具层、Guardrails、可观测性、持久化、流式交互,最后专门讲 Coding Agent 的几个核心架构模式。代码以可落地的教学片段为主,真实生产还需要接入你的模型、工具、权限、审计和部署环境。读完这章,你应该能从一个空文件开始,理解一个生产级 Agent 的主要部件如何拼起来。

25.1 核心循环

核心直觉

所有 Agent,不论多复杂,底层都是同一个 while 循环:接收指令 → LLM 推理 → 解析动作 → 执行工具 → 把结果还给 LLM → 判断是否停止。框架做的事情,是把这个循环包装成更高级的抽象——图节点、任务队列、Role 定义。

把这个循环弄清楚,你就看穿了所有框架。

循环有两个重要边界:

  • 入口:第一条用户消息进入消息历史
  • 出口stop_reason 变成 end_turn(任务完成),或者达到最大轮次/Token 上限(强制停止)

循环里的每一次 LLM 调用,都会把完整的消息历史发给模型。这就是为什么 Agent 的成本会随着对话轮数线性增长——每一轮的输入 Token 数都比上一轮多。

最小可运行实现

下面是一个 80 行的 Agent,没有任何依赖,只用 anthropic SDK:

python
import json
import anthropic

client = anthropic.Anthropic()

# 工具定义(JSON Schema)
TOOLS = [
    {
        "name": "search_web",
        "description": "搜索互联网获取最新信息。适合查询实时数据、新闻、产品信息。",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "搜索关键词,尽量具体"
                }
            },
            "required": ["query"]
        }
    },
    {
        "name": "calculate",
        "description": "执行数学计算。输入 Python 表达式,返回计算结果。",
        "input_schema": {
            "type": "object",
            "properties": {
                "expression": {
                    "type": "string",
                    "description": "合法的 Python 数学表达式,如 '2 ** 10' 或 '(100 - 30) / 100'"
                }
            },
            "required": ["expression"]
        }
    }
]

# 工具实现
def search_web(query: str) -> str:
    # 生产中这里接入 Tavily / Bing / Brave Search API
    return f"[模拟搜索结果] 关于'{query}'的搜索结果:找到 3 条相关内容。"

import ast
import operator

ALLOWED_MATH_OPERATORS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Pow: operator.pow,
    ast.USub: operator.neg,
}

def _eval_math_node(node):
    if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
        return node.value
    if isinstance(node, ast.BinOp) and type(node.op) in ALLOWED_MATH_OPERATORS:
        return ALLOWED_MATH_OPERATORS[type(node.op)](
            _eval_math_node(node.left),
            _eval_math_node(node.right),
        )
    if isinstance(node, ast.UnaryOp) and type(node.op) in ALLOWED_MATH_OPERATORS:
        return ALLOWED_MATH_OPERATORS[type(node.op)](_eval_math_node(node.operand))
    raise ValueError("只允许数字和基础数学运算")

def calculate(expression: str) -> str:
    try:
        parsed = ast.parse(expression, mode="eval")
        result = _eval_math_node(parsed.body)
        return str(result)
    except Exception as e:
        return f"计算错误: {e}"

TOOL_REGISTRY = {
    "search_web": search_web,
    "calculate": calculate,
}

def run_agent(user_message: str, max_turns: int = 10) -> str:
    """最小 ReAct Agent 主循环"""
    messages = [{"role": "user", "content": user_message}]

    for turn in range(max_turns):
        response = client.messages.create(
            model="<your-claude-model>",
            max_tokens=4096,
            tools=TOOLS,
            messages=messages,
        )

        # 把 assistant 响应追加到历史
        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason == "end_turn":
            # 提取最终文本
            return next(
                (block.text for block in response.content if block.type == "text"),
                ""
            )

        if response.stop_reason != "tool_use":
            return f"[意外停止: {response.stop_reason}]"

        # 处理工具调用
        tool_results = []
        for block in response.content:
            if block.type != "tool_use":
                continue

            tool_fn = TOOL_REGISTRY.get(block.name)
            if tool_fn is None:
                result = f"错误:未知工具 '{block.name}'"
            else:
                try:
                    result = tool_fn(**block.input)
                except Exception as e:
                    result = f"工具执行失败: {e}"

            tool_results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": result,
            })

        messages.append({"role": "user", "content": tool_results})

    return "[达到最大轮次,任务未完成]"


if __name__ == "__main__":
    answer = run_agent("1024 的平方根是多少?再搜索一下最新的 Python 版本")
    print(answer)

这 80 行代码包含了一个 Agent 的完整骨架。接下来每一节都是在这个骨架上叠加能力。

25.2 添加记忆

核心直觉

messages 列表本身就是短期记忆——它记录了当前会话里发生的一切。问题是这个列表会无限增长,而 LLM 的上下文窗口是有限的。长任务运行一段时间后,输入 Token 数会逼近上限,成本也会线性飙升。

记忆系统要解决两个问题:在窗口内放什么,以及超出窗口时怎么处理

上下文窗口管理

最简单的截断策略是保留最近 N 条消息。但这会丢失任务上下文——Agent 在第 3 轮拿到的关键信息,到第 20 轮已经被截掉了。

更好的做法是摘要压缩(Compaction):当消息列表的 Token 数接近上限时,把历史消息压缩成一段摘要,保留关键决策和发现,丢弃冗余的工具调用详情。

python
def count_tokens(messages: list, tools: list | None = None) -> int:
    """
    估算消息列表的 token 数。
    生产环境优先使用模型厂商的 count_tokens / usage API;
    这里用字符数近似,适合触发 compaction 的粗略阈值,不适合计费。
    """
    text = []
    for msg in messages:
        content = msg["content"]
        if isinstance(content, str):
            text.append(content)
        elif isinstance(content, list):
            for block in content:
                if isinstance(block, dict):
                    text.append(str(block.get("text") or block.get("content") or block))
                else:
                    text.append(str(block))
    if tools:
        text.append(str(tools))
    return sum(len(part) for part in text) // 4

def compact_messages(messages: list, keep_last: int = 4) -> list:
    """
    当消息列表过长时,压缩历史部分。
    保留最开始的系统消息 + 最近 keep_last 条消息。
    中间部分用 LLM 摘要替换。
    """
    if len(messages) <= keep_last + 1:
        return messages

    # 分离出需要压缩的历史部分
    history_to_compress = messages[:-keep_last]
    recent_messages = messages[-keep_last:]

    # 用 LLM 生成历史摘要
    summary_prompt = "请总结以下对话历史中的关键信息、重要发现和已完成的步骤(100字以内):\n\n"
    for msg in history_to_compress:
        if isinstance(msg["content"], str):
            summary_prompt += f"{msg['role']}: {msg['content']}\n"

    summary_response = client.messages.create(
        model="<your-small-claude-model>",  # 用小模型做摘要,省钱
        max_tokens=200,
        messages=[{"role": "user", "content": summary_prompt}]
    )
    summary = summary_response.content[0].text

    # 重建消息列表:摘要 + 最近消息
    compressed = [
        {
            "role": "user",
            "content": f"[对话历史摘要] {summary}"
        },
        {
            "role": "assistant",
            "content": "好的,我了解了之前的进展。"
        }
    ] + recent_messages

    return compressed

# 在主循环里集成:
def run_agent_with_memory(user_message: str, max_turns: int = 20) -> str:
    messages = [{"role": "user", "content": user_message}]
    TOKEN_LIMIT = 150_000  # 留出 50k 给输出

    for turn in range(max_turns):
        # 检查是否需要压缩
        if count_tokens(messages) > TOKEN_LIMIT:
            messages = compact_messages(messages)

        response = client.messages.create(
            model="<your-claude-model>",
            max_tokens=4096,
            tools=TOOLS,
            messages=messages,
        )
        # ... 余下和前面相同

跨会话长期记忆

短期记忆只在当前会话里有效。跨会话的记忆需要外部存储。最简单的实现是 SQLite:

python
import sqlite3
import json
from datetime import datetime

class AgentMemory:
    def __init__(self, db_path: str = "agent_memory.db"):
        self.conn = sqlite3.connect(db_path)
        self._init_schema()

    def _init_schema(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id TEXT NOT NULL,
                content TEXT NOT NULL,
                importance INTEGER DEFAULT 5,
                created_at TEXT NOT NULL
            )
        """)
        self.conn.commit()

    def save(self, session_id: str, content: str, importance: int = 5):
        self.conn.execute(
            "INSERT INTO memories (session_id, content, importance, created_at) VALUES (?, ?, ?, ?)",
            (session_id, content, importance, datetime.utcnow().isoformat())
        )
        self.conn.commit()

    def load_recent(self, session_id: str, limit: int = 10) -> list[str]:
        cursor = self.conn.execute(
            "SELECT content FROM memories WHERE session_id = ? ORDER BY importance DESC, created_at DESC LIMIT ?",
            (session_id, limit)
        )
        return [row[0] for row in cursor.fetchall()]

长期记忆的关键问题是什么该记。不是所有工具调用结果都值得存——一次失败的搜索不需要记,但用户确认过的一个关键事实("用户偏好使用 PostgreSQL")值得记。简单的规则:只记录跨任务有价值的内容,工具调用的中间结果不要存

生产里的长期记忆还要加三条边界:第一,key 里要包含 tenant/user scope,避免不同租户或角色之间串记忆;第二,PII、secret、访问 token、完整工具参数不要进入长期记忆;第三,记忆要有来源、更新时间、过期策略和删除接口,方便处理用户撤回、合规删除和事实过期。

25.3 添加工具执行层

核心直觉

在最小实现里,工具调用只有一行 tool_fn(**block.input),没有参数验证、没有超时控制、没有重试。这在 Demo 里够用,生产里不够。

一个生产级的工具执行层需要做三件事:验证输入合法性、控制执行时间、处理失败并给模型有用的错误信息。

参数校验

用 Pydantic 定义工具的输入 schema,在执行前自动校验:

python
from pydantic import BaseModel, ValidationError
from typing import Any

class SearchInput(BaseModel):
    query: str

    def model_post_init(self, __context: Any) -> None:
        if len(self.query) < 2:
            raise ValueError("查询词太短,至少需要 2 个字符")
        if len(self.query) > 500:
            raise ValueError("查询词过长,最多 500 个字符")

class CalculateInput(BaseModel):
    expression: str

TOOL_SCHEMAS = {
    "search_web": SearchInput,
    "calculate": CalculateInput,
}

def execute_tool_safe(
    name: str,
    raw_input: dict,
    timeout_seconds: float = 10.0
) -> str:
    """
    安全执行工具:参数校验 + 超时 + 错误处理。
    返回字符串,方便直接放入 tool_result。
    """
    # 参数校验
    schema_cls = TOOL_SCHEMAS.get(name)
    if schema_cls is None:
        return f"错误:未知工具 '{name}'"

    try:
        validated = schema_cls(**raw_input)
    except ValidationError as e:
        # 把 Pydantic 错误转成 LLM 友好的提示
        errors = [f"{err['loc'][0]}: {err['msg']}" for err in e.errors()]
        return f"参数校验失败: {'; '.join(errors)}"

    # 执行工具(带超时)
    tool_fn = TOOL_REGISTRY.get(name)
    if tool_fn is None:
        return f"错误:工具 '{name}' 未注册实现"

    try:
        # 对外部 API 工具,优先在工具内部设置 HTTP timeout。
        # 这里用一个长期 executor 避免每次调用都创建线程;注意线程超时不能强杀正在运行的函数。
        import concurrent.futures
        future = TOOL_EXECUTOR.submit(tool_fn, **validated.model_dump())
        try:
            result = future.result(timeout=timeout_seconds)
            return str(result)
        except concurrent.futures.TimeoutError:
            future.cancel()
            return f"工具执行超时({timeout_seconds}s)。请缩小查询范围、稍后重试,或改用不依赖该工具的方案。"
    except Exception as e:
        return f"工具执行出错: {type(e).__name__}: {e}"
python
# 应用启动时创建,应用关闭时 shutdown。
# 如果工具可能执行 CPU 密集或不可中断代码,应放到独立进程、容器或 microVM,
# 不要依赖线程 timeout 提供安全隔离。
import concurrent.futures

TOOL_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=8)

错误信息要对 LLM 有用

工具执行失败时,你给模型返回什么很重要。

差的错误信息:"Error: connection refused"

好的错误信息:"数据库连接失败,可能是网络问题。建议:1) 使用缓存数据重试,或 2) 通知用户服务暂时不可用"

给模型的错误信息不是给开发者看的日志——它是模型下一步决策的依据。好的错误信息应该告诉模型发生了什么 + 可以怎么处理

25.4 添加 Guardrails

核心直觉

Guardrails 是运行在主 Agent 循环旁边的检查层,负责拦截危险输入、过滤有害输出、阻止越权的工具调用。第 15 章从设计模式角度讲了 Guardrails,这里关注实现细节。

三层 Guardrails 的执行位置:

三层实现

python
from dataclasses import dataclass
from enum import Enum

class GuardrailAction(Enum):
    PASS = "pass"
    BLOCK = "block"
    MODIFY = "modify"

@dataclass
class GuardrailResult:
    action: GuardrailAction
    content: str          # 通过则原样返回,阻断则是提示语,修改则是新内容
    reason: str = ""

# --- 输入护栏 ---
def input_guardrail(user_message: str) -> GuardrailResult:
    """检查用户输入,阻断明显的恶意或危险内容"""
    blocked_patterns = [
        "ignore previous instructions",
        "forget your system prompt",
        "你现在是",  # 角色扮演类注入的常见模式
    ]
    lower = user_message.lower()
    for pattern in blocked_patterns:
        if pattern in lower:
            return GuardrailResult(
                action=GuardrailAction.BLOCK,
                content="对不起,这条指令包含不允许的内容,无法处理。",
                reason=f"检测到可能的 Prompt 注入模式: '{pattern}'"
            )
    return GuardrailResult(action=GuardrailAction.PASS, content=user_message)

# --- 工具护栏 ---
DANGEROUS_TOOLS = {"delete_file", "drop_database", "send_email"}
REQUIRE_APPROVAL_TOOLS = {"write_file", "execute_code", "create_user"}

def tool_guardrail(tool_name: str, tool_input: dict) -> GuardrailResult:
    """在工具调用前校验:是否被允许、是否需要额外确认"""
    if tool_name in DANGEROUS_TOOLS:
        return GuardrailResult(
            action=GuardrailAction.BLOCK,
            content=f"工具 '{tool_name}' 已被策略禁止,无法调用。",
            reason="危险工具黑名单"
        )

    if tool_name in REQUIRE_APPROVAL_TOOLS:
        # 这里可以接入异步审批流程(见 16.4 节),简化版直接阻断
        return GuardrailResult(
            action=GuardrailAction.BLOCK,
            content=f"工具 '{tool_name}' 需要人工审批,当前自动模式下不可用。",
            reason="需要审批的工具"
        )

    return GuardrailResult(action=GuardrailAction.PASS, content="")

# --- 输出护栏 ---
def output_guardrail(response_text: str) -> GuardrailResult:
    """检查最终输出,过滤敏感信息泄露"""
    sensitive_patterns = ["sk-", "api_key=", "password=", "secret="]
    for pattern in sensitive_patterns:
        if pattern.lower() in response_text.lower():
            # 脱敏处理而不是直接阻断
            import re
            cleaned = re.sub(
                r'(sk-|api_key=|password=|secret=)[^\s\'"]+',
                r'\1[REDACTED]',
                response_text,
                flags=re.IGNORECASE
            )
            return GuardrailResult(
                action=GuardrailAction.MODIFY,
                content=cleaned,
                reason="检测到可能的敏感信息泄露,已脱敏"
            )

    return GuardrailResult(action=GuardrailAction.PASS, content=response_text)

工具护栏在真实系统里不应该只看工具名。更可靠的做法是把调用者、租户、资源、风险等级和当前任务目的一起交给策略引擎判断:

python
@dataclass
class ToolPolicyContext:
    user_id: str
    tenant_id: str
    scopes: set[str]
    environment: str  # dev / staging / prod

def tool_guardrail_with_context(
    tool_name: str,
    tool_input: dict,
    ctx: ToolPolicyContext,
) -> GuardrailResult:
    if tool_name == "query_database" and "db:read" not in ctx.scopes:
        return GuardrailResult(
            action=GuardrailAction.BLOCK,
            content="当前用户没有数据库读取权限。",
            reason="missing db:read scope",
        )

    if tool_name in {"send_email", "refund_order", "write_file"} and ctx.environment == "prod":
        return GuardrailResult(
            action=GuardrailAction.BLOCK,
            content=f"工具 '{tool_name}' 在生产环境需要人工审批。",
            reason="prod high-risk action",
        )

    return GuardrailResult(action=GuardrailAction.PASS, content="")

乐观执行模式

输出护栏可以与主循环并发运行,而不是串行等待:主 Agent 生成输出的同时,后台护栏已经在检查。如果护栏在主 Agent 完成前发现问题,抛出异常中断整个流程。这比"生成完再检查"省了一半延迟。

这个模式在 OpenAI Agents SDK 的文档里称为"Optimistic Execution" [2],实现上用 asyncio.gather 同时跑主流程和检查流程,任何一个提前完成或出错都触发 cancel 另一个。

25.5 添加可观测性

核心直觉

Agent 的 bug 通常不是单次 LLM 调用出了问题,而是某一步的输出影响了后续决策,导致几步之后偏离预期。没有完整的执行 Trace,你根本不知道问题出在哪一步。

用 OpenTelemetry(OTel)埋点,一次埋点,可以把 trace 导到任何后端(Langfuse、Arize Phoenix、Jaeger 等)。

OTel 埋点

python
# pip install opentelemetry-sdk opentelemetry-exporter-otlp
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.trace import Status, StatusCode
import time

# 初始化 OTel(在应用启动时调用一次)
def setup_tracing(service_name: str = "my-agent"):
    provider = TracerProvider()
    exporter = OTLPSpanExporter(endpoint="http://localhost:4317")  # Jaeger/Langfuse 地址
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)
    return trace.get_tracer(service_name)

tracer = setup_tracing()

def safe_preview(value: str, limit: int = 200) -> str:
    """写入 trace 前做最小脱敏,避免把 prompt、PII、secret 原样进日志。"""
    import re
    text = str(value)
    text = re.sub(r'(sk-|api_key=|password=|secret=)[^\s\'"]+', r'\1[REDACTED]', text, flags=re.IGNORECASE)
    return text[:limit]

def run_agent_with_tracing(user_message: str, session_id: str) -> str:
    """带 OTel trace 的 Agent 主循环"""
    with tracer.start_as_current_span("agent.run") as root_span:
        root_span.set_attribute("session.id", session_id)
        root_span.set_attribute("input.preview", safe_preview(user_message))

        messages = [{"role": "user", "content": user_message}]
        turn = 0

        while turn < 10:
            turn += 1

            # LLM 调用的 Span
            with tracer.start_as_current_span("llm.call") as llm_span:
                llm_span.set_attribute("llm.turn", turn)
                llm_span.set_attribute("llm.input_messages", len(messages))

                start = time.time()
                response = client.messages.create(
                    model="<your-claude-model>",
                    max_tokens=4096,
                    tools=TOOLS,
                    messages=messages,
                )
                elapsed = time.time() - start

                llm_span.set_attribute("llm.latency_ms", int(elapsed * 1000))
                llm_span.set_attribute("llm.stop_reason", response.stop_reason)
                llm_span.set_attribute("llm.input_tokens", response.usage.input_tokens)
                llm_span.set_attribute("llm.output_tokens", response.usage.output_tokens)

            messages.append({"role": "assistant", "content": response.content})

            if response.stop_reason == "end_turn":
                final_text = next(
                    (b.text for b in response.content if b.type == "text"), ""
                )
                root_span.set_attribute("output.preview", safe_preview(final_text))
                return final_text

            if response.stop_reason != "tool_use":
                root_span.set_status(Status(StatusCode.ERROR, f"unexpected stop: {response.stop_reason}"))
                return f"[意外停止: {response.stop_reason}]"

            # 工具调用的 Span
            tool_results = []
            for block in response.content:
                if block.type != "tool_use":
                    continue

                with tracer.start_as_current_span("tool.call") as tool_span:
                    tool_span.set_attribute("tool.name", block.name)
                    tool_span.set_attribute("tool.input.preview", safe_preview(block.input, 500))

                    t_start = time.time()
                    result = execute_tool_safe(block.name, block.input)
                    t_elapsed = time.time() - t_start

                    tool_span.set_attribute("tool.latency_ms", int(t_elapsed * 1000))
                    tool_span.set_attribute("tool.result_preview", safe_preview(result))

                    if result.startswith("错误:") or result.startswith("工具执行"):
                        tool_span.set_status(Status(StatusCode.ERROR, result[:100]))

                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": result,
                    })

            messages.append({"role": "user", "content": tool_results})

        root_span.set_status(Status(StatusCode.ERROR, "max turns reached"))
        return "[达到最大轮次]"

每次 LLM 调用都记录了:轮次、输入/输出 Token 数、延迟、停止原因。每次工具调用都记录了:工具名、输入、延迟、结果预览。你拿到 trace 之后,能清楚地看到"第 3 轮 LLM 调用了 search_web,工具耗时 2.3 秒,结果被用于第 4 轮推理"这样的完整链路。

25.6 添加持久化

核心直觉

没有持久化的 Agent 就像没有存档的游戏——进程一死,进度全丢。对于运行几十分钟的任务,这是不可接受的。

持久化的核心操作是 Checkpoint(检查点):在每一轮工具调用之后,把当前的消息历史和状态序列化写入存储。重启时,从最后一个检查点恢复,而不是从头开始。

SQLite 检查点实现

python
import sqlite3
import json
from datetime import datetime

class CheckpointStore:
    def __init__(self, db_path: str = "checkpoints.db"):
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self._init_schema()

    def _init_schema(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS checkpoints (
                run_id TEXT NOT NULL,
                turn INTEGER NOT NULL,
                messages TEXT NOT NULL,
                metadata TEXT,
                created_at TEXT NOT NULL,
                PRIMARY KEY (run_id, turn)
            )
        """)
        self.conn.commit()

    def save(self, run_id: str, turn: int, messages: list, metadata: dict = None):
        """保存检查点"""
        self.conn.execute(
            """INSERT OR REPLACE INTO checkpoints
               (run_id, turn, messages, metadata, created_at) VALUES (?, ?, ?, ?, ?)""",
            (
                run_id,
                turn,
                json.dumps(serialize_messages(messages), ensure_ascii=False),
                json.dumps(metadata or {}),
                datetime.utcnow().isoformat()
            )
        )
        self.conn.commit()

    def load_latest(self, run_id: str) -> tuple[list, int]:
        """加载最新检查点,返回 (messages, turn)"""
        cursor = self.conn.execute(
            "SELECT messages, turn FROM checkpoints WHERE run_id = ? ORDER BY turn DESC LIMIT 1",
            (run_id,)
        )
        row = cursor.fetchone()
        if row is None:
            return [], 0
        return json.loads(row[0]), row[1]

    def delete(self, run_id: str):
        """任务完成后清理检查点"""
        self.conn.execute("DELETE FROM checkpoints WHERE run_id = ?", (run_id,))
        self.conn.commit()

checkpoint_store = CheckpointStore()

def serialize_content(content) -> list | str:
    """把 Anthropic SDK 的 content 列表转成可序列化的字典"""
    if isinstance(content, str):
        return content

    result = []
    for block in content:
        if isinstance(block, dict):
            result.append(block)
        elif block.type == "text":
            result.append({"type": "text", "text": block.text})
        elif block.type == "tool_use":
            result.append({
                "type": "tool_use",
                "id": block.id,
                "name": block.name,
                "input": block.input
            })
    return result

def serialize_messages(messages: list) -> list:
    return [
        {"role": msg["role"], "content": serialize_content(msg["content"])}
        for msg in messages
    ]

def run_agent_with_checkpoint(
    user_message: str,
    run_id: str,  # 每次任务的唯一 ID,用于关联检查点
    resume: bool = False
) -> str:
    """支持断点续传的 Agent"""

    if resume:
        messages, start_turn = checkpoint_store.load_latest(run_id)
        if not messages:
            # 没找到检查点,从头开始
            messages = [{"role": "user", "content": user_message}]
            start_turn = 0
        print(f"[恢复] 从第 {start_turn} 轮继续")
    else:
        messages = [{"role": "user", "content": user_message}]
        start_turn = 0

    for turn in range(start_turn, start_turn + 20):
        response = client.messages.create(
            model="<your-claude-model>",
            max_tokens=4096,
            tools=TOOLS,
            messages=messages,
        )
        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason == "end_turn":
            checkpoint_store.delete(run_id)  # 完成后清理
            return next(
                (b.text for b in response.content if b.type == "text"), ""
            )

        if response.stop_reason != "tool_use":
            return f"[停止: {response.stop_reason}]"

        tool_results = []
        for block in response.content:
            if block.type != "tool_use":
                continue
            result = execute_tool_safe(block.name, block.input)
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": result,
            })
        messages.append({"role": "user", "content": tool_results})

        # 保存到一个可恢复边界:assistant 的 tool_use 后面已经跟着对应 tool_result。
        # 如果在 tool_use 之后、tool_result 之前保存,恢复时很容易形成无效消息历史。
        checkpoint_store.save(
            run_id=run_id,
            turn=turn + 1,
            messages=messages,
            metadata={"stop_reason": response.stop_reason}
        )

    return "[达到最大轮次]"

检查点的序列化注意事项

response.content 是 Anthropic SDK 返回的对象列表,直接 json.dumps 会失败。不要用 default=str 糊过去,否则恢复后消息结构会变成普通字符串,下一轮模型调用可能失效。生产里要像上面一样把每个 ContentBlock 手动转成字典:

python
def serialize_content(content) -> list:
    """把 Anthropic SDK 的 content 列表转成可序列化的字典"""
    result = []
    for block in content:
        if block.type == "text":
            result.append({"type": "text", "text": block.text})
        elif block.type == "tool_use":
            result.append({
                "type": "tool_use",
                "id": block.id,
                "name": block.name,
                "input": block.input
            })
    return result

还有一个更重要的生产细节:checkpoint 只能解决状态恢复,不能自动解决副作用重放。如果工具已经发了邮件、扣了款、写了数据库,但进程在保存 checkpoint 前崩溃,恢复后可能重复执行同一个工具。因此所有有副作用的工具都要带 idempotency key,并在工具侧做去重。

SQLite 适合教学和单机原型。多 Worker 生产部署通常要换成 Postgres / Redis / durable queue,并加上 run-level lock、状态机版本号、checkpoint schema version 和过期清理策略,避免两个 Worker 同时恢复同一个 run。

25.7 添加流式交互与后台任务状态

核心直觉

Agent 处理一个任务可能需要 30 秒甚至更长。用户盯着空白屏幕等 30 秒是很差的体验。流式输出和任务状态推送解决这个问题:让用户实时看到 Agent 在做什么,而不是等到全部完成才能看到结果。

三种推送机制各有适用场景:

机制适用场景关键特点
SSE(Server-Sent Events)实时 token 流、工具状态更新单向推送,HTTP 复用,断线自动重连
WebSocket需要双向交互(用户实时纠偏)双向通信,适合交互式 Agent
Webhook长任务完成通知、异步任务结果无需保持连接,适合数分钟以上的任务

SSE 流式实现

用 FastAPI 实现 SSE 端点,实时推送 Agent 执行状态:

python
# pip install fastapi uvicorn anthropic
import asyncio
import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from typing import AsyncIterator
from anthropic import AsyncAnthropic

app = FastAPI()
async_client = AsyncAnthropic()

async def agent_event_stream(user_message: str) -> AsyncIterator[str]:
    """
    异步生成器,产出 SSE 格式的事件流。
    每个事件格式:data: <json>\n\n
    """
    messages = [{"role": "user", "content": user_message}]

    def sse_event(event_type: str, data: dict) -> str:
        payload = json.dumps({"type": event_type, **data}, ensure_ascii=False)
        return f"data: {payload}\n\n"

    yield sse_event("start", {"message": "开始处理任务"})

    for turn in range(10):
        yield sse_event("thinking", {"turn": turn + 1})

        # 使用 Anthropic 的流式 API
        tool_results = []

        async with async_client.messages.stream(
            model="<your-claude-model>",
            max_tokens=4096,
            tools=TOOLS,
            messages=messages,
        ) as stream:
            current_text = ""

            async for event in stream:
                # 推送增量 token
                if event.type == "content_block_delta":
                    if hasattr(event.delta, "text"):
                        current_text += event.delta.text
                        yield sse_event("token", {"delta": event.delta.text})

            # 流结束,获取完整响应
            response = await stream.get_final_message()

        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason == "end_turn":
            yield sse_event("complete", {
                "answer": next(
                    (b.text for b in response.content if b.type == "text"), ""
                )
            })
            return

        if response.stop_reason != "tool_use":
            yield sse_event("error", {"message": f"意外停止: {response.stop_reason}"})
            return

        # 工具调用,推送工具状态
        for block in response.content:
            if block.type != "tool_use":
                continue

            yield sse_event("tool_start", {
                "tool": block.name,
                "input_preview": str(block.input)[:100]
            })

            result = await asyncio.get_event_loop().run_in_executor(
                None, execute_tool_safe, block.name, block.input
            )

            yield sse_event("tool_done", {
                "tool": block.name,
                "result_preview": result[:200]
            })

            tool_results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": result,
            })

        messages.append({"role": "user", "content": tool_results})

    yield sse_event("error", {"message": "达到最大轮次"})


@app.get("/agent/stream")
async def stream_agent(message: str):
    return StreamingResponse(
        agent_event_stream(message),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # 关闭 Nginx 缓冲
        }
    )

前端接收 SSE 事件的代码(JavaScript):

javascript
const source = new EventSource('/agent/stream?message=帮我搜索最新的Python版本');

source.onmessage = (event) => {
  const data = JSON.parse(event.data);
  switch (data.type) {
    case 'token':
      appendText(data.delta);  // 实时渲染 token
      break;
    case 'tool_start':
      showToolStatus(`正在调用 ${data.tool}...`);
      break;
    case 'tool_done':
      updateToolStatus(data.tool, '完成');
      break;
    case 'complete':
      source.close();
      showFinalAnswer(data.answer);
      break;
    case 'error':
      source.close();
      showError(data.message);
      break;
  }
};

25.8 Coding Agent 架构模式

前面几节建立了通用 Agent 的完整骨架。Coding Agent 是 Agent 最复杂的应用场景之一,它在通用骨架之上有几个特有的架构模式。

repo-level context management

Coding Agent 操作的是整个代码库,而不是单个文件。但不能把整个 repo 塞进上下文——一个中型项目有几万甚至几十万行代码。

有效的 repo-level context 管理分三个层次:

python
import os
import ast
from pathlib import Path

def get_repo_skeleton(repo_path: str, max_depth: int = 3) -> str:
    """生成 repo 目录树(粗粒度,快速给 LLM 方向感)"""
    lines = []
    for root, dirs, files in os.walk(repo_path):
        # 过滤不相关目录
        dirs[:] = [d for d in dirs if d not in {'.git', '__pycache__', 'node_modules', '.venv'}]

        depth = root.replace(repo_path, '').count(os.sep)
        if depth > max_depth:
            continue
        indent = '  ' * depth
        lines.append(f"{indent}{os.path.basename(root)}/")
        for f in files:
            lines.append(f"{indent}  {f}")
    return '\n'.join(lines)

def extract_python_signatures(file_path: str) -> str:
    """提取 Python 文件的类和函数签名(不包含函数体)"""
    with open(file_path, 'r', encoding='utf-8') as f:
        source = f.read()

    try:
        tree = ast.parse(source)
    except SyntaxError:
        return f"# 无法解析: {file_path}"

    signatures = []
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            args = [arg.arg for arg in node.args.args]
            sig = f"def {node.name}({', '.join(args)}): ..."
            signatures.append(sig)
        elif isinstance(node, ast.ClassDef):
            signatures.append(f"class {node.name}: ...")
    return '\n'.join(signatures)

def build_coding_context(task: str, repo_path: str, relevant_files: list[str]) -> str:
    """为编程任务构建最优上下文"""
    context_parts = []

    # 1. 任务描述
    context_parts.append(f"## 任务\n{task}")

    # 2. repo 结构(方向感)
    context_parts.append(f"## 项目结构\n```\n{get_repo_skeleton(repo_path)}\n```")

    # 3. 相关文件的完整内容
    for file_path in relevant_files:
        full_path = os.path.join(repo_path, file_path)
        if os.path.exists(full_path):
            with open(full_path) as f:
                content = f.read()
            context_parts.append(f"## {file_path}\n```python\n{content}\n```")

    return '\n\n'.join(context_parts)

edit-apply loop

Coding Agent 生成代码修改时,有两种模式:

  1. 生成完整文件:简单,但对大文件浪费 Token,且容易引入不相关改动
  2. 生成 Diff / Patch:精确,但需要正确解析和应用

生产级 Coding Agent 通常用一种"伪 diff"格式,让 LLM 生成 search/replace 块:

python
import re

def apply_edit(original: str, search: str, replace: str) -> tuple[str, bool]:
    """
    应用一次 search/replace 编辑。
    返回 (new_content, success)。
    """
    if search not in original:
        # LLM 给的 search 块不匹配,失败
        return original, False

    # 确认只有一处匹配(多处匹配时让 LLM 提供更多上下文)
    if original.count(search) > 1:
        return original, False

    return original.replace(search, replace, 1), True

def parse_edits_from_response(response_text: str) -> list[dict]:
    """
    从 LLM 响应中解析编辑块。
    期望格式:
    ```
    <<<<<<< SEARCH
    原始代码
    =======
    新代码
    >>>>>>> REPLACE
    ```
    """
    pattern = r'<<<<<<< SEARCH\n(.*?)\n=======\n(.*?)\n>>>>>>> REPLACE'
    matches = re.findall(pattern, response_text, re.DOTALL)
    return [{"search": m[0], "replace": m[1]} for m in matches]

def apply_all_edits(file_path: str, edits: list[dict]) -> tuple[bool, str]:
    """
    对文件应用一组编辑,全部成功才写回。
    失败时返回失败原因,文件不改动。
    """
    with open(file_path) as f:
        content = f.read()

    for i, edit in enumerate(edits):
        content, success = apply_edit(content, edit["search"], edit["replace"])
        if not success:
            return False, f"第 {i+1} 处编辑匹配失败:\n```\n{edit['search'][:100]}\n```"

    with open(file_path, 'w') as f:
        f.write(content)

    return True, "所有编辑已应用"

test-fix cycle

一个完整的 test-fix cycle 是这样的:Agent 修改代码 → 运行测试 → 如果测试失败,把失败信息喂给 LLM → LLM 生成修复 → 再运行测试 → 直到通过或放弃。

关键工程细节是防止无限循环

python
import subprocess
import shlex

def run_tests(test_command: str, timeout: int = 60) -> tuple[bool, str]:
    """运行测试,返回 (passed, output)"""
    try:
        result = subprocess.run(
            shlex.split(test_command),
            capture_output=True,
            text=True,
            timeout=timeout
        )
        passed = result.returncode == 0
        output = result.stdout + result.stderr
        return passed, output[:3000]  # 截断避免过长
    except subprocess.TimeoutExpired:
        return False, f"测试超时({timeout}s)"
    except Exception as e:
        return False, f"测试运行失败: {e}"

def test_fix_cycle(
    file_path: str,
    test_command: str,
    max_attempts: int = 3
) -> tuple[bool, str]:
    """
    测试-修复循环。
    max_attempts: 最多尝试 N 次修复,超出则放弃(防无限循环)
    """
    for attempt in range(max_attempts):
        passed, output = run_tests(test_command)
        if passed:
            return True, f"测试在第 {attempt + 1} 次尝试后通过"

        if attempt == max_attempts - 1:
            return False, f"达到最大修复次数({max_attempts}),测试仍然失败"

        # 喂给 Agent 修复
        fix_prompt = f"""
测试失败了,以下是测试输出:

请分析失败原因并修复 {file_path} 中的代码。
只修改导致测试失败的代码,不要引入其他改动。
"""
        # 这里调用主 Agent 进行修复(略去完整调用,概念同 run_agent)
        fix_response = run_agent(fix_prompt, max_turns=5)

        # 解析并应用编辑
        edits = parse_edits_from_response(fix_response)
        if edits:
            success, msg = apply_all_edits(file_path, edits)
            if not success:
                return False, f"编辑应用失败: {msg}"

    return False, "修复失败"

安全边界

Coding Agent 有几个必须遵守的安全规则,违反任何一条都可能造成数据损失:

  1. 只操作任务相关的文件:Agent 不应该修改用户没有提到的文件,更不应该修改系统文件
  2. 保护未提交的改动:在 Agent 开始工作之前,检查 git status,不覆盖用户正在编写的变更
  3. 命令执行必须沙箱化subprocess.run 不要直接执行 LLM 生成的 shell 命令,需要白名单验证
python
ALLOWED_PREFIXES = [
    ("python", "-m", "pytest"),
    ("pytest",),
    ("npm", "test"),
    ("git", "status"),
    ("git", "diff"),
]

def safe_execute_command(command: list[str]) -> tuple[bool, str]:
    """只允许白名单命令执行"""
    if not command:
        return False, "空命令"

    if not any(tuple(command[:len(prefix)]) == prefix for prefix in ALLOWED_PREFIXES):
        return False, f"命令不在允许列表中: {' '.join(command[:3])}"

    # 额外检查:不允许 shell 注入字符
    full_cmd = " ".join(command)
    for dangerous in [";", "&&", "||", "|", ">", "<", "`", "$(", "${", "\n"]:
        if dangerous in full_cmd:
            return False, f"命令包含不允许的字符: '{dangerous}'"

    try:
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=30,
            # 不继承 shell 环境变量里的敏感凭据
            env={"PATH": "/usr/local/bin:/usr/bin:/bin"}
        )
        return result.returncode == 0, result.stdout + result.stderr
    except Exception as e:
        return False, str(e)

把所有能力组合在一起

前面八个小节各自独立,但在实际系统里它们需要协同工作。下面是一个把核心能力组合在一起的 Agent 入口:

python
import uuid

class ProductionAgent:
    """
    把记忆、工具层、Guardrails、可观测性、持久化整合在一起的 Agent。
    流式交互通过 FastAPI 端点暴露(见 25.7 节)。
    """

    def __init__(self):
        self.memory = AgentMemory()
        self.checkpoints = CheckpointStore()

    def run(
        self,
        user_message: str,
        session_id: str,
        run_id: str = None,
        resume: bool = False
    ) -> str:
        run_id = run_id or str(uuid.uuid4())

        # 1. 输入护栏
        input_check = input_guardrail(user_message)
        if input_check.action == GuardrailAction.BLOCK:
            return input_check.content

        # 2. 加载长期记忆
        memories = self.memory.load_recent(session_id)
        memory_context = "\n".join(memories) if memories else ""
        system_context = "你是一个有用的助手。" + (
            f"\n\n用户历史偏好:\n{memory_context}" if memory_context else ""
        )

        # 3. 断点续传
        if resume:
            messages, start_turn = self.checkpoints.load_latest(run_id)
        else:
            messages = []

        if not messages:
            messages = [{"role": "user", "content": user_message}]
            start_turn = 0

        # 4. OTel trace 包裹的主循环
        with tracer.start_as_current_span("agent.production_run") as span:
            span.set_attribute("session.id", session_id)
            span.set_attribute("run.id", run_id)

            for turn in range(start_turn, start_turn + 20):
                # 上下文压缩
                if count_tokens(messages) > 150_000:
                    messages = compact_messages(messages)

                response = client.messages.create(
                    model="<your-claude-model>",
                    max_tokens=4096,
                    system=system_context,
                    tools=TOOLS,
                    messages=messages,
                )
                messages.append({"role": "assistant", "content": response.content})

                # 保存检查点
                self.checkpoints.save(run_id, turn + 1, messages)

                if response.stop_reason == "end_turn":
                    final_text = next(
                        (b.text for b in response.content if b.type == "text"), ""
                    )

                    # 5. 输出护栏
                    output_check = output_guardrail(final_text)
                    result = output_check.content if output_check.action in (
                        GuardrailAction.PASS, GuardrailAction.MODIFY
                    ) else "内容无法展示"

                    self.checkpoints.delete(run_id)
                    return result

                if response.stop_reason != "tool_use":
                    return f"[停止: {response.stop_reason}]"

                tool_results = []
                for block in response.content:
                    if block.type != "tool_use":
                        continue

                    # 工具护栏
                    tool_check = tool_guardrail(block.name, block.input)
                    if tool_check.action == GuardrailAction.BLOCK:
                        tool_results.append({
                            "type": "tool_result",
                            "tool_use_id": block.id,
                            "content": tool_check.content,
                        })
                        continue

                    result = execute_tool_safe(block.name, block.input)
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": result,
                    })

                messages.append({"role": "user", "content": tool_results})

        return "[达到最大轮次]"

常见误区

误区一:把所有能力都加进第一个版本

从 MVP 开始:先跑通主循环,再加工具层,再加 Guardrails,再加持久化。如果你第一天就把所有能力堆在一起,出了问题你不知道哪一层引入的。

误区二:以为框架屏蔽了这些复杂度

LangGraph 的 MemorySaver 内部就是 Checkpoint 序列化;CrewAI 的 Process.sequential 内部就是消息历史管理;OpenAI SDK 的 guardrail 就是输入/输出检查。你用框架,这些复杂度还在,只是被隐藏了。理解了原理,你才能在框架行为异常时知道去哪里找问题。

误区三:测试只测最终输出

Agent 的 bug 通常藏在中间步骤——第三轮工具调用的参数错了,但最终输出看起来还算正常。测试要覆盖 Transcript(完整执行轨迹),而不只是最终答案。第 18 章的 Eval 体系里讲了如何系统地做这件事。

误区四:eval 可以直接用于工具调用

上面 calculate 工具的示例用了 eval,并且加了 {"__builtins__": {}} 的限制。但这个限制是不完整的——绕过它的方式已经被广泛记录。生产里的代码执行必须用独立沙箱(microVM 或容器),不能在主进程里用 eval


面试高频题

Q:如果不用任何框架,你如何从零实现一个支持工具调用和记忆的 Agent?

参考答案框架:

  1. 核心循环messages 列表 + while True + stop_reason 判断
  2. 工具调用:解析 tool_use block → 查工具注册表 → 执行 → 追加 tool_result
  3. 短期记忆messages 列表本身;超出上限时用 LLM 摘要压缩
  4. 长期记忆:关键信息写入 SQLite,每次任务开始时加载相关条目注入 System Prompt
  5. 持久化:每轮工具调用后序列化 messages 到存储,重启时恢复

加分点:

  • 提到工具层的参数校验(Pydantic)和超时控制(ThreadPoolExecutor
  • 提到 OTel 埋点——"我需要知道第几轮出了问题"
  • 提到检查点的序列化细节(SDK 对象不能直接 JSON 序列化,需要手动转换)
  • 提到 eval 沙箱风险,Coding Agent 的 execute_code 必须用独立执行环境

Q:Coding Agent 的 test-fix cycle 中,如何防止陷入无限修复循环?

参考答案框架:

  1. 硬性上限max_attempts 参数,超出后停止并上报,不再尝试
  2. 失败模式检测:连续两次生成了相同的修复内容,说明 LLM 卡住了,直接停止
  3. 收敛检测:如果测试失败数量没有减少(或增加了),停止修复
  4. 回退点:每次修复前保存 diff,失败超出阈值后回滚到任务开始时的状态

加分点:提到给 LLM 的错误信息格式很重要——原始测试输出太长会稀释有用信息,应该提取关键失败行而不是全量输出。


参考资料

[1] Building Effective Agents - Anthropic (https://www.anthropic.com/engineering/building-effective-agents)

[2] Agents SDK Guardrails - OpenAI (https://openai.github.io/openai-agents-python/guardrails/)

[3] Effective context engineering - Anthropic (https://www.anthropic.com/engineering/effective-context-engineering-for-ai-agents)

[4] AI agent observability - OpenTelemetry (https://opentelemetry.io/blog/2025/ai-agent-observability/)

[5] A practical guide to building agents - OpenAI (https://cdn.openai.com/business-guides-and-resources/a-practical-guide-to-building-agents.pdf)

[6] Anthropic Messages API 文档 (https://docs.anthropic.com/en/api/messages)

[7] Anthropic Token Counting API 文档 (https://docs.anthropic.com/en/api/messages-count-tokens)

[8] Anthropic Streaming Messages 文档 (https://docs.anthropic.com/claude/reference/messages-streaming)