Node.js ストリーム処理実践:Readable、Writable、Transform とバックプレッシャー機構の完全解説
技术架构(更新: 2026年5月10日)
なぜストリームを使うのか?メモリが GB から KB に
| 方式 | 1GB ファイル処理 | メモリ使用量 | 所要時間 |
|---|---|---|---|
fs.readFile |
全量をメモリに読み込み | 1GB+ | 長く、ブロッキング |
fs.createReadStream |
チャンク単位で処理 | ~64KB | 即座に開始 |
核心思想:全データの準備を待たず、読みながら処理する——データがパイプを水のように流れます。
1. 4 種類のストリーム
| 種類 | 方向 | 典型的な用途 |
|---|---|---|
| Readable | 入力 | ファイル読み取り、HTTP リクエストボディ |
| Writable | 出力 | ファイル書き込み、HTTP レスポンス |
| Duplex | 双方向 | TCP ソケット、WebSocket |
| Transform | 変換 | 圧縮、暗号化、フォーマット変換 |
継承関係
Stream
├── Readable
├── Writable
├── Duplex (Readable + Writable)
└── Transform (Duplex、出力が入力に基づく)
2. Readable ストリーム
2 つのモード
| モード | トリガー | 特徴 |
|---|---|---|
| Paused(一時停止) | デフォルト | 手動で read() を呼び出す必要あり |
| Flowing(フロー) | data イベントをバインド |
データを自動プッシュ |
カスタム Readable の作成
import { Readable } from 'stream';
class NumberStream extends Readable {
constructor(max) {
super({ objectMode: true });
this.max = max;
this.current = 0;
}
_read() {
if (this.current < this.max) {
this.push({ value: this.current, timestamp: Date.now() });
this.current++;
} else {
this.push(null); // ストリーム終了
}
}
}
const stream = new NumberStream(5);
stream.on('data', (chunk) => console.log(chunk));
// { value: 0, timestamp: ... }
// { value: 1, timestamp: ... }
// ...
イテレータから作成
import { Readable } from 'stream';
async function* generateLogs() {
for (let i = 0; i < 100; i++) {
await sleep(100);
yield `[${new Date().toISOString()}] Event ${i}\n`;
}
}
const logStream = Readable.from(generateLogs());
logStream.pipe(process.stdout);
3. Writable ストリーム
カスタム Writable の作成
import { Writable } from 'stream';
class BatchWriter extends Writable {
constructor(options) {
super({ objectMode: true, highWaterMark: 10 });
this.batch = [];
this.batchSize = options.batchSize || 5;
}
_write(chunk, encoding, callback) {
this.batch.push(chunk);
if (this.batch.length >= this.batchSize) {
this.flush()
.then(() => callback())
.catch(callback);
} else {
callback();
}
}
_final(callback) {
if (this.batch.length > 0) {
this.flush()
.then(() => callback())
.catch(callback);
} else {
callback();
}
}
async flush() {
console.log('バッチ書き込み:', this.batch);
await db.batchInsert(this.batch);
this.batch = [];
}
}
write() の戻り値とバックプレッシャー
const writable = fs.createWriteStream('output.txt');
for (let i = 0; i < 1e6; i++) {
const canContinue = writable.write(`Line ${i}\n`);
if (!canContinue) {
// バッファがいっぱい、drain イベントを待つ
await once(writable, 'drain');
}
}
writable.end();
4. Transform ストリーム
基本 Transform
import { Transform } from 'stream';
class JsonLineParser extends Transform {
constructor() {
super({ objectMode: true });
}
_transform(chunk, encoding, callback) {
try {
const data = JSON.parse(chunk.toString());
this.push(data);
callback();
} catch (err) {
callback(err);
}
}
}
実践:CSV から JSON へのストリーム変換
import { Transform } from 'stream';
class CsvToJson extends Transform {
constructor() {
super({ writableObjectMode: false, readableObjectMode: true });
this.headers = null;
this.partialLine = '';
}
_transform(chunk, encoding, callback) {
const lines = (this.partialLine + chunk.toString()).split('\n');
this.partialLine = lines.pop(); // 不完全な行を保持
for (const line of lines) {
if (!this.headers) {
this.headers = line.split(',');
continue;
}
const values = line.split(',');
const obj = {};
this.headers.forEach((h, i) => obj[h.trim()] = values[i]?.trim());
this.push(obj);
}
callback();
}
_flush(callback) {
if (this.partialLine) {
const values = this.partialLine.split(',');
const obj = {};
this.headers.forEach((h, i) => obj[h.trim()] = values[i]?.trim());
this.push(obj);
}
callback();
}
}
pipeline を使用した組み合わせ
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { CsvToJson } from './csv-to-json.js';
import { Transform } from 'stream';
await pipeline(
createReadStream('data.csv'),
new CsvToJson(),
new Transform({
objectMode: true,
transform(obj, _, cb) {
this.push(JSON.stringify(obj) + '\n');
cb();
},
}),
createWriteStream('data.jsonl')
);
pipeline の利点:エラー伝播とストリームクリーンアップを自動処理、.pipe() より安全。
5. バックプレッシャー機構
バックプレッシャーとは?
生産者の速度 > 消費者の速度の場合、データがバッファに蓄積します。バックプレッシャーは消費者が生産者に減速を通知する機構です。
Readable (速い) → Writable (遅い)
100MB/s 生成 10MB/s しか処理できない
バックプレッシャーなし:バッファオーバーフロー → メモリ不足
バックプレッシャーあり:Readable 一時停止 → Writable の処理を待つ
バックプレッシャーのワークフロー
readable.on('data', (chunk) => {
const canContinue = writable.write(chunk);
if (!canContinue) {
readable.pause(); // 読み取り一時停止
writable.once('drain', () => {
readable.resume(); // 読み取り再開
});
}
});
highWaterMark 設定
| ストリーム種類 | デフォルト | 意味 |
|---|---|---|
| Readable | 64KB (16KB objectMode) | 内部バッファサイズ |
| Writable | 16KB | 書き込みバッファサイズ |
const readable = fs.createReadStream('big.txt', {
highWaterMark: 1024 * 1024, // 1MB バッファ
});
6. 実践例
例 1:大容量ファイル Gzip 圧縮
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';
await pipeline(
createReadStream('large.log'),
createGzip({ level: 9 }),
createWriteStream('large.log.gz')
);
例 2:HTTP ストリーミングレスポンス
import { createReadStream } from 'fs';
import { stat } from 'fs/promises';
app.get('/video/:id', async (req, res) => {
const filePath = `/videos/${req.params.id}.mp4`;
const { size } = await stat(filePath);
const range = req.headers.range;
if (range) {
const [start, end] = range.replace(/bytes=/, '').split('-').map(Number);
res.writeHead(206, {
'Content-Range': `bytes ${start}-${end || size - 1}/${size}`,
'Content-Length': (end || size - 1) - start + 1,
'Content-Type': 'video/mp4',
});
createReadStream(filePath, { start, end }).pipe(res);
} else {
res.writeHead(200, {
'Content-Length': size,
'Content-Type': 'video/mp4',
});
createReadStream(filePath).pipe(res);
}
});
例 3:ログの行単位処理
import { pipeline } from 'stream/promises';
import { createReadStream } from 'fs';
import { Transform } from 'stream';
import { createInterface } from 'readline';
const fileStream = createReadStream('app.log');
const rl = createInterface({ input: fileStream });
let errorCount = 0;
for await (const line of rl) {
if (line.includes('ERROR')) {
errorCount++;
if (errorCount <= 10) console.log(line);
}
}
console.log(`総エラー数: ${errorCount}`);
例 4:Web Stream と Node Stream の相互変換
import { Readable } from 'stream';
// Node Readable → Web ReadableStream
const nodeStream = fs.createReadStream('data.bin');
const webStream = Readable.toWeb(nodeStream);
// ブラウザ側で使用
const reader = webStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
processChunk(value);
}
7. よくある落とし穴
| 落とし穴 | 結果 | 解決策 |
|---|---|---|
| error イベントの処理忘れ | プロセスが静かにクラッシュ | .pipe() の代わりに pipeline を使用 |
.pipe() がバックプレッシャーを処理しない |
メモリ不足 | pipeline を使用するか手動処理 |
| Transform で callback を呼び忘れ | ストリームがハング | _transform で必ず callback を呼ぶ |
| objectMode の混在 | データ消失/型エラー | 上流と下流の objectMode を一致させる |
_final を実装しない |
最後のバッチデータが失われる | _final を実装して残存データを処理 |
#Node.js#Stream#背压#性能优化