跳转至

Mini Async Stream

抽出 mini-query-engine 和 mini-attachment-resolver 共用的"流"模式。 3 核心:AsyncGenerator / AbortController / 背压。 4 测试场景:基本消费 / 慢 consumer 背压 / 提前 abort / 多 consumer。

文件

mini-async-stream/
├── src/
│   ├── stream.ts  countUp + slowConsumer + CancellableStream(~80 行)
│   └── cli.ts     4 测试场景
└── README.md

npm install && npm start

真实代码对照

Demo 真实代码 简化
countUp mini-query-engine 流的简化 yield i + sleep
AbortSignal 取消 mini-attachment-resolver 1s 超时 同样模式
背压 for await 自动

核心 4 件套

1️⃣ AsyncGenerator = 单/多值流

async function* countUp(max, intervalMs) {
  for (let i = 1; i <= max; i++) {
    await sleep(intervalMs);
    yield i;
  }
}
- function* 前加 async = 返回 Promise + Iterator - 调 next(){ value, done } - for await...of 自动展平

2️⃣ 背压 = for await 自动

for await (const n of countUp(5, 50)) {  // producer 50ms
  await sleep(200);                       // consumer 200ms
}
producer 必须等 consumer总耗时 = max(producer, consumer) × 数量,不是 sum。

3️⃣ AbortSignal = 取消

async function* gen(signal) {
  while (true) {
    if (signal.aborted) return;  // 优雅退出
    yield ...;
  }
}

const ac = new AbortController();
for await (const v of gen(ac.signal)) {
  if (...) ac.abort();  // 中途取消
}
- return 不是 throw,finally 还会跑(清理资源) - AbortController 跨 async/await 传,比 callback 简单

4️⃣ for await 自动清理

for await...of 循环 break 时,自动调 generator 的 return(),触发 finally。

async function* gen() {
  try {
    yield 1;
  } finally {
    console.log("cleaned up");  // 自动跑
  }
}

for await (const v of gen()) {
  break;  // 触发 finally
}

4 场景输出

📌 Test 1: basic (1 2 3 4 5, 50ms)
📌 Test 2: slow consumer 200ms
  total: 1270ms (5 × (200+50) = 1250)
📌 Test 3: abort after 3
  consumed 3 items in 157ms (then abort)
📌 Test 4: 2 consumers
  consumer A: 1 2 3 4 5
  consumer B: 1 2 3 4 5

进阶练习

  1. 加 map/filter/reduce:用 generator helper 函数
  2. 加 throttle:每 N ms 最多 yield 1 个
  3. 加 debounce:等 N ms 静止才 yield
  4. 加 take(n) / skip(n):截断
  5. 加 tee:1 个 source 拆 2 个 consumer(不能复用 generator,每个消费方要 new source())
  6. 加 timeout per item:单个 item 超过 N ms 自动跳

相关阅读