跳转至

Mini Query Engine Demo

从 3419 行 services/api/claude.ts 提炼到 ~350 行的教学 demo。 展示 Claude Code agent loop 的核心 4 件套:streaming + tool use + retry + abort。

这是什么?

CC 的 services/api/claude.ts 是整个 agent loop 的核心:接收用户输入 → 流式调 API → 解析 tool use → 调度工具 → 回到 API。直到没有 tool_use 才完成。真实代码 3419 行,本 demo 把它简化到 ~350 行,保留全部核心机制。

文件结构

mini-query-engine/
├── src/
│   ├── engine.ts          ← 核心:QueryEngine 类(~200 行)
│   ├── mock-api.ts        ← Mock API(模拟流式响应 + 错误注入)
│   ├── cli.ts             ← CLI 入口
│   └── tools/
│       ├── types.ts       ← 共享类型
│       ├── echo.ts        ← echo 工具
│       └── calc.ts        ← calc 工具
├── examples/
│   └── run.ts             ← 编程式调用示例(并发 3 个 query)
├── package.json
├── tsconfig.json
└── README.md

怎么跑?

# 1. 装依赖(仅 typescript + @types/node)
npm install

# 2. 编译 + 跑 CLI(默认消息 "Hello, mini engine!")
npm start

# 3. 跑自定义 prompt
node dist/src/cli.js "echo: hello"
node dist/src/cli.js "calc: 1 + 2 * 3"
node dist/src/cli.js "fail: test 5xx retry"
node dist/src/cli.js "abort: test abort"

# 4. 跑并发示例
npm run example

不需要 API key —— MockApiClient 根据 prompt 前缀模拟所有行为。

真实代码对照表

Demo 文件 真实文件 行数对比 简化了啥
src/engine.ts src/services/api/claude.ts 350 vs 3419 去掉 MCP / 缓存 / 鉴权 / 钩子 / DCE / 95% 错误处理分支
src/mock-api.ts src/services/api/client.ts 110 vs ~600 用字符串前缀路由代替 LLM 决策
src/tools/echo.ts src/tools/EchoTool/EchoTool.ts 25 vs ~150 去掉 streaming / permission / UI
src/tools/calc.ts src/tools/BashTool/ 30 vs ~2000 eval 代替 bash 子进程(仅 demo,真实代码绝对不用 eval)

核心设计 4 件套

1️⃣ AsyncGenerator 作为统一事件流

async *run(messages: Message[]): AsyncGenerator<EngineEvent, void, void> {
  for await (const event of this.streamWithRetry(messages, toolDefs)) {
    yield event;  // ← 每个事件 yield 出去
  }
}

优势: - 调用方 for await 一行搞定,不用写 callback / EventEmitter - 天然支持 backpressure(慢消费方不会压垮 producer) - 可以用 return() 取消(abort)

真实代码:claude.ts 用 ReadableStream + 自定义 parser,效果一样但代码多 3 倍。

2️⃣ Discriminated Union 作为事件类型

type EngineEvent =
  | { type: "text"; text: string }
  | { type: "tool_use"; id: string; name: string; input: unknown }
  | { type: "tool_result"; tool_use_id: string; content: string; is_error: boolean }
  | { type: "error"; error: Error; retryable: boolean }
  | { type: "done"; usage: Usage };

优势: - TypeScript 自动收窄(if (event.type === "text") event.text 必存在) - 消费者用 switch + case 处理,新增事件类型时 TS 报错提示"忘了处理" - 替代多 callback / EventEmitter 的"事件名拼错"陷阱

3️⃣ Tool Use 循环 = generator 内嵌 generator

// 外层 generator:管理 turn
for await (const event of this.streamWithRetry(messages, toolDefs)) {
  yield event;
  if (event.type === "tool_use") {
    const result = await tool.execute(event.input);
    yield { type: "tool_result", ... };
    // 回到 for 顶端,自动发起下一轮
  }
}

真实代码messages 数组累积 tool_result,相同效果。

4️⃣ 退避重试 = while + sleep

while (true) {
  try {
    yield* this.opts.api.stream(req);
    return;
  } catch (err) {
    if (err.status === 429 || err.status >= 500) {
      await sleep(1000 * 2 ** attempt, signal);  // 1s → 2s → 4s → 8s
      continue;
    }
    throw err;
  }
}

真实代码src/utils/backoff.ts:30 retryWithBackoff() 是一样的逻辑,加 jitter 防雷鸣。

Mock API 路由表

prompt 前缀 行为 演示场景
echo: <text> 调 echo 工具 基础 tool use 流程
calc: <expr> 调 calc 工具 + eval 工具参数解析 + 错误传播
fail: <any> 抛 5xx 错误 重试 + 指数退避
abort: <any> 抛 AbortError 取消正在进行的 stream
其他 直接 text 回复 纯文本对话

进阶练习

  1. 加 SSE transport:把 MockApiClient 换成 fetch + ReadableStream(SSE 格式)
  2. 加 prompt cache:在 engine 里加 cache_control 标记,模拟 CC 的 cache strategies
  3. 加 max_tokens 截断:yield "text" 时累计 token,超 max 就 stop
  4. 加 tool 并行:当前 tool 串行执行,改成 Promise.all
  5. 加 usage 限流:在 mock-api 里随机抛 429,看 engine 怎么处理

相关阅读