第25章:从零构建一个生产级 Agent(不依赖框架)
你已经读完了前 24 章,手边有 LangGraph、CrewAI、OpenAI SDK、smolagents 可以选。那么为什么还要从零写一个 Agent?
因为框架封装了复杂性,也隐藏了复杂性。当框架表现异常时——工具没有被调用、记忆没有被更新、某个步骤莫名循环——你需要知道框架底层在做什么,才能诊断问题。更关键的是,很多生产级 Agent 需要定制检查点粒度、自定义流式协议、嵌入已有的服务架构,这些需求用框架实现往往比自己写更麻烦。
Anthropic 的官方文档有一句话值得引用 [1]:
For many LLM interactions, the overhead of a framework isn't worth it. Start with direct API calls, add complexity only when needed.
这不是反对使用框架,而是说直接 API 调用是理解底层机制的最短路径。你搞清楚了没有框架时要做什么,框架才能真正成为你的工具,而不是你依赖的黑盒。
这一章从最小 ReAct 循环开始,逐步叠加记忆、工具层、Guardrails、可观测性、持久化、流式交互,最后专门讲 Coding Agent 的几个核心架构模式。代码以可落地的教学片段为主,真实生产还需要接入你的模型、工具、权限、审计和部署环境。读完这章,你应该能从一个空文件开始,理解一个生产级 Agent 的主要部件如何拼起来。
25.1 核心循环
核心直觉
所有 Agent,不论多复杂,底层都是同一个 while 循环:接收指令 → LLM 推理 → 解析动作 → 执行工具 → 把结果还给 LLM → 判断是否停止。框架做的事情,是把这个循环包装成更高级的抽象——图节点、任务队列、Role 定义。
把这个循环弄清楚,你就看穿了所有框架。
循环有两个重要边界:
- 入口:第一条用户消息进入消息历史
- 出口:
stop_reason变成end_turn(任务完成),或者达到最大轮次/Token 上限(强制停止)
循环里的每一次 LLM 调用,都会把完整的消息历史发给模型。这就是为什么 Agent 的成本会随着对话轮数线性增长——每一轮的输入 Token 数都比上一轮多。
最小可运行实现
下面是一个 80 行的 Agent,没有任何依赖,只用 anthropic SDK:
import json
import anthropic
client = anthropic.Anthropic()
# 工具定义(JSON Schema)
TOOLS = [
{
"name": "search_web",
"description": "搜索互联网获取最新信息。适合查询实时数据、新闻、产品信息。",
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词,尽量具体"
}
},
"required": ["query"]
}
},
{
"name": "calculate",
"description": "执行数学计算。输入 Python 表达式,返回计算结果。",
"input_schema": {
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "合法的 Python 数学表达式,如 '2 ** 10' 或 '(100 - 30) / 100'"
}
},
"required": ["expression"]
}
}
]
# 工具实现
def search_web(query: str) -> str:
# 生产中这里接入 Tavily / Bing / Brave Search API
return f"[模拟搜索结果] 关于'{query}'的搜索结果:找到 3 条相关内容。"
import ast
import operator
ALLOWED_MATH_OPERATORS = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.Pow: operator.pow,
ast.USub: operator.neg,
}
def _eval_math_node(node):
if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
return node.value
if isinstance(node, ast.BinOp) and type(node.op) in ALLOWED_MATH_OPERATORS:
return ALLOWED_MATH_OPERATORS[type(node.op)](
_eval_math_node(node.left),
_eval_math_node(node.right),
)
if isinstance(node, ast.UnaryOp) and type(node.op) in ALLOWED_MATH_OPERATORS:
return ALLOWED_MATH_OPERATORS[type(node.op)](_eval_math_node(node.operand))
raise ValueError("只允许数字和基础数学运算")
def calculate(expression: str) -> str:
try:
parsed = ast.parse(expression, mode="eval")
result = _eval_math_node(parsed.body)
return str(result)
except Exception as e:
return f"计算错误: {e}"
TOOL_REGISTRY = {
"search_web": search_web,
"calculate": calculate,
}
def run_agent(user_message: str, max_turns: int = 10) -> str:
"""最小 ReAct Agent 主循环"""
messages = [{"role": "user", "content": user_message}]
for turn in range(max_turns):
response = client.messages.create(
model="<your-claude-model>",
max_tokens=4096,
tools=TOOLS,
messages=messages,
)
# 把 assistant 响应追加到历史
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason == "end_turn":
# 提取最终文本
return next(
(block.text for block in response.content if block.type == "text"),
""
)
if response.stop_reason != "tool_use":
return f"[意外停止: {response.stop_reason}]"
# 处理工具调用
tool_results = []
for block in response.content:
if block.type != "tool_use":
continue
tool_fn = TOOL_REGISTRY.get(block.name)
if tool_fn is None:
result = f"错误:未知工具 '{block.name}'"
else:
try:
result = tool_fn(**block.input)
except Exception as e:
result = f"工具执行失败: {e}"
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": result,
})
messages.append({"role": "user", "content": tool_results})
return "[达到最大轮次,任务未完成]"
if __name__ == "__main__":
answer = run_agent("1024 的平方根是多少?再搜索一下最新的 Python 版本")
print(answer)这 80 行代码包含了一个 Agent 的完整骨架。接下来每一节都是在这个骨架上叠加能力。
25.2 添加记忆
核心直觉
messages 列表本身就是短期记忆——它记录了当前会话里发生的一切。问题是这个列表会无限增长,而 LLM 的上下文窗口是有限的。长任务运行一段时间后,输入 Token 数会逼近上限,成本也会线性飙升。
记忆系统要解决两个问题:在窗口内放什么,以及超出窗口时怎么处理。
上下文窗口管理
最简单的截断策略是保留最近 N 条消息。但这会丢失任务上下文——Agent 在第 3 轮拿到的关键信息,到第 20 轮已经被截掉了。
更好的做法是摘要压缩(Compaction):当消息列表的 Token 数接近上限时,把历史消息压缩成一段摘要,保留关键决策和发现,丢弃冗余的工具调用详情。
def count_tokens(messages: list, tools: list | None = None) -> int:
"""
估算消息列表的 token 数。
生产环境优先使用模型厂商的 count_tokens / usage API;
这里用字符数近似,适合触发 compaction 的粗略阈值,不适合计费。
"""
text = []
for msg in messages:
content = msg["content"]
if isinstance(content, str):
text.append(content)
elif isinstance(content, list):
for block in content:
if isinstance(block, dict):
text.append(str(block.get("text") or block.get("content") or block))
else:
text.append(str(block))
if tools:
text.append(str(tools))
return sum(len(part) for part in text) // 4
def compact_messages(messages: list, keep_last: int = 4) -> list:
"""
当消息列表过长时,压缩历史部分。
保留最开始的系统消息 + 最近 keep_last 条消息。
中间部分用 LLM 摘要替换。
"""
if len(messages) <= keep_last + 1:
return messages
# 分离出需要压缩的历史部分
history_to_compress = messages[:-keep_last]
recent_messages = messages[-keep_last:]
# 用 LLM 生成历史摘要
summary_prompt = "请总结以下对话历史中的关键信息、重要发现和已完成的步骤(100字以内):\n\n"
for msg in history_to_compress:
if isinstance(msg["content"], str):
summary_prompt += f"{msg['role']}: {msg['content']}\n"
summary_response = client.messages.create(
model="<your-small-claude-model>", # 用小模型做摘要,省钱
max_tokens=200,
messages=[{"role": "user", "content": summary_prompt}]
)
summary = summary_response.content[0].text
# 重建消息列表:摘要 + 最近消息
compressed = [
{
"role": "user",
"content": f"[对话历史摘要] {summary}"
},
{
"role": "assistant",
"content": "好的,我了解了之前的进展。"
}
] + recent_messages
return compressed
# 在主循环里集成:
def run_agent_with_memory(user_message: str, max_turns: int = 20) -> str:
messages = [{"role": "user", "content": user_message}]
TOKEN_LIMIT = 150_000 # 留出 50k 给输出
for turn in range(max_turns):
# 检查是否需要压缩
if count_tokens(messages) > TOKEN_LIMIT:
messages = compact_messages(messages)
response = client.messages.create(
model="<your-claude-model>",
max_tokens=4096,
tools=TOOLS,
messages=messages,
)
# ... 余下和前面相同跨会话长期记忆
短期记忆只在当前会话里有效。跨会话的记忆需要外部存储。最简单的实现是 SQLite:
import sqlite3
import json
from datetime import datetime
class AgentMemory:
def __init__(self, db_path: str = "agent_memory.db"):
self.conn = sqlite3.connect(db_path)
self._init_schema()
def _init_schema(self):
self.conn.execute("""
CREATE TABLE IF NOT EXISTS memories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
content TEXT NOT NULL,
importance INTEGER DEFAULT 5,
created_at TEXT NOT NULL
)
""")
self.conn.commit()
def save(self, session_id: str, content: str, importance: int = 5):
self.conn.execute(
"INSERT INTO memories (session_id, content, importance, created_at) VALUES (?, ?, ?, ?)",
(session_id, content, importance, datetime.utcnow().isoformat())
)
self.conn.commit()
def load_recent(self, session_id: str, limit: int = 10) -> list[str]:
cursor = self.conn.execute(
"SELECT content FROM memories WHERE session_id = ? ORDER BY importance DESC, created_at DESC LIMIT ?",
(session_id, limit)
)
return [row[0] for row in cursor.fetchall()]长期记忆的关键问题是什么该记。不是所有工具调用结果都值得存——一次失败的搜索不需要记,但用户确认过的一个关键事实("用户偏好使用 PostgreSQL")值得记。简单的规则:只记录跨任务有价值的内容,工具调用的中间结果不要存。
生产里的长期记忆还要加三条边界:第一,key 里要包含 tenant/user scope,避免不同租户或角色之间串记忆;第二,PII、secret、访问 token、完整工具参数不要进入长期记忆;第三,记忆要有来源、更新时间、过期策略和删除接口,方便处理用户撤回、合规删除和事实过期。
25.3 添加工具执行层
核心直觉
在最小实现里,工具调用只有一行 tool_fn(**block.input),没有参数验证、没有超时控制、没有重试。这在 Demo 里够用,生产里不够。
一个生产级的工具执行层需要做三件事:验证输入合法性、控制执行时间、处理失败并给模型有用的错误信息。
参数校验
用 Pydantic 定义工具的输入 schema,在执行前自动校验:
from pydantic import BaseModel, ValidationError
from typing import Any
class SearchInput(BaseModel):
query: str
def model_post_init(self, __context: Any) -> None:
if len(self.query) < 2:
raise ValueError("查询词太短,至少需要 2 个字符")
if len(self.query) > 500:
raise ValueError("查询词过长,最多 500 个字符")
class CalculateInput(BaseModel):
expression: str
TOOL_SCHEMAS = {
"search_web": SearchInput,
"calculate": CalculateInput,
}
def execute_tool_safe(
name: str,
raw_input: dict,
timeout_seconds: float = 10.0
) -> str:
"""
安全执行工具:参数校验 + 超时 + 错误处理。
返回字符串,方便直接放入 tool_result。
"""
# 参数校验
schema_cls = TOOL_SCHEMAS.get(name)
if schema_cls is None:
return f"错误:未知工具 '{name}'"
try:
validated = schema_cls(**raw_input)
except ValidationError as e:
# 把 Pydantic 错误转成 LLM 友好的提示
errors = [f"{err['loc'][0]}: {err['msg']}" for err in e.errors()]
return f"参数校验失败: {'; '.join(errors)}"
# 执行工具(带超时)
tool_fn = TOOL_REGISTRY.get(name)
if tool_fn is None:
return f"错误:工具 '{name}' 未注册实现"
try:
# 对外部 API 工具,优先在工具内部设置 HTTP timeout。
# 这里用一个长期 executor 避免每次调用都创建线程;注意线程超时不能强杀正在运行的函数。
import concurrent.futures
future = TOOL_EXECUTOR.submit(tool_fn, **validated.model_dump())
try:
result = future.result(timeout=timeout_seconds)
return str(result)
except concurrent.futures.TimeoutError:
future.cancel()
return f"工具执行超时({timeout_seconds}s)。请缩小查询范围、稍后重试,或改用不依赖该工具的方案。"
except Exception as e:
return f"工具执行出错: {type(e).__name__}: {e}"# 应用启动时创建,应用关闭时 shutdown。
# 如果工具可能执行 CPU 密集或不可中断代码,应放到独立进程、容器或 microVM,
# 不要依赖线程 timeout 提供安全隔离。
import concurrent.futures
TOOL_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=8)错误信息要对 LLM 有用
工具执行失败时,你给模型返回什么很重要。
差的错误信息:"Error: connection refused"
好的错误信息:"数据库连接失败,可能是网络问题。建议:1) 使用缓存数据重试,或 2) 通知用户服务暂时不可用"
给模型的错误信息不是给开发者看的日志——它是模型下一步决策的依据。好的错误信息应该告诉模型发生了什么 + 可以怎么处理。
25.4 添加 Guardrails
核心直觉
Guardrails 是运行在主 Agent 循环旁边的检查层,负责拦截危险输入、过滤有害输出、阻止越权的工具调用。第 15 章从设计模式角度讲了 Guardrails,这里关注实现细节。
三层 Guardrails 的执行位置:
三层实现
from dataclasses import dataclass
from enum import Enum
class GuardrailAction(Enum):
PASS = "pass"
BLOCK = "block"
MODIFY = "modify"
@dataclass
class GuardrailResult:
action: GuardrailAction
content: str # 通过则原样返回,阻断则是提示语,修改则是新内容
reason: str = ""
# --- 输入护栏 ---
def input_guardrail(user_message: str) -> GuardrailResult:
"""检查用户输入,阻断明显的恶意或危险内容"""
blocked_patterns = [
"ignore previous instructions",
"forget your system prompt",
"你现在是", # 角色扮演类注入的常见模式
]
lower = user_message.lower()
for pattern in blocked_patterns:
if pattern in lower:
return GuardrailResult(
action=GuardrailAction.BLOCK,
content="对不起,这条指令包含不允许的内容,无法处理。",
reason=f"检测到可能的 Prompt 注入模式: '{pattern}'"
)
return GuardrailResult(action=GuardrailAction.PASS, content=user_message)
# --- 工具护栏 ---
DANGEROUS_TOOLS = {"delete_file", "drop_database", "send_email"}
REQUIRE_APPROVAL_TOOLS = {"write_file", "execute_code", "create_user"}
def tool_guardrail(tool_name: str, tool_input: dict) -> GuardrailResult:
"""在工具调用前校验:是否被允许、是否需要额外确认"""
if tool_name in DANGEROUS_TOOLS:
return GuardrailResult(
action=GuardrailAction.BLOCK,
content=f"工具 '{tool_name}' 已被策略禁止,无法调用。",
reason="危险工具黑名单"
)
if tool_name in REQUIRE_APPROVAL_TOOLS:
# 这里可以接入异步审批流程(见 16.4 节),简化版直接阻断
return GuardrailResult(
action=GuardrailAction.BLOCK,
content=f"工具 '{tool_name}' 需要人工审批,当前自动模式下不可用。",
reason="需要审批的工具"
)
return GuardrailResult(action=GuardrailAction.PASS, content="")
# --- 输出护栏 ---
def output_guardrail(response_text: str) -> GuardrailResult:
"""检查最终输出,过滤敏感信息泄露"""
sensitive_patterns = ["sk-", "api_key=", "password=", "secret="]
for pattern in sensitive_patterns:
if pattern.lower() in response_text.lower():
# 脱敏处理而不是直接阻断
import re
cleaned = re.sub(
r'(sk-|api_key=|password=|secret=)[^\s\'"]+',
r'\1[REDACTED]',
response_text,
flags=re.IGNORECASE
)
return GuardrailResult(
action=GuardrailAction.MODIFY,
content=cleaned,
reason="检测到可能的敏感信息泄露,已脱敏"
)
return GuardrailResult(action=GuardrailAction.PASS, content=response_text)工具护栏在真实系统里不应该只看工具名。更可靠的做法是把调用者、租户、资源、风险等级和当前任务目的一起交给策略引擎判断:
@dataclass
class ToolPolicyContext:
user_id: str
tenant_id: str
scopes: set[str]
environment: str # dev / staging / prod
def tool_guardrail_with_context(
tool_name: str,
tool_input: dict,
ctx: ToolPolicyContext,
) -> GuardrailResult:
if tool_name == "query_database" and "db:read" not in ctx.scopes:
return GuardrailResult(
action=GuardrailAction.BLOCK,
content="当前用户没有数据库读取权限。",
reason="missing db:read scope",
)
if tool_name in {"send_email", "refund_order", "write_file"} and ctx.environment == "prod":
return GuardrailResult(
action=GuardrailAction.BLOCK,
content=f"工具 '{tool_name}' 在生产环境需要人工审批。",
reason="prod high-risk action",
)
return GuardrailResult(action=GuardrailAction.PASS, content="")乐观执行模式
输出护栏可以与主循环并发运行,而不是串行等待:主 Agent 生成输出的同时,后台护栏已经在检查。如果护栏在主 Agent 完成前发现问题,抛出异常中断整个流程。这比"生成完再检查"省了一半延迟。
这个模式在 OpenAI Agents SDK 的文档里称为"Optimistic Execution" [2],实现上用 asyncio.gather 同时跑主流程和检查流程,任何一个提前完成或出错都触发 cancel 另一个。
25.5 添加可观测性
核心直觉
Agent 的 bug 通常不是单次 LLM 调用出了问题,而是某一步的输出影响了后续决策,导致几步之后偏离预期。没有完整的执行 Trace,你根本不知道问题出在哪一步。
用 OpenTelemetry(OTel)埋点,一次埋点,可以把 trace 导到任何后端(Langfuse、Arize Phoenix、Jaeger 等)。
OTel 埋点
# pip install opentelemetry-sdk opentelemetry-exporter-otlp
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.trace import Status, StatusCode
import time
# 初始化 OTel(在应用启动时调用一次)
def setup_tracing(service_name: str = "my-agent"):
provider = TracerProvider()
exporter = OTLPSpanExporter(endpoint="http://localhost:4317") # Jaeger/Langfuse 地址
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)
return trace.get_tracer(service_name)
tracer = setup_tracing()
def safe_preview(value: str, limit: int = 200) -> str:
"""写入 trace 前做最小脱敏,避免把 prompt、PII、secret 原样进日志。"""
import re
text = str(value)
text = re.sub(r'(sk-|api_key=|password=|secret=)[^\s\'"]+', r'\1[REDACTED]', text, flags=re.IGNORECASE)
return text[:limit]
def run_agent_with_tracing(user_message: str, session_id: str) -> str:
"""带 OTel trace 的 Agent 主循环"""
with tracer.start_as_current_span("agent.run") as root_span:
root_span.set_attribute("session.id", session_id)
root_span.set_attribute("input.preview", safe_preview(user_message))
messages = [{"role": "user", "content": user_message}]
turn = 0
while turn < 10:
turn += 1
# LLM 调用的 Span
with tracer.start_as_current_span("llm.call") as llm_span:
llm_span.set_attribute("llm.turn", turn)
llm_span.set_attribute("llm.input_messages", len(messages))
start = time.time()
response = client.messages.create(
model="<your-claude-model>",
max_tokens=4096,
tools=TOOLS,
messages=messages,
)
elapsed = time.time() - start
llm_span.set_attribute("llm.latency_ms", int(elapsed * 1000))
llm_span.set_attribute("llm.stop_reason", response.stop_reason)
llm_span.set_attribute("llm.input_tokens", response.usage.input_tokens)
llm_span.set_attribute("llm.output_tokens", response.usage.output_tokens)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason == "end_turn":
final_text = next(
(b.text for b in response.content if b.type == "text"), ""
)
root_span.set_attribute("output.preview", safe_preview(final_text))
return final_text
if response.stop_reason != "tool_use":
root_span.set_status(Status(StatusCode.ERROR, f"unexpected stop: {response.stop_reason}"))
return f"[意外停止: {response.stop_reason}]"
# 工具调用的 Span
tool_results = []
for block in response.content:
if block.type != "tool_use":
continue
with tracer.start_as_current_span("tool.call") as tool_span:
tool_span.set_attribute("tool.name", block.name)
tool_span.set_attribute("tool.input.preview", safe_preview(block.input, 500))
t_start = time.time()
result = execute_tool_safe(block.name, block.input)
t_elapsed = time.time() - t_start
tool_span.set_attribute("tool.latency_ms", int(t_elapsed * 1000))
tool_span.set_attribute("tool.result_preview", safe_preview(result))
if result.startswith("错误:") or result.startswith("工具执行"):
tool_span.set_status(Status(StatusCode.ERROR, result[:100]))
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": result,
})
messages.append({"role": "user", "content": tool_results})
root_span.set_status(Status(StatusCode.ERROR, "max turns reached"))
return "[达到最大轮次]"每次 LLM 调用都记录了:轮次、输入/输出 Token 数、延迟、停止原因。每次工具调用都记录了:工具名、输入、延迟、结果预览。你拿到 trace 之后,能清楚地看到"第 3 轮 LLM 调用了 search_web,工具耗时 2.3 秒,结果被用于第 4 轮推理"这样的完整链路。
25.6 添加持久化
核心直觉
没有持久化的 Agent 就像没有存档的游戏——进程一死,进度全丢。对于运行几十分钟的任务,这是不可接受的。
持久化的核心操作是 Checkpoint(检查点):在每一轮工具调用之后,把当前的消息历史和状态序列化写入存储。重启时,从最后一个检查点恢复,而不是从头开始。
SQLite 检查点实现
import sqlite3
import json
from datetime import datetime
class CheckpointStore:
def __init__(self, db_path: str = "checkpoints.db"):
self.conn = sqlite3.connect(db_path, check_same_thread=False)
self._init_schema()
def _init_schema(self):
self.conn.execute("""
CREATE TABLE IF NOT EXISTS checkpoints (
run_id TEXT NOT NULL,
turn INTEGER NOT NULL,
messages TEXT NOT NULL,
metadata TEXT,
created_at TEXT NOT NULL,
PRIMARY KEY (run_id, turn)
)
""")
self.conn.commit()
def save(self, run_id: str, turn: int, messages: list, metadata: dict = None):
"""保存检查点"""
self.conn.execute(
"""INSERT OR REPLACE INTO checkpoints
(run_id, turn, messages, metadata, created_at) VALUES (?, ?, ?, ?, ?)""",
(
run_id,
turn,
json.dumps(serialize_messages(messages), ensure_ascii=False),
json.dumps(metadata or {}),
datetime.utcnow().isoformat()
)
)
self.conn.commit()
def load_latest(self, run_id: str) -> tuple[list, int]:
"""加载最新检查点,返回 (messages, turn)"""
cursor = self.conn.execute(
"SELECT messages, turn FROM checkpoints WHERE run_id = ? ORDER BY turn DESC LIMIT 1",
(run_id,)
)
row = cursor.fetchone()
if row is None:
return [], 0
return json.loads(row[0]), row[1]
def delete(self, run_id: str):
"""任务完成后清理检查点"""
self.conn.execute("DELETE FROM checkpoints WHERE run_id = ?", (run_id,))
self.conn.commit()
checkpoint_store = CheckpointStore()
def serialize_content(content) -> list | str:
"""把 Anthropic SDK 的 content 列表转成可序列化的字典"""
if isinstance(content, str):
return content
result = []
for block in content:
if isinstance(block, dict):
result.append(block)
elif block.type == "text":
result.append({"type": "text", "text": block.text})
elif block.type == "tool_use":
result.append({
"type": "tool_use",
"id": block.id,
"name": block.name,
"input": block.input
})
return result
def serialize_messages(messages: list) -> list:
return [
{"role": msg["role"], "content": serialize_content(msg["content"])}
for msg in messages
]
def run_agent_with_checkpoint(
user_message: str,
run_id: str, # 每次任务的唯一 ID,用于关联检查点
resume: bool = False
) -> str:
"""支持断点续传的 Agent"""
if resume:
messages, start_turn = checkpoint_store.load_latest(run_id)
if not messages:
# 没找到检查点,从头开始
messages = [{"role": "user", "content": user_message}]
start_turn = 0
print(f"[恢复] 从第 {start_turn} 轮继续")
else:
messages = [{"role": "user", "content": user_message}]
start_turn = 0
for turn in range(start_turn, start_turn + 20):
response = client.messages.create(
model="<your-claude-model>",
max_tokens=4096,
tools=TOOLS,
messages=messages,
)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason == "end_turn":
checkpoint_store.delete(run_id) # 完成后清理
return next(
(b.text for b in response.content if b.type == "text"), ""
)
if response.stop_reason != "tool_use":
return f"[停止: {response.stop_reason}]"
tool_results = []
for block in response.content:
if block.type != "tool_use":
continue
result = execute_tool_safe(block.name, block.input)
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": result,
})
messages.append({"role": "user", "content": tool_results})
# 保存到一个可恢复边界:assistant 的 tool_use 后面已经跟着对应 tool_result。
# 如果在 tool_use 之后、tool_result 之前保存,恢复时很容易形成无效消息历史。
checkpoint_store.save(
run_id=run_id,
turn=turn + 1,
messages=messages,
metadata={"stop_reason": response.stop_reason}
)
return "[达到最大轮次]"检查点的序列化注意事项
response.content 是 Anthropic SDK 返回的对象列表,直接 json.dumps 会失败。不要用 default=str 糊过去,否则恢复后消息结构会变成普通字符串,下一轮模型调用可能失效。生产里要像上面一样把每个 ContentBlock 手动转成字典:
def serialize_content(content) -> list:
"""把 Anthropic SDK 的 content 列表转成可序列化的字典"""
result = []
for block in content:
if block.type == "text":
result.append({"type": "text", "text": block.text})
elif block.type == "tool_use":
result.append({
"type": "tool_use",
"id": block.id,
"name": block.name,
"input": block.input
})
return result还有一个更重要的生产细节:checkpoint 只能解决状态恢复,不能自动解决副作用重放。如果工具已经发了邮件、扣了款、写了数据库,但进程在保存 checkpoint 前崩溃,恢复后可能重复执行同一个工具。因此所有有副作用的工具都要带 idempotency key,并在工具侧做去重。
SQLite 适合教学和单机原型。多 Worker 生产部署通常要换成 Postgres / Redis / durable queue,并加上 run-level lock、状态机版本号、checkpoint schema version 和过期清理策略,避免两个 Worker 同时恢复同一个 run。
25.7 添加流式交互与后台任务状态
核心直觉
Agent 处理一个任务可能需要 30 秒甚至更长。用户盯着空白屏幕等 30 秒是很差的体验。流式输出和任务状态推送解决这个问题:让用户实时看到 Agent 在做什么,而不是等到全部完成才能看到结果。
三种推送机制各有适用场景:
| 机制 | 适用场景 | 关键特点 |
|---|---|---|
| SSE(Server-Sent Events) | 实时 token 流、工具状态更新 | 单向推送,HTTP 复用,断线自动重连 |
| WebSocket | 需要双向交互(用户实时纠偏) | 双向通信,适合交互式 Agent |
| Webhook | 长任务完成通知、异步任务结果 | 无需保持连接,适合数分钟以上的任务 |
SSE 流式实现
用 FastAPI 实现 SSE 端点,实时推送 Agent 执行状态:
# pip install fastapi uvicorn anthropic
import asyncio
import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from typing import AsyncIterator
from anthropic import AsyncAnthropic
app = FastAPI()
async_client = AsyncAnthropic()
async def agent_event_stream(user_message: str) -> AsyncIterator[str]:
"""
异步生成器,产出 SSE 格式的事件流。
每个事件格式:data: <json>\n\n
"""
messages = [{"role": "user", "content": user_message}]
def sse_event(event_type: str, data: dict) -> str:
payload = json.dumps({"type": event_type, **data}, ensure_ascii=False)
return f"data: {payload}\n\n"
yield sse_event("start", {"message": "开始处理任务"})
for turn in range(10):
yield sse_event("thinking", {"turn": turn + 1})
# 使用 Anthropic 的流式 API
tool_results = []
async with async_client.messages.stream(
model="<your-claude-model>",
max_tokens=4096,
tools=TOOLS,
messages=messages,
) as stream:
current_text = ""
async for event in stream:
# 推送增量 token
if event.type == "content_block_delta":
if hasattr(event.delta, "text"):
current_text += event.delta.text
yield sse_event("token", {"delta": event.delta.text})
# 流结束,获取完整响应
response = await stream.get_final_message()
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason == "end_turn":
yield sse_event("complete", {
"answer": next(
(b.text for b in response.content if b.type == "text"), ""
)
})
return
if response.stop_reason != "tool_use":
yield sse_event("error", {"message": f"意外停止: {response.stop_reason}"})
return
# 工具调用,推送工具状态
for block in response.content:
if block.type != "tool_use":
continue
yield sse_event("tool_start", {
"tool": block.name,
"input_preview": str(block.input)[:100]
})
result = await asyncio.get_event_loop().run_in_executor(
None, execute_tool_safe, block.name, block.input
)
yield sse_event("tool_done", {
"tool": block.name,
"result_preview": result[:200]
})
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": result,
})
messages.append({"role": "user", "content": tool_results})
yield sse_event("error", {"message": "达到最大轮次"})
@app.get("/agent/stream")
async def stream_agent(message: str):
return StreamingResponse(
agent_event_stream(message),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # 关闭 Nginx 缓冲
}
)前端接收 SSE 事件的代码(JavaScript):
const source = new EventSource('/agent/stream?message=帮我搜索最新的Python版本');
source.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case 'token':
appendText(data.delta); // 实时渲染 token
break;
case 'tool_start':
showToolStatus(`正在调用 ${data.tool}...`);
break;
case 'tool_done':
updateToolStatus(data.tool, '完成');
break;
case 'complete':
source.close();
showFinalAnswer(data.answer);
break;
case 'error':
source.close();
showError(data.message);
break;
}
};25.8 Coding Agent 架构模式
前面几节建立了通用 Agent 的完整骨架。Coding Agent 是 Agent 最复杂的应用场景之一,它在通用骨架之上有几个特有的架构模式。
repo-level context management
Coding Agent 操作的是整个代码库,而不是单个文件。但不能把整个 repo 塞进上下文——一个中型项目有几万甚至几十万行代码。
有效的 repo-level context 管理分三个层次:
import os
import ast
from pathlib import Path
def get_repo_skeleton(repo_path: str, max_depth: int = 3) -> str:
"""生成 repo 目录树(粗粒度,快速给 LLM 方向感)"""
lines = []
for root, dirs, files in os.walk(repo_path):
# 过滤不相关目录
dirs[:] = [d for d in dirs if d not in {'.git', '__pycache__', 'node_modules', '.venv'}]
depth = root.replace(repo_path, '').count(os.sep)
if depth > max_depth:
continue
indent = ' ' * depth
lines.append(f"{indent}{os.path.basename(root)}/")
for f in files:
lines.append(f"{indent} {f}")
return '\n'.join(lines)
def extract_python_signatures(file_path: str) -> str:
"""提取 Python 文件的类和函数签名(不包含函数体)"""
with open(file_path, 'r', encoding='utf-8') as f:
source = f.read()
try:
tree = ast.parse(source)
except SyntaxError:
return f"# 无法解析: {file_path}"
signatures = []
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
args = [arg.arg for arg in node.args.args]
sig = f"def {node.name}({', '.join(args)}): ..."
signatures.append(sig)
elif isinstance(node, ast.ClassDef):
signatures.append(f"class {node.name}: ...")
return '\n'.join(signatures)
def build_coding_context(task: str, repo_path: str, relevant_files: list[str]) -> str:
"""为编程任务构建最优上下文"""
context_parts = []
# 1. 任务描述
context_parts.append(f"## 任务\n{task}")
# 2. repo 结构(方向感)
context_parts.append(f"## 项目结构\n```\n{get_repo_skeleton(repo_path)}\n```")
# 3. 相关文件的完整内容
for file_path in relevant_files:
full_path = os.path.join(repo_path, file_path)
if os.path.exists(full_path):
with open(full_path) as f:
content = f.read()
context_parts.append(f"## {file_path}\n```python\n{content}\n```")
return '\n\n'.join(context_parts)edit-apply loop
Coding Agent 生成代码修改时,有两种模式:
- 生成完整文件:简单,但对大文件浪费 Token,且容易引入不相关改动
- 生成 Diff / Patch:精确,但需要正确解析和应用
生产级 Coding Agent 通常用一种"伪 diff"格式,让 LLM 生成 search/replace 块:
import re
def apply_edit(original: str, search: str, replace: str) -> tuple[str, bool]:
"""
应用一次 search/replace 编辑。
返回 (new_content, success)。
"""
if search not in original:
# LLM 给的 search 块不匹配,失败
return original, False
# 确认只有一处匹配(多处匹配时让 LLM 提供更多上下文)
if original.count(search) > 1:
return original, False
return original.replace(search, replace, 1), True
def parse_edits_from_response(response_text: str) -> list[dict]:
"""
从 LLM 响应中解析编辑块。
期望格式:
```
<<<<<<< SEARCH
原始代码
=======
新代码
>>>>>>> REPLACE
```
"""
pattern = r'<<<<<<< SEARCH\n(.*?)\n=======\n(.*?)\n>>>>>>> REPLACE'
matches = re.findall(pattern, response_text, re.DOTALL)
return [{"search": m[0], "replace": m[1]} for m in matches]
def apply_all_edits(file_path: str, edits: list[dict]) -> tuple[bool, str]:
"""
对文件应用一组编辑,全部成功才写回。
失败时返回失败原因,文件不改动。
"""
with open(file_path) as f:
content = f.read()
for i, edit in enumerate(edits):
content, success = apply_edit(content, edit["search"], edit["replace"])
if not success:
return False, f"第 {i+1} 处编辑匹配失败:\n```\n{edit['search'][:100]}\n```"
with open(file_path, 'w') as f:
f.write(content)
return True, "所有编辑已应用"test-fix cycle
一个完整的 test-fix cycle 是这样的:Agent 修改代码 → 运行测试 → 如果测试失败,把失败信息喂给 LLM → LLM 生成修复 → 再运行测试 → 直到通过或放弃。
关键工程细节是防止无限循环:
import subprocess
import shlex
def run_tests(test_command: str, timeout: int = 60) -> tuple[bool, str]:
"""运行测试,返回 (passed, output)"""
try:
result = subprocess.run(
shlex.split(test_command),
capture_output=True,
text=True,
timeout=timeout
)
passed = result.returncode == 0
output = result.stdout + result.stderr
return passed, output[:3000] # 截断避免过长
except subprocess.TimeoutExpired:
return False, f"测试超时({timeout}s)"
except Exception as e:
return False, f"测试运行失败: {e}"
def test_fix_cycle(
file_path: str,
test_command: str,
max_attempts: int = 3
) -> tuple[bool, str]:
"""
测试-修复循环。
max_attempts: 最多尝试 N 次修复,超出则放弃(防无限循环)
"""
for attempt in range(max_attempts):
passed, output = run_tests(test_command)
if passed:
return True, f"测试在第 {attempt + 1} 次尝试后通过"
if attempt == max_attempts - 1:
return False, f"达到最大修复次数({max_attempts}),测试仍然失败"
# 喂给 Agent 修复
fix_prompt = f"""
测试失败了,以下是测试输出:请分析失败原因并修复 {file_path} 中的代码。
只修改导致测试失败的代码,不要引入其他改动。
"""
# 这里调用主 Agent 进行修复(略去完整调用,概念同 run_agent)
fix_response = run_agent(fix_prompt, max_turns=5)
# 解析并应用编辑
edits = parse_edits_from_response(fix_response)
if edits:
success, msg = apply_all_edits(file_path, edits)
if not success:
return False, f"编辑应用失败: {msg}"
return False, "修复失败"安全边界
Coding Agent 有几个必须遵守的安全规则,违反任何一条都可能造成数据损失:
- 只操作任务相关的文件:Agent 不应该修改用户没有提到的文件,更不应该修改系统文件
- 保护未提交的改动:在 Agent 开始工作之前,检查
git status,不覆盖用户正在编写的变更 - 命令执行必须沙箱化:
subprocess.run不要直接执行 LLM 生成的 shell 命令,需要白名单验证
ALLOWED_PREFIXES = [
("python", "-m", "pytest"),
("pytest",),
("npm", "test"),
("git", "status"),
("git", "diff"),
]
def safe_execute_command(command: list[str]) -> tuple[bool, str]:
"""只允许白名单命令执行"""
if not command:
return False, "空命令"
if not any(tuple(command[:len(prefix)]) == prefix for prefix in ALLOWED_PREFIXES):
return False, f"命令不在允许列表中: {' '.join(command[:3])}"
# 额外检查:不允许 shell 注入字符
full_cmd = " ".join(command)
for dangerous in [";", "&&", "||", "|", ">", "<", "`", "$(", "${", "\n"]:
if dangerous in full_cmd:
return False, f"命令包含不允许的字符: '{dangerous}'"
try:
result = subprocess.run(
command,
capture_output=True,
text=True,
timeout=30,
# 不继承 shell 环境变量里的敏感凭据
env={"PATH": "/usr/local/bin:/usr/bin:/bin"}
)
return result.returncode == 0, result.stdout + result.stderr
except Exception as e:
return False, str(e)把所有能力组合在一起
前面八个小节各自独立,但在实际系统里它们需要协同工作。下面是一个把核心能力组合在一起的 Agent 入口:
import uuid
class ProductionAgent:
"""
把记忆、工具层、Guardrails、可观测性、持久化整合在一起的 Agent。
流式交互通过 FastAPI 端点暴露(见 25.7 节)。
"""
def __init__(self):
self.memory = AgentMemory()
self.checkpoints = CheckpointStore()
def run(
self,
user_message: str,
session_id: str,
run_id: str = None,
resume: bool = False
) -> str:
run_id = run_id or str(uuid.uuid4())
# 1. 输入护栏
input_check = input_guardrail(user_message)
if input_check.action == GuardrailAction.BLOCK:
return input_check.content
# 2. 加载长期记忆
memories = self.memory.load_recent(session_id)
memory_context = "\n".join(memories) if memories else ""
system_context = "你是一个有用的助手。" + (
f"\n\n用户历史偏好:\n{memory_context}" if memory_context else ""
)
# 3. 断点续传
if resume:
messages, start_turn = self.checkpoints.load_latest(run_id)
else:
messages = []
if not messages:
messages = [{"role": "user", "content": user_message}]
start_turn = 0
# 4. OTel trace 包裹的主循环
with tracer.start_as_current_span("agent.production_run") as span:
span.set_attribute("session.id", session_id)
span.set_attribute("run.id", run_id)
for turn in range(start_turn, start_turn + 20):
# 上下文压缩
if count_tokens(messages) > 150_000:
messages = compact_messages(messages)
response = client.messages.create(
model="<your-claude-model>",
max_tokens=4096,
system=system_context,
tools=TOOLS,
messages=messages,
)
messages.append({"role": "assistant", "content": response.content})
# 保存检查点
self.checkpoints.save(run_id, turn + 1, messages)
if response.stop_reason == "end_turn":
final_text = next(
(b.text for b in response.content if b.type == "text"), ""
)
# 5. 输出护栏
output_check = output_guardrail(final_text)
result = output_check.content if output_check.action in (
GuardrailAction.PASS, GuardrailAction.MODIFY
) else "内容无法展示"
self.checkpoints.delete(run_id)
return result
if response.stop_reason != "tool_use":
return f"[停止: {response.stop_reason}]"
tool_results = []
for block in response.content:
if block.type != "tool_use":
continue
# 工具护栏
tool_check = tool_guardrail(block.name, block.input)
if tool_check.action == GuardrailAction.BLOCK:
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": tool_check.content,
})
continue
result = execute_tool_safe(block.name, block.input)
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": result,
})
messages.append({"role": "user", "content": tool_results})
return "[达到最大轮次]"常见误区
误区一:把所有能力都加进第一个版本
从 MVP 开始:先跑通主循环,再加工具层,再加 Guardrails,再加持久化。如果你第一天就把所有能力堆在一起,出了问题你不知道哪一层引入的。
误区二:以为框架屏蔽了这些复杂度
LangGraph 的 MemorySaver 内部就是 Checkpoint 序列化;CrewAI 的 Process.sequential 内部就是消息历史管理;OpenAI SDK 的 guardrail 就是输入/输出检查。你用框架,这些复杂度还在,只是被隐藏了。理解了原理,你才能在框架行为异常时知道去哪里找问题。
误区三:测试只测最终输出
Agent 的 bug 通常藏在中间步骤——第三轮工具调用的参数错了,但最终输出看起来还算正常。测试要覆盖 Transcript(完整执行轨迹),而不只是最终答案。第 18 章的 Eval 体系里讲了如何系统地做这件事。
误区四:eval 可以直接用于工具调用
上面 calculate 工具的示例用了 eval,并且加了 {"__builtins__": {}} 的限制。但这个限制是不完整的——绕过它的方式已经被广泛记录。生产里的代码执行必须用独立沙箱(microVM 或容器),不能在主进程里用 eval。
面试高频题
Q:如果不用任何框架,你如何从零实现一个支持工具调用和记忆的 Agent?
参考答案框架:
- 核心循环:
messages列表 +while True+stop_reason判断 - 工具调用:解析
tool_useblock → 查工具注册表 → 执行 → 追加tool_result - 短期记忆:
messages列表本身;超出上限时用 LLM 摘要压缩 - 长期记忆:关键信息写入 SQLite,每次任务开始时加载相关条目注入 System Prompt
- 持久化:每轮工具调用后序列化
messages到存储,重启时恢复
加分点:
- 提到工具层的参数校验(Pydantic)和超时控制(
ThreadPoolExecutor) - 提到 OTel 埋点——"我需要知道第几轮出了问题"
- 提到检查点的序列化细节(SDK 对象不能直接 JSON 序列化,需要手动转换)
- 提到
eval沙箱风险,Coding Agent 的execute_code必须用独立执行环境
Q:Coding Agent 的 test-fix cycle 中,如何防止陷入无限修复循环?
参考答案框架:
- 硬性上限:
max_attempts参数,超出后停止并上报,不再尝试 - 失败模式检测:连续两次生成了相同的修复内容,说明 LLM 卡住了,直接停止
- 收敛检测:如果测试失败数量没有减少(或增加了),停止修复
- 回退点:每次修复前保存 diff,失败超出阈值后回滚到任务开始时的状态
加分点:提到给 LLM 的错误信息格式很重要——原始测试输出太长会稀释有用信息,应该提取关键失败行而不是全量输出。
参考资料
[1] Building Effective Agents - Anthropic (https://www.anthropic.com/engineering/building-effective-agents)
[2] Agents SDK Guardrails - OpenAI (https://openai.github.io/openai-agents-python/guardrails/)
[3] Effective context engineering - Anthropic (https://www.anthropic.com/engineering/effective-context-engineering-for-ai-agents)
[4] AI agent observability - OpenTelemetry (https://opentelemetry.io/blog/2025/ai-agent-observability/)
[5] A practical guide to building agents - OpenAI (https://cdn.openai.com/business-guides-and-resources/a-practical-guide-to-building-agents.pdf)
[6] Anthropic Messages API 文档 (https://docs.anthropic.com/en/api/messages)
[7] Anthropic Token Counting API 文档 (https://docs.anthropic.com/en/api/messages-count-tokens)
[8] Anthropic Streaming Messages 文档 (https://docs.anthropic.com/claude/reference/messages-streaming)