Mini Async Stream¶
抽出 mini-query-engine 和 mini-attachment-resolver 共用的"流"模式。 3 核心:AsyncGenerator / AbortController / 背压。 4 测试场景:基本消费 / 慢 consumer 背压 / 提前 abort / 多 consumer。
文件¶
mini-async-stream/
├── src/
│ ├── stream.ts countUp + slowConsumer + CancellableStream(~80 行)
│ └── cli.ts 4 测试场景
└── README.md
跑¶
真实代码对照¶
| Demo | 真实代码 | 简化 |
|---|---|---|
countUp |
mini-query-engine 流的简化 | yield i + sleep |
AbortSignal 取消 |
mini-attachment-resolver 1s 超时 | 同样模式 |
| 背压 | for await 自动 | 同 |
核心 4 件套¶
1️⃣ AsyncGenerator = 单/多值流¶
async function* countUp(max, intervalMs) {
for (let i = 1; i <= max; i++) {
await sleep(intervalMs);
yield i;
}
}
function* 前加 async = 返回 Promise + Iterator
- 调 next() 返 { value, done }
- for await...of 自动展平
2️⃣ 背压 = for await 自动¶
producer 必须等 consumer。总耗时 = max(producer, consumer) × 数量,不是 sum。3️⃣ AbortSignal = 取消¶
async function* gen(signal) {
while (true) {
if (signal.aborted) return; // 优雅退出
yield ...;
}
}
const ac = new AbortController();
for await (const v of gen(ac.signal)) {
if (...) ac.abort(); // 中途取消
}
return 不是 throw,finally 还会跑(清理资源)
- AbortController 跨 async/await 传,比 callback 简单
4️⃣ for await 自动清理¶
for await...of 循环 break 时,自动调 generator 的 return(),触发 finally。
async function* gen() {
try {
yield 1;
} finally {
console.log("cleaned up"); // 自动跑
}
}
for await (const v of gen()) {
break; // 触发 finally
}
4 场景输出¶
📌 Test 1: basic (1 2 3 4 5, 50ms)
📌 Test 2: slow consumer 200ms
total: 1270ms (5 × (200+50) = 1250)
📌 Test 3: abort after 3
consumed 3 items in 157ms (then abort)
📌 Test 4: 2 consumers
consumer A: 1 2 3 4 5
consumer B: 1 2 3 4 5
进阶练习¶
- 加 map/filter/reduce:用 generator helper 函数
- 加 throttle:每 N ms 最多 yield 1 个
- 加 debounce:等 N ms 静止才 yield
- 加 take(n) / skip(n):截断
- 加 tee:1 个 source 拆 2 个 consumer(不能复用 generator,每个消费方要 new source())
- 加 timeout per item:单个 item 超过 N ms 自动跳
相关阅读¶
- topics/async-generator-pattern.md —— async function* 专题
- mini-query-engine/ —— 实际 AsyncGenerator 用法
- mini-attachment-resolver/ —— AbortSignal 用法
- mini-mcp-server/ —— NDJSON 流传输
- MDN AsyncGenerator