Skip to content

Layer 1: Agent Loop —— 核心引擎

Agent 循环是如何实现「思考 → 执行 → 观察 → 继续」这个循环的?它不是一个简单的 while loop,而是一个带状态机、错误恢复、流式处理的工业级 Agent Runtime。

这是整个 Claude Code 架构的心脏。理解了这一层,就理解了 Agent 框架的骨架。


┌──────────────────────┐
│ REPL.tsx │ ← 用户交互入口
│ handlePromptSubmit() │
└──────────┬───────────┘
┌──────────▼───────────┐
│ QueryEngine.ts │ ← SDK/Headless 封装
│ submitMessage() │
└──────────┬───────────┘
┌──────────▼───────────┐
│ query.ts │ ← async generator 入口
│ query() │
└──────────┬───────────┘
┌────────────────▼────────────────┐
│ queryLoop() │ ← while(true) 主循环
│ │
│ ┌──────────────────────────┐ │
│ │ Phase 1: 压缩与上下文准备 │ │
│ │ snip → micro → collapse │ │
│ │ → autocompact │ │
│ └───────────┬──────────────┘ │
│ ▼ │
│ ┌──────────────────────────┐ │
│ │ Phase 2: API 流式调用 │ │
│ │ queryModelWithStreaming() │ │
│ │ → 收集 tool_use blocks │ │
│ └───────────┬──────────────┘ │
│ ▼ │
│ ┌──────────────────────────┐ │
│ │ Phase 3: 工具执行 │ │
│ │ StreamingToolExecutor │ │
│ │ or runTools() │ │
│ └───────────┬──────────────┘ │
│ ▼ │
│ ┌──────────────────────────┐ │
│ │ Phase 4: 错误恢复 │ │
│ │ withholding → collapse │ │
│ │ → reactive compact │ │
│ └───────────┬──────────────┘ │
│ ▼ │
│ ┌──────────────────────────┐ │
│ │ Phase 5: 停止钩子与决策 │ │
│ │ stop hooks → token budget │ │
│ │ → Continue or Terminal │ │
│ └───────────┬──────────────┘ │
│ │ │
│ Continue?──┤ │
│ Yes → 回到 Phase 1 │
│ No → return Terminal │
└─────────────────────────────────┘
文件职责关键函数
query.ts (~69KB)Agent 循环主逻辑query(), queryLoop()
QueryEngine.ts (~47KB)查询引擎协调器submitMessage()
query/config.ts不可变查询配置buildQueryConfig()
query/transitions.ts状态转换定义Continue, Terminal 类型
query/tokenBudget.tsToken 预算追踪checkTokenBudget()
query/stopHooks.ts停止钩子编排handleStopHooks()
query/deps.ts依赖注入QueryDeps, productionDeps()
services/api/claude.tsAPI 流式通信queryModelWithStreaming()
utils/messages.ts消息处理handleMessageFromStream()
utils/handlePromptSubmit.ts用户输入处理handlePromptSubmit()

Agent 循环的核心不是 while (condition),而是一个显式状态机——每次循环迭代都会产生一个明确的「为什么继续」或「为什么停止」的原因。

// query.ts:204
type State = {
messages: Message[] // 当前对话历史
toolUseContext: ToolUseContext // 工具执行上下文
autoCompactTracking: AutoCompactTrackingState | undefined // 压缩状态追踪
maxOutputTokensRecoveryCount: number // max_output_tokens 恢复次数
hasAttemptedReactiveCompact: boolean // 是否已尝试反应式压缩
maxOutputTokensOverride: number | undefined // 输出 token 上限覆盖
pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined
stopHookActive: boolean | undefined // stop hook 是否激活
turnCount: number // 当前轮次计数
transition: Continue | undefined // 上一次迭代为什么继续
}

★ 设计洞察State可变的,在每次循环末尾整体赋值 state = next。这比参数透传更简洁,但通过整体赋值(而非零散修改)保持了一定的可追踪性。

query/transitions.ts
type Continue = {
reason:
| 'next_turn' // 正常:工具执行完毕,需要模型看结果
| 'reactive_compact_retry' // 恢复:prompt-too-long 后成功压缩,重试
| 'collapse_drain_retry' // 恢复:上下文折叠后重试
| 'max_output_tokens_recovery'// 恢复:提升输出上限后重试
| 'stop_hook_blocking' // 恢复:stop hook 阻止后,让模型看到错误
| 'token_budget_continuation' // 预算:token 预算未用完,继续
}
type Terminal = {
reason:
| 'completed' // 正常结束:模型说完了
| 'max_turns' // 达到最大轮次限制
| 'prompt_too_long' // 压缩恢复失败
| 'aborted_streaming' // 用户中断(API 流式阶段)
| 'aborted_tools' // 用户中断(工具执行阶段)
| 'hook_stopped' // Hook 附件阻止继续
| 'stop_hook_prevented' // Stop hook 返回 preventContinuation
| 'image_error' // 图片验证失败
| 'model_error' // API 不可恢复错误
| 'blocking_limit' // 硬性 token 上限
}

★ 设计洞察:显式的 Continue/Terminal 类型有两个好处:

  1. 可测试性——测试可以断言恢复路径是否触发,而不必检查消息内容
  2. 可观测性——日志/遥测可以直接记录循环终止原因,便于排障

目标:在调用 API 之前,确保消息总量不超过上下文窗口。

执行顺序:
1. Tool result 预算检查 → 限制单轮工具结果的聚合大小
2. Snip 压缩 → 切除最早的历史块(如果开启)
3. Microcompact → 在 prompt cache 内编辑,移除旧的 tool_use blocks
4. Context collapse → 语义区间折叠(将一段对话折叠为摘要)
5. Autocompact → 全量对话摘要(达到 token 阈值时触发)
6. Blocking limit 检查 → 硬性上限,超过直接返回 Terminal

关键逻辑:Autocompact 有一个熔断器——consecutiveFailures 计数器。如果连续压缩失败,不会无限重试,而是放弃并上报。

目标:将准备好的消息发送给 Claude API,并流式处理响应。

// query.ts 核心循环中
for await (const message of deps.callModel(messagesForQuery, ...)) {
// 1. 处理流式降级(模型过载时切换备用模型)
if (streamingFallbackOccured) {
// 丢弃进行中的工具、yield tombstone 消息、重试
continue
}
// 2. Withhold 可恢复错误(不立即暴露给用户)
if (isWithheldPromptTooLong(message)) withheld = true
if (isWithheldMaxOutputTokens(message)) withheld = true
// 3. 正常消息 → yield 给消费者(REPL 或 SDK)
if (!withheld) yield message
// 4. 收集 tool_use blocks
if (message has tool_use) {
toolUseBlocks.push(...)
needsFollowUp = true
// 如果启用了流式工具执行,立即开始执行
streamingToolExecutor?.addTool(toolBlock, message)
}
// 5. 消费流式工具执行器中已完成的结果
for (const result of streamingToolExecutor?.getCompletedResults()) {
yield result.message
toolResults.push(result)
}
}

★ 设计洞察Withholding(扣留)可恢复错误是一个精妙的设计。当 API 返回 prompt_too_long 时,不立即告诉用户「出错了」,而是先扣住错误,尝试 collapse/compact 恢复。只有恢复也失败时,才暴露错误。这避免了「错误→恢复→错误→恢复」的无限循环,同时让用户感知到流畅的体验。

目标:执行模型请求的工具调用,收集结果。

有两种执行模式:

模式 A:流式工具执行(StreamingToolExecutor)

  • 工具在 API 流式返回的过程中就开始执行
  • addTool() 在收到 tool_use block 时立即入队
  • getCompletedResults() 在流式过程中取回已完成的结果
  • 延迟收益:工具执行时间隐藏在 API 流式传输时间内,每轮节省 5-15 秒

模式 B:传统批次执行(runTools)

  • 等 API 流式完全结束后,批次执行所有工具
  • 并发安全性分区:只读工具并行,写操作串行
工具分区示例:
[Read, Grep, Glob] → 并发批次 1(全部并行执行)
[FileEdit] → 串行批次 1(独占执行)
[Read, Read] → 并发批次 2(全部并行执行)

目标:处理 Phase 2 中扣留的可恢复错误。

扣留的 prompt_too_long?
├─ 尝试 Context Collapse Drain → 成功? → Continue(collapse_drain_retry)
├─ 尝试 Reactive Compact → 成功? → Continue(reactive_compact_retry)
└─ 全部失败 → yield 错误 → Terminal(prompt_too_long)
扣留的 max_output_tokens?
├─ 第 1 次:静默提升上限 8K → 64K → Continue(max_output_tokens_recovery)
├─ 第 2-3 次:注入恢复消息给模型 → Continue
└─ 超过 3 次:yield 错误 → Terminal

目标:决定循环是继续还是终止。

// 1. 执行 Stop Hooks
const stopHookResult = yield* handleStopHooks(...)
// handleStopHooks 内部做了很多事:
// - 保存 cache-safe params(供 /resume 恢复)
// - 分类作业状态
// - 触发后台任务(prompt suggestion, memory extraction, auto-dream)
// - 执行用户定义的 stop hooks
// - 如果是 teammate:执行 TaskCompleted / TeammateIdle hooks
// 2. Stop Hook 阻止?
if (stopHookResult.blockingErrors) → Continue(stop_hook_blocking)
if (stopHookResult.preventContinuation) → Terminal(stop_hook_prevented)
// 3. Token 预算检查
const decision = checkTokenBudget(budgetTracker, agentId, budget, outputTokens)
if (decision.action === 'continue') → Continue(token_budget_continuation)
if (decision.action === 'stop') → Terminal
// 4. maxTurns 检查
if (turnCount >= maxTurns) → Terminal(max_turns)
// 5. 需要跟进?(有工具被执行)
if (needsFollowUp) {
state = { messages: [...messages, ...assistantMessages, ...toolResults], ... }
// → 回到 Phase 1
}
if (!needsFollowUp) → Terminal(completed)

query.ts
└─ deps.callModel()
└─ queryModelWithStreaming() [services/api/claude.ts]
└─ withRetry() [services/api/withRetry.ts]
└─ queryModel() [services/api/claude.ts]
└─ client.messages.create({ stream: true }) [Anthropic SDK]

API 返回的原始流式事件被逐步重建为完整消息:

message_start → 初始化跟踪(model, usage)
content_block_start → 新内容块(text / tool_use / thinking)
content_block_delta → 增量内容(text 片段 / tool input JSON 片段)
content_block_stop → 内容块完成
message_delta → 更新 stop_reason, 累计 usage
message_stop → 最终化,emit AssistantMessage
services/api/withRetry.ts
withRetry(fn, { maxRetries: 3, backoff: exponential }):
可重试: 429 (rate limit), 500, 502, 503, 529 (overloaded)
不可重试: 400, 401, 403, 404, 413 (prompt too long)
降级: 529 + 配置了 fallbackModel → 切换模型重试

★ 设计洞察:降级(fallback)发生时,会 yield TombstoneMessage 来通知 UI 删除部分渲染的内容,然后用备用模型重新开始。这对用户来说是「等了一下,然后正常返回了」,而不是一个红色错误。

API 调用携带多种 beta header,控制高级特性:

Header功能
PROMPT_CACHING_SCOPE_BETA_HEADERPrompt Cache 范围控制
CONTEXT_1M_BETA_HEADER1M 上下文支持
TASK_BUDGETS_BETA_HEADERToken 预算控制
FAST_MODE_BETA_HEADER快速模式加速
AFK_MODE_BETA_HEADER离开模式(延迟响应)
EFFORT_BETA_HEADER模型 effort 等级控制

5.1 QueryEngine(SDK / Headless 入口)

Section titled “5.1 QueryEngine(SDK / Headless 入口)”
// QueryEngine.ts:184
class QueryEngine {
mutableMessages: Message[] // 持久化的对话历史
totalUsage: NonNullableUsage // 累计 token/成本
async *submitMessage(
prompt: string | ContentBlockParam[],
options?: { uuid?: string; isMeta?: boolean }
): AsyncGenerator<SDKMessage, void, unknown> {
// 1. 处理用户输入(附件、斜杠命令、记忆)
// 2. 构建系统提示词(带上下文)
// 3. 调用 query() 获取 async generator
// 4. yield SDK 兼容的消息
// 5. 持久化 transcript 到存储
}
}

每次 submitMessage() 启动一个新轮次,但在同一个 QueryEngine 实例内共享消息历史。

REPL.tsx (React 组件)
├─ 渲染消息列表(VirtualMessageList)
├─ 渲染输入框(PromptInput)
├─ 用户提交 → handlePromptSubmit()
│ └─ 校验输入 → 展开粘贴引用 → 路由到 onQuery 回调
├─ onQuery → 调用 query() async generator
├─ for await (msg of queryGenerator):
│ └─ 更新 UI(消息、spinner、权限对话框)
└─ 循环结束 → 显示新的输入提示

query/deps.ts
type QueryDeps = {
callModel: typeof queryModelWithStreaming // API 调用
microcompact: typeof microcompactMessages // 微压缩
autocompact: typeof autoCompactIfNeeded // 自动压缩
uuid: () => string // UUID 生成
}
function productionDeps(): QueryDeps {
return {
callModel: queryModelWithStreaming,
microcompact: microcompactMessages,
autocompact: autoCompactIfNeeded,
uuid: randomUUID,
}
}
QueryConfig (不可变,循环开始时捕获一次):
├─ Feature gates 快照
├─ Session ID
└─ 静态配置
State (可变,每次迭代更新):
├─ messages
├─ toolUseContext
├─ turnCount
└─ 恢复计数器

★ 设计洞察:为什么 deps 只包含 4 个函数?这是窄接口原则——只暴露测试需要 mock 的 I/O 边界,其余逻辑保持纯函数。这使得测试不需要 module spying,直接注入 mock 即可。


示例 1:简单问答(无工具调用)

Section titled “示例 1:简单问答(无工具调用)”
用户输入: "什么是 TypeScript?"
1. REPL → handlePromptSubmit("什么是 TypeScript?")
2. query(params) 启动
3. queryLoop 第 1 轮:
Phase 1: 无需压缩(第 1 轮)
Phase 2: API 流式返回纯文本
Phase 3: 无 tool_use → needsFollowUp = false
Phase 5: → Terminal({ reason: 'completed' })
4. query() 返回,REPL 显示回答

示例 2:带工具调用(2 轮循环)

Section titled “示例 2:带工具调用(2 轮循环)”
用户输入: "读取 /src/main.ts 并解释"
1. REPL → handlePromptSubmit(...)
2. queryLoop 第 1 轮:
Phase 2: API 返回 tool_use(FileRead, { path: "/src/main.ts" })
Phase 3: 执行 FileRead → 返回文件内容
Phase 5: needsFollowUp = true → Continue(next_turn)
3. queryLoop 第 2 轮:
messages = [用户输入, assistant(tool_use), tool_result(文件内容)]
Phase 2: API 看到文件内容,返回解释文本
Phase 3: 无 tool_use → needsFollowUp = false
Phase 5: → Terminal({ reason: 'completed' })

示例 3:错误恢复(压缩后重试)

Section titled “示例 3:错误恢复(压缩后重试)”
长对话第 50 轮:
queryLoop 第 50 轮:
Phase 1: 压缩检查 → 未达阈值,跳过
Phase 2: API 返回 prompt_too_long(被 withhold)
Phase 4:
尝试 collapse drain → 失败
尝试 reactive compact → 成功!生成摘要
→ Continue(reactive_compact_retry)
queryLoop 第 50 轮(重试):
Phase 1: 消息已被压缩(从 50 条变为 摘要 + 最近 5 条)
Phase 2: API 正常返回
...正常继续

错误检测方式恢复策略最终降级
prompt_too_long (413)WithholdCollapse drain → Reactive compactTerminal(prompt_too_long)
max_output_tokensWithhold提升上限 8K→64K → 注入恢复消息(≤3次)Terminal
模型过载 (529)withRetry指数退避 + 切换备用模型传播错误
用户中断AbortSignal停止流式/工具Terminal(aborted_*)
超过 maxTurnsturnCount 检查yield 提示Terminal(max_turns)
工具执行错误tool executoryield 错误 tool_result(模型可见)循环继续
Stop hook 阻止handleStopHooks注入阻止消息给模型Continue(stop_hook_blocking)

为什么用 Async Generator 而不是 Callback / Event Emitter?

Section titled “为什么用 Async Generator 而不是 Callback / Event Emitter?”
  1. 背压控制:消费者(REPL/SDK)可以控制消费速度,不会被 flood
  2. 组合性yield* 可以嵌套组合多个 generator(query → queryLoop → handleStopHooks)
  3. 资源清理:generator 的 return() 方法支持优雅退出
  4. 类型安全AsyncGenerator&lt;YieldType, ReturnType> 在类型层面区分中间值和最终值

传统做法:错误发生 → 立即报错 → 用户看到红色错误 → 可能手动 /compact → 重试

Claude Code 做法:错误发生 → 扣住不报 → 自动尝试恢复 → 恢复成功,用户无感知;恢复失败,才报错

传统模式:
API 流式 (5s) → 等待完成 → 工具执行 (3s) → 总计 8s
流式模式:
API 流式 (5s, 第 2 秒收到 tool_use → 立即开始执行)
工具执行与 API 流式并行 → 总计 5s(节省 3s/轮)

交互方向说明
→ L2 (Tool 系统)Phase 3 调用 runTools() / StreamingToolExecutor 执行工具
→ L3 (权限系统)工具执行前通过 canUseTool() 检查权限
→ L5 (上下文管理)Phase 1 的压缩策略、Phase 4 的 reactive compact
→ L6 (子 Agent)子 Agent 通过 query() 递归调用获得独立的 Agent Loop
← 流式 APIservices/api/claude.ts 提供流式消息流
← 状态管理ToolUseContext 在循环中传递和更新全局状态