Mini Query Engine Demo¶
从 3419 行
services/api/claude.ts提炼到 ~350 行的教学 demo。 展示 Claude Code agent loop 的核心 4 件套:streaming + tool use + retry + abort。
这是什么?¶
CC 的 services/api/claude.ts 是整个 agent loop 的核心:接收用户输入 → 流式调 API → 解析 tool use → 调度工具 → 回到 API。直到没有 tool_use 才完成。真实代码 3419 行,本 demo 把它简化到 ~350 行,保留全部核心机制。
文件结构¶
mini-query-engine/
├── src/
│ ├── engine.ts ← 核心:QueryEngine 类(~200 行)
│ ├── mock-api.ts ← Mock API(模拟流式响应 + 错误注入)
│ ├── cli.ts ← CLI 入口
│ └── tools/
│ ├── types.ts ← 共享类型
│ ├── echo.ts ← echo 工具
│ └── calc.ts ← calc 工具
├── examples/
│ └── run.ts ← 编程式调用示例(并发 3 个 query)
├── package.json
├── tsconfig.json
└── README.md
怎么跑?¶
# 1. 装依赖(仅 typescript + @types/node)
npm install
# 2. 编译 + 跑 CLI(默认消息 "Hello, mini engine!")
npm start
# 3. 跑自定义 prompt
node dist/src/cli.js "echo: hello"
node dist/src/cli.js "calc: 1 + 2 * 3"
node dist/src/cli.js "fail: test 5xx retry"
node dist/src/cli.js "abort: test abort"
# 4. 跑并发示例
npm run example
不需要 API key ——
MockApiClient根据 prompt 前缀模拟所有行为。
真实代码对照表¶
| Demo 文件 | 真实文件 | 行数对比 | 简化了啥 |
|---|---|---|---|
src/engine.ts |
src/services/api/claude.ts |
350 vs 3419 | 去掉 MCP / 缓存 / 鉴权 / 钩子 / DCE / 95% 错误处理分支 |
src/mock-api.ts |
src/services/api/client.ts |
110 vs ~600 | 用字符串前缀路由代替 LLM 决策 |
src/tools/echo.ts |
src/tools/EchoTool/EchoTool.ts |
25 vs ~150 | 去掉 streaming / permission / UI |
src/tools/calc.ts |
src/tools/BashTool/ |
30 vs ~2000 | eval 代替 bash 子进程(仅 demo,真实代码绝对不用 eval) |
核心设计 4 件套¶
1️⃣ AsyncGenerator 作为统一事件流¶
async *run(messages: Message[]): AsyncGenerator<EngineEvent, void, void> {
for await (const event of this.streamWithRetry(messages, toolDefs)) {
yield event; // ← 每个事件 yield 出去
}
}
优势:
- 调用方 for await 一行搞定,不用写 callback / EventEmitter
- 天然支持 backpressure(慢消费方不会压垮 producer)
- 可以用 return() 取消(abort)
真实代码:claude.ts 用 ReadableStream + 自定义 parser,效果一样但代码多 3 倍。
2️⃣ Discriminated Union 作为事件类型¶
type EngineEvent =
| { type: "text"; text: string }
| { type: "tool_use"; id: string; name: string; input: unknown }
| { type: "tool_result"; tool_use_id: string; content: string; is_error: boolean }
| { type: "error"; error: Error; retryable: boolean }
| { type: "done"; usage: Usage };
优势:
- TypeScript 自动收窄(if (event.type === "text") event.text 必存在)
- 消费者用 switch + case 处理,新增事件类型时 TS 报错提示"忘了处理"
- 替代多 callback / EventEmitter 的"事件名拼错"陷阱
3️⃣ Tool Use 循环 = generator 内嵌 generator¶
// 外层 generator:管理 turn
for await (const event of this.streamWithRetry(messages, toolDefs)) {
yield event;
if (event.type === "tool_use") {
const result = await tool.execute(event.input);
yield { type: "tool_result", ... };
// 回到 for 顶端,自动发起下一轮
}
}
真实代码用 messages 数组累积 tool_result,相同效果。
4️⃣ 退避重试 = while + sleep¶
while (true) {
try {
yield* this.opts.api.stream(req);
return;
} catch (err) {
if (err.status === 429 || err.status >= 500) {
await sleep(1000 * 2 ** attempt, signal); // 1s → 2s → 4s → 8s
continue;
}
throw err;
}
}
真实代码:src/utils/backoff.ts:30 retryWithBackoff() 是一样的逻辑,加 jitter 防雷鸣。
Mock API 路由表¶
| prompt 前缀 | 行为 | 演示场景 |
|---|---|---|
echo: <text> |
调 echo 工具 | 基础 tool use 流程 |
calc: <expr> |
调 calc 工具 + eval | 工具参数解析 + 错误传播 |
fail: <any> |
抛 5xx 错误 | 重试 + 指数退避 |
abort: <any> |
抛 AbortError | 取消正在进行的 stream |
| 其他 | 直接 text 回复 | 纯文本对话 |
进阶练习¶
- 加 SSE transport:把
MockApiClient换成fetch + ReadableStream(SSE 格式) - 加 prompt cache:在 engine 里加
cache_control标记,模拟 CC 的 cache strategies - 加 max_tokens 截断:yield "text" 时累计 token,超 max 就 stop
- 加 tool 并行:当前 tool 串行执行,改成
Promise.all - 加 usage 限流:在 mock-api 里随机抛 429,看 engine 怎么处理
相关阅读¶
- topics/deep-dive-claude-api.md —— 真实 3419 行的完整分析
- topics/async-generator-pattern.md —— async function* 模式
- topics/deep-dive-query-engine.md —— 1295 行的 QueryEngine 拆解
- phase-06-agent-loop.md —— agent loop 全景
- walkthrough/handwrite-query-engine.md —— 手写版答案