Node.js Stream Processing in Practice: A Complete Guide to Readable, Writable, Transform, and Backpressure

Technical Architecture (Updated May 10, 2026)

Why Use Streams? Memory Drops from GB to KB

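Method | Processing a 1GB file | Memory usage | When processing starts
fs.readFile | Loads the whole file into memory | 1GB+ | Only after the entire file has been read
fs.createReadStream | Processes it chunk by chunk | ~64KB (default chunk size) | Immediately, with the first chunk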

Core concept: don't wait for all of the data to be ready; process each chunk as it is read. Data flows through the pipeline like water through pipes.
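To see the chunking for yourself, the sketch below writes a throwaway 4 MiB file to the OS temp directory (the directory prefix, file name, and size are arbitrary choices) and streams it back with for await, tracking the largest chunk. Like the rest of this guide it uses ES module syntax and top-level await, so run it as an .mjs file or inside a "type": "module" package.

```javascript
import { createReadStream } from 'node:fs';
import { mkdtemp, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

// Throwaway 4 MiB file in a fresh temp directory, so no real data is needed
const dir = await mkdtemp(join(tmpdir(), 'stream-demo-'));
const filePath = join(dir, 'big.bin');
await writeFile(filePath, Buffer.alloc(4 * 1024 * 1024, 'a'));

let totalBytes = 0;
let largestChunk = 0;
// Each iteration receives one chunk; earlier chunks can be garbage-collected
for await (const chunk of createReadStream(filePath)) {
  totalBytes += chunk.length;
  largestChunk = Math.max(largestChunk, chunk.length);
}

await rm(dir, { recursive: true, force: true });
// totalBytes is 4194304; largestChunk is at most 65536 (the 64 KiB default)
console.log(totalBytes, largestChunk);
```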


1. Four Stream Types

Type | Direction | Typical use
Readable | Input (source) | File reading, HTTP request body
Writable | Output (sink) | File writing, HTTP response
Duplex | Bidirectional (independent read and write sides) | TCP socket, WebSocket
Transform | Input → transformed output | Compression, encryption, format conversion

Inheritance

Stream
├── Readable
├── Writable
└── Duplex (Readable + Writable)
    └── Transform (a Duplex whose output is computed from its input)
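The hierarchy can be checked at runtime. This sketch uses PassThrough, the built-in Transform that forwards data unchanged, and tests it against each class. Note that instanceof Writable succeeds only because Writable defines a custom Symbol.hasInstance: a Duplex inherits from Readable and mixes in the Writable methods, since JavaScript has no multiple inheritance.

```javascript
import { Duplex, PassThrough, Readable, Stream, Transform, Writable } from 'node:stream';

// PassThrough is the simplest built-in Transform: it forwards data unchanged
const t = new PassThrough();

const checks = {
  stream: t instanceof Stream,
  readable: t instanceof Readable,
  writable: t instanceof Writable, // true via Writable's custom Symbol.hasInstance
  duplex: t instanceof Duplex,
  transform: t instanceof Transform,
};
console.log(checks); // every value is true
```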

2. Readable Streams

Two Modes

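Mode | How it is entered | Characteristics
Paused | Default for a new Readable | The consumer pulls data by calling read(), usually inside a 'readable' handler
Flowing | Attach a 'data' listener, call resume(), or pipe() the stream | Data is pushed to the consumer automatically as 'data' events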
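A minimal sketch contrasting the two modes with small in-memory streams built by Readable.from() (the sample values are arbitrary): the first stream is drained by calling read() from a 'readable' handler, the second delivers its chunks through 'data' events.

```javascript
import { Readable } from 'node:stream';
import { once } from 'node:events';

// Paused mode: a 'readable' listener plus explicit read() calls pull data out
const paused = Readable.from(['a', 'b', 'c']);
const pulled = [];
paused.on('readable', () => {
  let chunk;
  // read() returns null once the internal buffer is empty
  while ((chunk = paused.read()) !== null) pulled.push(chunk);
});
await once(paused, 'end');

// Flowing mode: attaching a 'data' listener makes the stream push data automatically
const flowing = Readable.from(['x', 'y', 'z']);
const pushed = [];
flowing.on('data', (chunk) => pushed.push(chunk));
await once(flowing, 'end');

console.log(pulled, pushed); // [ 'a', 'b', 'c' ] [ 'x', 'y', 'z' ]
```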

Creating a Custom Readable

import { Readable } from 'stream';

class NumberStream extends Readable {
  constructor(max) {
    super({ objectMode: true });
    this.max = max;
    this.current = 0;
  }

  _read() {
    if (this.current < this.max) {
      this.push({ value: this.current, timestamp: Date.now() });
      this.current++;
    } else {
      this.push(null); // End stream
    }
  }
}

const stream = new NumberStream(5);
stream.on('data', (chunk) => console.log(chunk));
// { value: 0, timestamp: ... }
// { value: 1, timestamp: ... }
// ...
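NumberStream pushes one item per _read() call. A common refinement, sketched below under the hypothetical name FastNumberStream, keeps pushing until push() returns false, which signals that the internal buffer has reached highWaterMark; Node calls _read() again once the consumer has drained it.

```javascript
import { Readable } from 'node:stream';

class FastNumberStream extends Readable {
  constructor(max) {
    // Small buffer (4 objects) so push() returns false early
    super({ objectMode: true, highWaterMark: 4 });
    this.max = max;
    this.current = 0;
  }

  _read() {
    while (this.current < this.max) {
      // push() returns false once the buffer holds highWaterMark items
      if (!this.push({ value: this.current++ })) return; // continue on the next _read()
    }
    this.push(null); // all values produced: end the stream
  }
}

const values = [];
for await (const item of new FastNumberStream(10)) values.push(item.value);
console.log(values.length); // 10
```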

Creating from an Async Iterable

import { Readable } from 'stream';
import { setTimeout as sleep } from 'timers/promises'; // promise-based delay

async function* generateLogs() {
  for (let i = 0; i < 100; i++) {
    await sleep(100);
    yield `[${new Date().toISOString()}] Event ${i}\n`;
  }
}

const logStream = Readable.from(generateLogs());
logStream.pipe(process.stdout);
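Readable.from() accepts any sync or async iterable and, unless you pass objectMode: false, creates an object-mode stream in which each yielded value becomes one chunk. A small sketch with a hypothetical countdown() generator:

```javascript
import { Readable } from 'node:stream';

// Hypothetical async generator; any sync or async iterable works
async function* countdown(n) {
  for (let i = n; i > 0; i--) yield i; // numbers are allowed because objectMode is on
}

const stream = Readable.from(countdown(3));
const isObjectMode = stream.readableObjectMode; // Readable.from() defaults to objectMode

const seen = [];
for await (const n of stream) seen.push(n);
console.log(isObjectMode, seen); // true [ 3, 2, 1 ]
```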

3. Writable Streams

Creating a Custom Writable

import { Writable } from 'stream';

class BatchWriter extends Writable {
  constructor(options = {}) {
    // In objectMode, highWaterMark counts objects: up to 10 rows can be queued
    super({ objectMode: true, highWaterMark: 10 });
    this.batch = [];
    this.batchSize = options.batchSize || 5;
  }

  _write(chunk, encoding, callback) {
    this.batch.push(chunk);

    if (this.batch.length >= this.batchSize) {
      this.flush()
        .then(() => callback())
        .catch(callback);
    } else {
      callback();
    }
  }

  _final(callback) {
    if (this.batch.length > 0) {
      this.flush()
        .then(() => callback())
        .catch(callback);
    } else {
      callback();
    }
  }

  async flush() {
    console.log('Writing batch:', this.batch);
    // `db` is a placeholder for your database client
    await db.batchInsert(this.batch);
    this.batch = [];
  }
}
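Because db is not defined in that snippet, here is a self-contained sketch of the same batching pattern with a hypothetical in-memory fakeDb standing in for a real client. It feeds 12 rows through pipeline() with a batch size of 5, so _write() flushes two full batches and _final() flushes the remaining 2 rows.

```javascript
import { Readable, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Hypothetical stand-in for a database client: records each batch it receives
const fakeDb = {
  batches: [],
  async batchInsert(rows) {
    await new Promise((resolve) => setTimeout(resolve, 5)); // simulate I/O latency
    this.batches.push(rows);
  },
};

class BatchWriter extends Writable {
  constructor({ db, batchSize = 5 }) {
    super({ objectMode: true });
    this.db = db;
    this.batchSize = batchSize;
    this.batch = [];
  }

  _write(row, _encoding, callback) {
    this.batch.push(row);
    if (this.batch.length < this.batchSize) return callback();
    this.flushBatch().then(() => callback(), callback); // errors go to callback(err)
  }

  // Runs once after end(), before 'finish': flush whatever is left
  _final(callback) {
    if (this.batch.length === 0) return callback();
    this.flushBatch().then(() => callback(), callback);
  }

  async flushBatch() {
    const rows = this.batch;
    this.batch = [];
    await this.db.batchInsert(rows);
  }
}

const rows = Array.from({ length: 12 }, (_, i) => ({ id: i }));
await pipeline(Readable.from(rows), new BatchWriter({ db: fakeDb, batchSize: 5 }));
console.log(fakeDb.batches.map((b) => b.length)); // [ 5, 5, 2 ]
```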

write() Return Value and Backpressure

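import fs from 'fs';
import { once } from 'events';

const writable = fs.createWriteStream('output.txt');

for (let i = 0; i < 1e6; i++) {
  const canContinue = writable.write(`Line ${i}\n`);
  if (!canContinue) {
    // Buffer has reached highWaterMark (this chunk is still queued, not dropped);
    // wait for 'drain' before writing more
    await once(writable, 'drain');
  }
}
writable.end();
await once(writable, 'finish'); // All data has been flushed to the file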
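The same drain loop can be run entirely in memory. In this sketch the hypothetical slowSink has a 64-byte highWaterMark and completes each write after a 1 ms timer, so write() starts returning false almost immediately; the loop counts how often it had to wait for 'drain'.

```javascript
import { Writable } from 'node:stream';
import { once } from 'node:events';

// Hypothetical slow sink: tiny 64-byte buffer threshold, ~1 ms per write
const received = [];
const slowSink = new Writable({
  highWaterMark: 64,
  write(chunk, _encoding, callback) {
    received.push(chunk.toString());
    setTimeout(callback, 1);
  },
});

let drainWaits = 0;
for (let i = 0; i < 200; i++) {
  // false: buffer is at or above highWaterMark (the chunk is still queued)
  if (!slowSink.write(`Line ${i}\n`)) {
    drainWaits++;
    await once(slowSink, 'drain');
  }
}
slowSink.end();
await once(slowSink, 'finish');
console.log(received.length, drainWaits > 0); // 200 true
```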

4. Transform Streams

Basic Transform

import { Transform } from 'stream';

class JsonLineParser extends Transform {
  constructor() {
    super({ objectMode: true });
  }

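  // Assumes every incoming chunk is exactly one complete JSON line (for example,
  // output from an upstream line splitter); raw file chunks can cut lines in half
  _transform(chunk, encoding, callback) {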
    try {
      const data = JSON.parse(chunk.toString());
      this.push(data);
      callback();
    } catch (err) {
      callback(err);
    }
  }
}
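JsonLineParser works only when every chunk is exactly one line, which raw file chunks do not guarantee. A hedged sketch of a line-aware variant, under the hypothetical name NdjsonParser, buffers the trailing partial line between chunks (the same idea CsvToJson uses) and decodes bytes with string_decoder so multi-byte characters survive chunk boundaries.

```javascript
import { Readable, Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { StringDecoder } from 'node:string_decoder';

class NdjsonParser extends Transform {
  constructor() {
    super({ readableObjectMode: true }); // bytes in, objects out
    this.decoder = new StringDecoder('utf8');
    this.tail = '';
  }

  _transform(chunk, _encoding, callback) {
    const lines = (this.tail + this.decoder.write(chunk)).split('\n');
    this.tail = lines.pop(); // last element may be an incomplete line
    try {
      for (const line of lines) if (line.trim()) this.push(JSON.parse(line));
      callback();
    } catch (err) {
      callback(err); // malformed JSON fails the stream
    }
  }

  _flush(callback) {
    const last = this.tail + this.decoder.end();
    try {
      if (last.trim()) this.push(JSON.parse(last));
      callback();
    } catch (err) {
      callback(err);
    }
  }
}

// The JSON objects are deliberately split mid-line across chunks
const source = Readable.from(['{"id":1}\n{"i', 'd":2}\n{"id"', ':3}'], { objectMode: false });
const ids = [];
await pipeline(source, new NdjsonParser(), async function (parsed) {
  for await (const obj of parsed) ids.push(obj.id);
});
console.log(ids); // [ 1, 2, 3 ]
```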

Practical: CSV to JSON Stream

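import { Transform } from 'stream';
import { StringDecoder } from 'string_decoder';

// Simple CSV parser: splits on commas, so quoted fields that contain commas are not supported
export class CsvToJson extends Transform {
  constructor() {
    // Bytes in, objects out
    super({ writableObjectMode: false, readableObjectMode: true });
    this.headers = null;
    this.partialLine = '';
    // Decodes UTF-8 safely even when a multi-byte character is split across chunks
    this.decoder = new StringDecoder('utf8');
  }

  _transform(chunk, encoding, callback) {
    const lines = (this.partialLine + this.decoder.write(chunk)).split('\n');
    this.partialLine = lines.pop(); // Keep incomplete line for the next chunk

    for (const line of lines) {
      this.processLine(line);
    }
    callback();
  }

  _flush(callback) {
    // The last line may lack a trailing newline
    this.processLine(this.partialLine + this.decoder.end());
    callback();
  }

  processLine(line) {
    if (!line.trim()) return; // Skip blank lines (also avoids a crash on empty input)
    if (!this.headers) {
      this.headers = line.split(',').map((h) => h.trim());
      return;
    }
    const values = line.split(',');
    const obj = {};
    this.headers.forEach((h, i) => { obj[h] = values[i]?.trim(); });
    this.push(obj);
  }
}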
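Calling chunk.toString() on each chunk separately can corrupt a multi-byte UTF-8 character that straddles a chunk boundary. This sketch splits the 3-byte '€' across two buffers and compares naive per-chunk decoding with string_decoder's StringDecoder, which holds incomplete byte sequences back until the next write().

```javascript
import { StringDecoder } from 'node:string_decoder';

// 'price: ' is 7 bytes; '€' occupies bytes 7-9, so cutting at 8 splits it
const bytes = Buffer.from('price: €5', 'utf8');
const chunks = [bytes.subarray(0, 8), bytes.subarray(8)];

// Naive: each chunk decoded on its own; the split character turns into U+FFFD
const naive = chunks.map((c) => c.toString('utf8')).join('');

// StringDecoder buffers the incomplete sequence until the rest arrives
const decoder = new StringDecoder('utf8');
const safe = chunks.map((c) => decoder.write(c)).join('') + decoder.end();

console.log(naive === 'price: €5', safe === 'price: €5'); // false true
```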

Combining with pipeline

import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { CsvToJson } from './csv-to-json.js';
import { Transform } from 'stream';

await pipeline(
  createReadStream('data.csv'),
  new CsvToJson(),
  new Transform({
    objectMode: true,
    transform(obj, _, cb) {
      this.push(JSON.stringify(obj) + '\n');
      cb();
    },
  }),
  createWriteStream('data.jsonl')
);

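Advantage of pipeline: when any stream in the chain fails, pipeline destroys all of them and reports a single error (with stream/promises, the returned promise rejects). .pipe() does neither: it does not forward errors and does not clean up the other streams, which can leak file descriptors or leave streams hanging.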
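To see this error handling in action, the sketch below (with made-up input data) runs pipeline() over a source, a Transform that rejects non-numeric input, and a do-nothing Writable. The promise rejects with the Transform's error, and by then pipeline() has also destroyed the source and the sink.

```javascript
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const source = Readable.from(['1', '2', 'oops', '4']);
const toNumber = new Transform({
  objectMode: true,
  transform(chunk, _encoding, callback) {
    const n = Number(chunk);
    // Passing an Error to the callback fails this stage of the pipeline
    if (Number.isNaN(n)) return callback(new Error(`not a number: ${chunk}`));
    callback(null, n);
  },
});
const sink = new Writable({
  objectMode: true,
  write(_n, _encoding, callback) {
    callback();
  },
});

let message = null;
try {
  await pipeline(source, toNumber, sink);
} catch (err) {
  message = err.message; // the first real error from any stage
}

// Every stream in the chain was destroyed, not just the failing one
console.log(message, source.destroyed, sink.destroyed); // not a number: oops true true
```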


5. Backpressure Mechanism

What Is Backpressure?

When the producer is faster than the consumer, data accumulates in the buffer. Backpressure is the mechanism by which the consumer signals the producer to slow down.

Readable (fast) → Writable (slow)
  Produces 100MB/s    Can only process 10MB/s
  
  Without backpressure: the buffer grows without bound → OOM
  With backpressure: Readable pauses → waits for Writable to catch up

Backpressure Workflow

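// readable and writable are any Readable/Writable pair; pipeline() does all of this for you
readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    readable.pause(); // Pause reading
    writable.once('drain', () => {
      readable.resume(); // Resume reading once the buffer has drained
    });
  }
});
readable.on('end', () => writable.end()); // Signal that no more data is coming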
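The pause/resume loop can be exercised with in-memory streams. In this sketch the chunk count, the 256-byte highWaterMark, and the setImmediate delay are arbitrary choices: the producer is far faster than the consumer, so the 'data' handler has to pause at least once, yet all 1,000 chunks still arrive.

```javascript
import { Readable, Writable } from 'node:stream';
import { once } from 'node:events';

// Fast producer: 1,000 small string chunks, available immediately
const readable = Readable.from(Array.from({ length: 1000 }, (_, i) => `chunk ${i}\n`));

// Slow consumer with a small buffer, so write() soon starts returning false
let written = 0;
const writable = new Writable({
  highWaterMark: 256,
  write(_chunk, _encoding, callback) {
    written++;
    setImmediate(callback); // yield to the event loop to simulate slow I/O
  },
});

let pauses = 0;
readable.on('data', (chunk) => {
  if (!writable.write(chunk)) {
    pauses++;
    readable.pause(); // no more 'data' events until resume()
    writable.once('drain', () => readable.resume());
  }
});
readable.on('end', () => writable.end()); // propagate end-of-data

await once(writable, 'finish');
console.log(written, pauses > 0); // 1000 true
```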

highWaterMark Configuration

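Stream type | Default highWaterMark | Meaning
Readable | 16KB before Node.js 22, 64KB from Node.js 22; 16 objects in objectMode | Buffer level at which the stream stops calling _read()
Writable | Same defaults as Readable | Buffer level at which write() starts returning false
fs.createReadStream | 64KB | Chunk size and buffer threshold for file reads

highWaterMark is a threshold, not a hard limit: data beyond it is still buffered, but the stream signals the producer to stop.

import fs from 'fs';

const readable = fs.createReadStream('big.txt', {
  highWaterMark: 1024 * 1024, // 1MB buffer
});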
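Since Node.js 18.17 / 19.9, stream.getDefaultHighWaterMark() reports the current defaults, and every stream exposes its effective value as readableHighWaterMark or writableHighWaterMark. The byte-mode numbers vary with the Node.js version, so this sketch prints them rather than assuming one.

```javascript
import { Readable, Writable, getDefaultHighWaterMark } from 'node:stream';

// Requires Node.js 18.17+ / 19.9+ for getDefaultHighWaterMark()
const byteDefault = getDefaultHighWaterMark(false); // measured in bytes
const objectDefault = getDefaultHighWaterMark(true); // measured in objects

const r = new Readable({ read() {} });
const w = new Writable({ write(_chunk, _encoding, callback) { callback(); } });
const o = new Readable({ objectMode: true, read() {} });

console.log(byteDefault, objectDefault); // objectDefault is 16; byteDefault depends on the version
console.log(r.readableHighWaterMark, w.writableHighWaterMark, o.readableHighWaterMark);
```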

6. Practical Examples

Example 1: Large File Gzip Compression

import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

await pipeline(
  createReadStream('large.log'),
  createGzip({ level: 9 }),
  createWriteStream('large.log.gz')
);
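A quick way to sanity-check a compression pipeline without touching the disk: this sketch compresses repetitive in-memory text (the sample string is arbitrary), counts the compressed bytes with a small pass-through Transform, and gunzips the result to verify the round trip is lossless.

```javascript
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { createGzip, createGunzip } from 'node:zlib';

// Highly repetitive sample text, so gzip has something to squeeze
const original = 'ERROR disk full\n'.repeat(10_000);

let compressedBytes = 0;
const out = [];

await pipeline(
  Readable.from([Buffer.from(original)]),
  createGzip({ level: 9 }),
  // Counts the compressed bytes flowing between gzip and gunzip
  new Transform({
    transform(chunk, _encoding, callback) {
      compressedBytes += chunk.length;
      callback(null, chunk);
    },
  }),
  createGunzip(), // decompress again to check the round trip
  new Writable({
    write(chunk, _encoding, callback) {
      out.push(chunk);
      callback();
    },
  }),
);

const restored = Buffer.concat(out).toString();
console.log(restored === original, compressedBytes < original.length); // true true
```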

Example 2: HTTP Streaming Response

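import { createReadStream } from 'fs';
import { stat } from 'fs/promises';
import { pipeline } from 'stream';
import path from 'path';

// `app` is an Express application
app.get('/video/:id', async (req, res) => {
  // basename() strips directory parts, blocking path traversal such as "../../etc/passwd"
  const filePath = path.join('/videos', `${path.basename(req.params.id)}.mp4`);
  const { size } = await stat(filePath);
  const range = req.headers.range;
  // Log stream errors (e.g. the client disconnecting) instead of crashing
  const onDone = (err) => { if (err) console.error('stream failed:', err.message); };

  if (range) {
    // Handles "bytes=START-END" and "bytes=START-"; suffix ranges ("bytes=-N") are rejected here
    const [startStr, endStr] = range.replace(/bytes=/, '').split('-');
    const start = Number(startStr);
    const end = endStr ? Math.min(Number(endStr), size - 1) : size - 1;
    if (startStr === '' || Number.isNaN(start) || Number.isNaN(end) || start > end) {
      res.writeHead(416, { 'Content-Range': `bytes */${size}` });
      return res.end();
    }
    res.writeHead(206, {
      'Accept-Ranges': 'bytes',
      'Content-Range': `bytes ${start}-${end}/${size}`,
      'Content-Length': end - start + 1,
      'Content-Type': 'video/mp4',
    });
    // createReadStream's `end` is inclusive, like HTTP byte ranges
    pipeline(createReadStream(filePath, { start, end }), res, onDone);
  } else {
    res.writeHead(200, {
      'Accept-Ranges': 'bytes',
      'Content-Length': size,
      'Content-Type': 'video/mp4',
    });
    pipeline(createReadStream(filePath), res, onDone);
  }
});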
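Range parsing is the easiest part of the handler to get wrong, and it is simpler to test as a pure function. parseRange is a hypothetical helper, not part of Node.js or Express; it supports a single range, including open-ended (bytes=N-) and suffix (bytes=-N) forms, and returns null when the server should answer 416. Requests with multiple ranges also return null in this sketch.

```javascript
// Hypothetical helper: turns a single-range "Range" header into inclusive
// { start, end } byte offsets, or null if the range is unsupported or unsatisfiable.
function parseRange(header, size) {
  const match = /^bytes=(\d*)-(\d*)$/.exec(header.trim());
  if (!match || (match[1] === '' && match[2] === '')) return null;

  let start;
  let end;
  if (match[1] === '') {
    // Suffix range "bytes=-N": the last N bytes
    start = Math.max(size - Number(match[2]), 0);
    end = size - 1;
  } else {
    start = Number(match[1]);
    // Open-ended "bytes=N-" runs to the end; explicit ends are clamped to the file
    end = match[2] === '' ? size - 1 : Math.min(Number(match[2]), size - 1);
  }
  return start <= end ? { start, end } : null; // null => respond 416
}

console.log(parseRange('bytes=0-99', 1000)); // { start: 0, end: 99 }
console.log(parseRange('bytes=900-', 1000)); // { start: 900, end: 999 }
console.log(parseRange('bytes=-100', 1000)); // { start: 900, end: 999 }
console.log(parseRange('bytes=2000-', 1000)); // null
```

A route handler could call it as parseRange(req.headers.range, size) and pass the resulting start and end straight to createReadStream(), whose end option is inclusive just like HTTP byte ranges.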

Example 3: Line-by-Line Log Processing

import { createReadStream } from 'fs';
import { createInterface } from 'readline';

const fileStream = createReadStream('app.log');
// crlfDelay: Infinity treats \r\n as a single line break
const rl = createInterface({ input: fileStream, crlfDelay: Infinity });

let errorCount = 0;
for await (const line of rl) {
  if (line.includes('ERROR')) {
    errorCount++;
    if (errorCount <= 10) console.log(line); // Print only the first 10 errors
  }
}
console.log(`Total errors: ${errorCount}`);
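readline accepts any Readable as input, which makes the counting loop easy to test without a real log file. In this sketch an in-memory stream with made-up contents stands in for app.log, and one line is deliberately split across two chunks; readline reassembles it before the loop sees it.

```javascript
import { Readable } from 'node:stream';
import { createInterface } from 'node:readline';

// In-memory stand-in for app.log; "INFO retry" is split across the two chunks
const fakeLog = Readable.from([
  Buffer.from('INFO start\nERROR db timeout\nINF'),
  Buffer.from('O retry\nERROR db timeout again\n'),
]);

const rl = createInterface({ input: fakeLog, crlfDelay: Infinity });
let errorCount = 0;
for await (const line of rl) {
  if (line.includes('ERROR')) errorCount++;
}
console.log(errorCount); // 2
```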

Example 4: Converting Between Node Streams and Web Streams

import fs from 'fs';
import { Readable } from 'stream';

// Node Readable → Web ReadableStream
const nodeStream = fs.createReadStream('data.bin');
const webStream = Readable.toWeb(nodeStream);

// Consume it with the standard Web Streams reader API (the same API fetch and browsers use)
const reader = webStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  processChunk(value); // processChunk is a placeholder for your own handler
}
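The reverse direction uses Readable.fromWeb(), which the Node.js documentation has marked experimental in a number of releases. In this sketch a hand-built ReadableStream (a global in Node.js 18 and later) stands in for a Web stream such as a fetch() response body; after conversion it can be consumed like any Node Readable.

```javascript
import { Readable } from 'node:stream';

// Hand-built Web stream standing in for, e.g., a fetch() response body
const encoder = new TextEncoder();
const webStream = new ReadableStream({
  start(controller) {
    controller.enqueue(encoder.encode('hello '));
    controller.enqueue(encoder.encode('web streams'));
    controller.close();
  },
});

// Web ReadableStream → Node Readable, usable with pipe(), pipeline(), for await, ...
const nodeStream = Readable.fromWeb(webStream);
const parts = [];
for await (const chunk of nodeStream) parts.push(chunk);
console.log(Buffer.concat(parts).toString()); // hello web streams
```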

7. Common Pitfalls

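Pitfall | Consequence | Solution
Not handling 'error' events | An unhandled 'error' event is thrown and crashes the process | Use pipeline instead of .pipe(), or attach an 'error' listener to every stream
Ignoring write()'s return value in a manual write loop | Unbounded buffering, eventually OOM | Wait for 'drain', or use pipeline (.pipe() also respects backpressure)
Not calling callback in _transform or _write | The stream hangs | Make sure every code path calls callback exactly once
Mismatched objectMode | Writing an object to a byte-mode stream throws ERR_INVALID_ARG_TYPE | Make upstream and downstream objectMode match
Not implementing _final in a batching Writable | The last partial batch is lost | Implement _final to flush remaining data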
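In current Node.js versions the objectMode pitfall surfaces as an exception rather than silent data loss. This sketch writes a plain object to a byte-mode Writable, catches the error, and then repeats the write against an objectMode sink.

```javascript
import { Writable } from 'node:stream';

// Byte-mode Writable (objectMode defaults to false)
const byteSink = new Writable({ write(_chunk, _encoding, callback) { callback(); } });

let code = null;
try {
  byteSink.write({ id: 1 }); // a plain object is not a string, Buffer, or Uint8Array
} catch (err) {
  code = err.code;
}

// The same write succeeds once the sink opts into objectMode
const objectSink = new Writable({
  objectMode: true,
  write(_chunk, _encoding, callback) { callback(); },
});
const accepted = objectSink.write({ id: 1 });

console.log(code, accepted); // ERR_INVALID_ARG_TYPE true
```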
#Node.js #Stream #Backpressure #PerformanceOptimization