Node.js ストリーム処理実践:Readable、Writable、Transform とバックプレッシャー機構の完全解説

技术架构(更新: 2026年5月10日)

なぜストリームを使うのか?メモリが GB から KB に

方式 1GB ファイル処理 メモリ使用量 所要時間
fs.readFile 全量をメモリに読み込み 1GB+ 長く、ブロッキング
fs.createReadStream チャンク単位で処理 ~64KB 即座に開始

核心思想:全データの準備を待たず、読みながら処理する——データがパイプを水のように流れます。


1. 4 種類のストリーム

種類 方向 典型的な用途
Readable 入力 ファイル読み取り、HTTP リクエストボディ
Writable 出力 ファイル書き込み、HTTP レスポンス
Duplex 双方向 TCP ソケット、WebSocket
Transform 変換 圧縮、暗号化、フォーマット変換

継承関係

Stream
├── Readable
├── Writable
├── Duplex (Readable + Writable)
└── Transform (Duplex、出力が入力に基づく)

2. Readable ストリーム

2 つのモード

モード トリガー 特徴
Paused(一時停止) デフォルト 手動で read() を呼び出す必要あり
Flowing(フロー) data イベントをバインド データを自動プッシュ

カスタム Readable の作成

import { Readable } from 'stream';

class NumberStream extends Readable {
  constructor(max) {
    super({ objectMode: true });
    this.max = max;
    this.current = 0;
  }

  _read() {
    if (this.current < this.max) {
      this.push({ value: this.current, timestamp: Date.now() });
      this.current++;
    } else {
      this.push(null); // ストリーム終了
    }
  }
}

const stream = new NumberStream(5);
stream.on('data', (chunk) => console.log(chunk));
// { value: 0, timestamp: ... }
// { value: 1, timestamp: ... }
// ...

イテレータから作成

import { Readable } from 'stream';

async function* generateLogs() {
  for (let i = 0; i < 100; i++) {
    await sleep(100);
    yield `[${new Date().toISOString()}] Event ${i}\n`;
  }
}

const logStream = Readable.from(generateLogs());
logStream.pipe(process.stdout);

3. Writable ストリーム

カスタム Writable の作成

import { Writable } from 'stream';

class BatchWriter extends Writable {
  constructor(options) {
    super({ objectMode: true, highWaterMark: 10 });
    this.batch = [];
    this.batchSize = options.batchSize || 5;
  }

  _write(chunk, encoding, callback) {
    this.batch.push(chunk);

    if (this.batch.length >= this.batchSize) {
      this.flush()
        .then(() => callback())
        .catch(callback);
    } else {
      callback();
    }
  }

  _final(callback) {
    if (this.batch.length > 0) {
      this.flush()
        .then(() => callback())
        .catch(callback);
    } else {
      callback();
    }
  }

  async flush() {
    console.log('バッチ書き込み:', this.batch);
    await db.batchInsert(this.batch);
    this.batch = [];
  }
}

write() の戻り値とバックプレッシャー

const writable = fs.createWriteStream('output.txt');

for (let i = 0; i < 1e6; i++) {
  const canContinue = writable.write(`Line ${i}\n`);
  if (!canContinue) {
    // バッファがいっぱい、drain イベントを待つ
    await once(writable, 'drain');
  }
}
writable.end();

4. Transform ストリーム

基本 Transform

import { Transform } from 'stream';

class JsonLineParser extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(chunk, encoding, callback) {
    try {
      const data = JSON.parse(chunk.toString());
      this.push(data);
      callback();
    } catch (err) {
      callback(err);
    }
  }
}

実践:CSV から JSON へのストリーム変換

import { Transform } from 'stream';

class CsvToJson extends Transform {
  constructor() {
    super({ writableObjectMode: false, readableObjectMode: true });
    this.headers = null;
    this.partialLine = '';
  }

  _transform(chunk, encoding, callback) {
    const lines = (this.partialLine + chunk.toString()).split('\n');
    this.partialLine = lines.pop(); // 不完全な行を保持

    for (const line of lines) {
      if (!this.headers) {
        this.headers = line.split(',');
        continue;
      }

      const values = line.split(',');
      const obj = {};
      this.headers.forEach((h, i) => obj[h.trim()] = values[i]?.trim());
      this.push(obj);
    }

    callback();
  }

  _flush(callback) {
    if (this.partialLine) {
      const values = this.partialLine.split(',');
      const obj = {};
      this.headers.forEach((h, i) => obj[h.trim()] = values[i]?.trim());
      this.push(obj);
    }
    callback();
  }
}

pipeline を使用した組み合わせ

import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { CsvToJson } from './csv-to-json.js';
import { Transform } from 'stream';

await pipeline(
  createReadStream('data.csv'),
  new CsvToJson(),
  new Transform({
    objectMode: true,
    transform(obj, _, cb) {
      this.push(JSON.stringify(obj) + '\n');
      cb();
    },
  }),
  createWriteStream('data.jsonl')
);

pipeline の利点:エラー伝播とストリームクリーンアップを自動処理、.pipe() より安全。


5. バックプレッシャー機構

バックプレッシャーとは?

生産者の速度 > 消費者の速度の場合、データがバッファに蓄積します。バックプレッシャーは消費者が生産者に減速を通知する機構です。

Readable (速い) → Writable (遅い)
  100MB/s 生成        10MB/s しか処理できない
  
  バックプレッシャーなし:バッファオーバーフロー → メモリ不足
  バックプレッシャーあり:Readable 一時停止 → Writable の処理を待つ

バックプレッシャーのワークフロー

readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    readable.pause(); // 読み取り一時停止
    writable.once('drain', () => {
      readable.resume(); // 読み取り再開
    });
  }
});

highWaterMark 設定

ストリーム種類 デフォルト 意味
Readable 64KB (16KB objectMode) 内部バッファサイズ
Writable 16KB 書き込みバッファサイズ
const readable = fs.createReadStream('big.txt', {
  highWaterMark: 1024 * 1024, // 1MB バッファ
});

6. 実践例

例 1:大容量ファイル Gzip 圧縮

import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

await pipeline(
  createReadStream('large.log'),
  createGzip({ level: 9 }),
  createWriteStream('large.log.gz')
);

例 2:HTTP ストリーミングレスポンス

import { createReadStream } from 'fs';
import { stat } from 'fs/promises';

app.get('/video/:id', async (req, res) => {
  const filePath = `/videos/${req.params.id}.mp4`;
  const { size } = await stat(filePath);
  const range = req.headers.range;

  if (range) {
    const [start, end] = range.replace(/bytes=/, '').split('-').map(Number);
    res.writeHead(206, {
      'Content-Range': `bytes ${start}-${end || size - 1}/${size}`,
      'Content-Length': (end || size - 1) - start + 1,
      'Content-Type': 'video/mp4',
    });
    createReadStream(filePath, { start, end }).pipe(res);
  } else {
    res.writeHead(200, {
      'Content-Length': size,
      'Content-Type': 'video/mp4',
    });
    createReadStream(filePath).pipe(res);
  }
});

例 3:ログの行単位処理

import { pipeline } from 'stream/promises';
import { createReadStream } from 'fs';
import { Transform } from 'stream';
import { createInterface } from 'readline';

const fileStream = createReadStream('app.log');
const rl = createInterface({ input: fileStream });

let errorCount = 0;
for await (const line of rl) {
  if (line.includes('ERROR')) {
    errorCount++;
    if (errorCount <= 10) console.log(line);
  }
}
console.log(`総エラー数: ${errorCount}`);

例 4:Web Stream と Node Stream の相互変換

import { Readable } from 'stream';

// Node Readable → Web ReadableStream
const nodeStream = fs.createReadStream('data.bin');
const webStream = Readable.toWeb(nodeStream);

// ブラウザ側で使用
const reader = webStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  processChunk(value);
}

7. よくある落とし穴

落とし穴 結果 解決策
error イベントの処理忘れ プロセスが静かにクラッシュ .pipe() の代わりに pipeline を使用
.pipe() がバックプレッシャーを処理しない メモリ不足 pipeline を使用するか手動処理
Transform で callback を呼び忘れ ストリームがハング _transform で必ず callback を呼ぶ
objectMode の混在 データ消失/型エラー 上流と下流の objectMode を一致させる
_final を実装しない 最後のバッチデータが失われる _final を実装して残存データを処理
#Node.js#Stream#背压#性能优化