Node.js Stream Processing in Practice: A Complete Guide to Readable, Writable, Transform, and Backpressure
Technical Architecture (Updated May 10, 2026)
Why Use Streams? Memory Drops from GB to KB
| Method | Processing 1GB File | Memory Usage | Time |
|---|---|---|---|
| fs.readFile | Load all into memory | 1GB+ | Nothing can be processed until the whole file is loaded |
| fs.createReadStream | Process in chunks | ~64KB | Starts immediately |
Core concept: don't wait for all the data to be ready; process each chunk as it is read. Data flows through the pipeline like water through pipes.
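To make the chunk-by-chunk idea concrete, here is a minimal sketch that counts the bytes of a file without ever holding the whole file in memory. The sample file name and its 1 MiB size are assumptions made up for the demo; the file is created in the OS temp directory and removed afterwards.

```javascript
import { createReadStream } from 'fs';
import { writeFile, rm } from 'fs/promises';
import { tmpdir } from 'os';
import { join } from 'path';

// Hypothetical sample file: 1 MiB of "a" characters in the OS temp directory
const samplePath = join(tmpdir(), `stream-demo-${process.pid}.txt`);
await writeFile(samplePath, 'a'.repeat(1024 * 1024));

// Count bytes chunk by chunk; only the current chunk needs to be in memory
async function countBytes(path) {
  let total = 0;
  let chunks = 0;
  for await (const chunk of createReadStream(path)) {
    total += chunk.length;
    chunks++;
  }
  return { total, chunks };
}

const result = await countBytes(samplePath);
console.log(`Read ${result.total} bytes in ${result.chunks} chunks`);
await rm(samplePath);
```

The same loop works unchanged on a multi-gigabyte file, because memory use is bounded by the chunk size rather than the file size.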
1. Four Stream Types
| Type | Direction | Typical Use |
|---|---|---|
| Readable | Input | File reading, HTTP request body |
| Writable | Output | File writing, HTTP response |
| Duplex | Bidirectional | TCP Socket, WebSocket |
| Transform | Bidirectional (output derived from input) | Compression, encryption, format conversion |
Inheritance
Stream
├── Readable
├── Writable
├── Duplex (Readable + Writable)
└── Transform (Duplex, output based on input)
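The hierarchy can be checked at runtime with `instanceof`. This is a small sketch using the built-in `PassThrough` stream (a Transform that forwards its input unchanged), chosen here only because it needs no custom code.

```javascript
import { Readable, Writable, Duplex, Transform, PassThrough } from 'stream';

const passThrough = new PassThrough();

// A Transform is a Duplex, and a Duplex counts as both Readable and Writable
const checks = {
  isTransform: passThrough instanceof Transform,
  isDuplex: passThrough instanceof Duplex,
  isReadable: passThrough instanceof Readable,
  isWritable: passThrough instanceof Writable,
};
console.log(checks);
```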
2. Readable Streams
Two Modes
| Mode | Trigger | Characteristics |
|---|---|---|
| Paused | Default | Must manually call read() |
| Flowing | Bind data event | Auto-pushes data |
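The two modes can be compared side by side. In this sketch the input arrays are made-up sample data; the first stream stays paused and is drained by calling `read()` inside a `readable` handler, while the second switches to flowing mode as soon as a `data` listener is attached.

```javascript
import { Readable } from 'stream';
import { once } from 'events';

// Paused mode: nothing is delivered until we call read() ourselves
const paused = Readable.from(['a', 'b', 'c']);
const pausedChunks = [];
paused.on('readable', () => {
  let chunk;
  // read() returns null once the internal buffer is empty
  while ((chunk = paused.read()) !== null) {
    pausedChunks.push(chunk);
  }
});

// Flowing mode: attaching a 'data' listener starts the flow automatically
const flowing = Readable.from(['x', 'y', 'z']);
const flowingChunks = [];
flowing.on('data', (chunk) => flowingChunks.push(chunk));

await Promise.all([once(paused, 'end'), once(flowing, 'end')]);
console.log(pausedChunks, flowingChunks);
```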
Creating a Custom Readable
import { Readable } from 'stream';

class NumberStream extends Readable {
  constructor(max) {
    super({ objectMode: true });
    this.max = max;
    this.current = 0;
  }

  _read() {
    if (this.current < this.max) {
      this.push({ value: this.current, timestamp: Date.now() });
      this.current++;
    } else {
      this.push(null); // End stream
    }
  }
}

const stream = new NumberStream(5);
stream.on('data', (chunk) => console.log(chunk));
// { value: 0, timestamp: ... }
// { value: 1, timestamp: ... }
// ...
Creating from an Iterator
import { Readable } from 'stream';
import { setTimeout as sleep } from 'timers/promises';

async function* generateLogs() {
  for (let i = 0; i < 100; i++) {
    await sleep(100); // Simulate an event arriving every 100 ms
    yield `[${new Date().toISOString()}] Event ${i}\n`;
  }
}

const logStream = Readable.from(generateLogs());
logStream.pipe(process.stdout);
3. Writable Streams
Creating a Custom Writable
import { Writable } from 'stream';

// `db` is assumed to be an application-provided client with an async batchInsert(rows) method
class BatchWriter extends Writable {
  constructor(options = {}) {
    super({ objectMode: true, highWaterMark: 10 });
    this.batch = [];
    this.batchSize = options.batchSize || 5;
  }

  _write(chunk, encoding, callback) {
    this.batch.push(chunk);
    if (this.batch.length >= this.batchSize) {
      // Delay the callback until the batch is stored so the producer feels backpressure
      this.flush().then(() => callback(), callback);
    } else {
      callback();
    }
  }

  // Runs after end() and before 'finish': write out the remaining partial batch
  _final(callback) {
    if (this.batch.length > 0) {
      this.flush().then(() => callback(), callback);
    } else {
      callback();
    }
  }

  async flush() {
    console.log('Writing batch:', this.batch);
    await db.batchInsert(this.batch);
    this.batch = [];
  }
}
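The batching behavior can be tried without a real database. The sketch below expresses the same idea through `Writable` constructor options instead of a subclass; `fakeDb` and `createBatchWriter` are hypothetical names introduced only for this demo, and `fakeDb` simply records each batch in memory.

```javascript
import { Readable, Writable } from 'stream';
import { pipeline } from 'stream/promises';

// Hypothetical in-memory stand-in for the database client
const inserted = [];
const fakeDb = {
  async batchInsert(rows) {
    inserted.push([...rows]);
  },
};

function createBatchWriter(batchSize) {
  let batch = [];
  const flush = async () => {
    await fakeDb.batchInsert(batch);
    batch = [];
  };
  return new Writable({
    objectMode: true,
    write(chunk, _encoding, callback) {
      batch.push(chunk);
      if (batch.length < batchSize) return callback();
      flush().then(() => callback(), callback);
    },
    // final() runs after end() and before 'finish', so the partial batch is not lost
    final(callback) {
      if (batch.length === 0) return callback();
      flush().then(() => callback(), callback);
    },
  });
}

await pipeline(Readable.from([1, 2, 3, 4, 5, 6, 7]), createBatchWriter(3));
console.log(inserted); // [ [ 1, 2, 3 ], [ 4, 5, 6 ], [ 7 ] ]
```

Seven items with a batch size of three produce two full batches from `write` and one leftover batch from `final`.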
write() Return Value and Backpressure
import fs from 'fs';
import { once } from 'events';

const writable = fs.createWriteStream('output.txt');
for (let i = 0; i < 1e6; i++) {
  const canContinue = writable.write(`Line ${i}\n`);
  if (!canContinue) {
    // Buffer full, wait for drain event
    await once(writable, 'drain');
  }
}
writable.end();
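To see exactly when `write()` flips to `false`, it helps to shrink the buffer. This sketch uses a deliberately slow sink with a 4-byte `highWaterMark`; both the size and the 10 ms delay are arbitrary values picked for illustration.

```javascript
import { Writable } from 'stream';
import { once } from 'events';

// A slow sink with a tiny 4-byte buffer threshold
const slow = new Writable({
  highWaterMark: 4,
  write(_chunk, _encoding, callback) {
    setTimeout(callback, 10); // Pretend each write takes 10 ms
  },
});

const first = slow.write('ab'); // 2 bytes pending, still below the threshold
const second = slow.write('cd'); // 4 bytes pending, threshold reached
console.log(first, second); // true false

// 'drain' fires once everything pending has been written
await once(slow, 'drain');
const pendingAfterDrain = slow.writableLength;
console.log(pendingAfterDrain); // 0
slow.end();
```

Note that a `false` return does not reject the chunk: it is still buffered, and the return value is only a signal to stop producing until `drain`.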
4. Transform Streams
Basic Transform
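import { Transform } from 'stream';

// Assumes each incoming chunk is exactly one complete JSON line;
// put a line splitter upstream when reading raw file chunks
class JsonLineParser extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(chunk, encoding, callback) {
    try {
      const data = JSON.parse(chunk.toString());
      this.push(data);
      callback();
    } catch (err) {
      callback(err); // Report invalid JSON as a stream error
    }
  }
}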
Practical: CSV to JSON Stream
import { Transform } from 'stream';

// Naive parser: splits on commas, so quoted fields that contain commas are not supported.
// chunk.toString() can also split a multi-byte character across chunks; use string_decoder for non-ASCII data.
export class CsvToJson extends Transform {
  constructor() {
    super({ writableObjectMode: false, readableObjectMode: true });
    this.headers = null;
    this.partialLine = '';
  }

  _transform(chunk, encoding, callback) {
    const lines = (this.partialLine + chunk.toString()).split('\n');
    this.partialLine = lines.pop(); // Keep incomplete line
    for (const line of lines) this.handleLine(line);
    callback();
  }

  _flush(callback) {
    this.handleLine(this.partialLine); // Last line without a trailing newline
    callback();
  }

  handleLine(line) {
    if (!line.trim()) return; // Skip blank lines
    if (!this.headers) {
      this.headers = line.split(',').map((h) => h.trim());
      return;
    }
    const values = line.split(',');
    const obj = {};
    this.headers.forEach((h, i) => {
      obj[h] = values[i]?.trim();
    });
    this.push(obj);
  }
}
Combining with pipeline
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { Transform } from 'stream';
import { CsvToJson } from './csv-to-json.js';

await pipeline(
  createReadStream('data.csv'),
  new CsvToJson(),
  new Transform({
    objectMode: true,
    transform(obj, _, cb) {
      this.push(JSON.stringify(obj) + '\n');
      cb();
    },
  }),
  createWriteStream('data.jsonl')
);
pipeline advantage: an error in any stage rejects the returned promise and every stream in the chain is destroyed, which makes it safer than .pipe().
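The error-propagation behavior can be observed with three in-memory streams. In this sketch the sample records and the `'bad record'` error are invented for the demo; the middle stage fails on one record, and the surrounding `try`/`catch` is the only error handling needed.

```javascript
import { Readable, Transform, Writable } from 'stream';
import { pipeline } from 'stream/promises';

const source = Readable.from(['ok', 'ok', 'bad', 'ok']);

// Middle stage that fails on one specific record
const failing = new Transform({
  objectMode: true,
  transform(chunk, _encoding, callback) {
    if (chunk === 'bad') return callback(new Error('bad record'));
    callback(null, chunk);
  },
});

const sink = new Writable({
  objectMode: true,
  write(_chunk, _encoding, callback) {
    callback();
  },
});

let caught = null;
try {
  await pipeline(source, failing, sink);
} catch (err) {
  caught = err; // The error from the middle stage surfaces here
}
console.log(caught.message, failing.destroyed, sink.destroyed);
```

With chained `.pipe()` calls, the same failure would need an `error` listener on each stream plus manual cleanup of the others.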
5. Backpressure Mechanism
What Is Backpressure?
When producer speed > consumer speed, data accumulates in the buffer. Backpressure is the mechanism where the consumer tells the producer to slow down.
Readable (fast)       →   Writable (slow)
Produces 100MB/s          Can only process 10MB/s

Without backpressure: buffer overflows → OOM
With backpressure: Readable pauses → waits for Writable to catch up
Backpressure Workflow
readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    readable.pause(); // Pause reading
    writable.once('drain', () => {
      readable.resume(); // Resume reading
    });
  }
});
highWaterMark Configuration
| Stream Type | Default | Meaning |
|---|---|---|
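| Readable | 64KB in Node.js 22+ (16KB in earlier versions); 16 objects in objectMode; fs.createReadStream defaults to 64KB | Buffered amount at which the stream stops calling _read() |
| Writable | 64KB in Node.js 22+ (16KB in earlier versions); 16 objects in objectMode | Buffered amount at which write() starts returning false |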
import fs from 'fs';

const readable = fs.createReadStream('big.txt', {
  highWaterMark: 1024 * 1024, // 1MB buffer
});
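Because the byte-mode default depends on the Node.js version, it can be safer to read the effective values at runtime than to rely on a table. This sketch assumes Node.js 18.17 or later, where `getDefaultHighWaterMark` is exported from `stream`.

```javascript
import { Readable, Writable, getDefaultHighWaterMark } from 'stream';

// An explicit highWaterMark is reported back unchanged
const explicit = new Readable({ highWaterMark: 1024 * 1024, read() {} });
console.log(explicit.readableHighWaterMark); // 1048576

// In objectMode the threshold counts objects, not bytes
const objectSink = new Writable({
  objectMode: true,
  write(_chunk, _encoding, callback) {
    callback();
  },
});
console.log(objectSink.writableHighWaterMark);

// Defaults for this Node.js version: byte mode first, then objectMode
const byteDefault = getDefaultHighWaterMark(false);
const objectDefault = getDefaultHighWaterMark(true);
console.log(byteDefault, objectDefault);
```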
6. Practical Examples
Example 1: Large File Gzip Compression
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';
await pipeline(
  createReadStream('large.log'),
  createGzip({ level: 9 }),
  createWriteStream('large.log.gz')
);
Example 2: HTTP Streaming Response
import { createReadStream } from 'fs';
import { stat } from 'fs/promises';
// Assumes an Express-style `app`. Validate req.params.id in real code to prevent path traversal.
app.get('/video/:id', async (req, res) => {
  const filePath = `/videos/${req.params.id}.mp4`;
  const { size } = await stat(filePath);
  const range = req.headers.range;
  if (range) {
    const [startStr, endStr] = range.replace(/bytes=/, '').split('-');
    const start = Number(startStr);
    // An open-ended range such as "bytes=0-" has no end value; default to the last byte
    const end = endStr ? Number(endStr) : size - 1;
    res.writeHead(206, {
      'Content-Range': `bytes ${start}-${end}/${size}`,
      'Content-Length': end - start + 1,
      'Content-Type': 'video/mp4',
    });
    createReadStream(filePath, { start, end }).pipe(res);
  } else {
    res.writeHead(200, {
      'Content-Length': size,
      'Content-Type': 'video/mp4',
    });
    createReadStream(filePath).pipe(res);
  }
});
Example 3: Line-by-Line Log Processing
import { createReadStream } from 'fs';
import { createInterface } from 'readline';

const fileStream = createReadStream('app.log');
// crlfDelay: Infinity treats \r\n as a single line break
const rl = createInterface({ input: fileStream, crlfDelay: Infinity });

let errorCount = 0;
for await (const line of rl) {
  if (line.includes('ERROR')) {
    errorCount++;
    if (errorCount <= 10) console.log(line); // Print only the first 10 errors
  }
}
console.log(`Total errors: ${errorCount}`);
Example 4: Node Stream to Web Stream Conversion
import fs from 'fs';
import { Readable } from 'stream';

// Node Readable → Web ReadableStream
const nodeStream = fs.createReadStream('data.bin');
const webStream = Readable.toWeb(nodeStream);

// Consume it through the same reader API that browsers expose
const reader = webStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  processChunk(value); // processChunk is a placeholder for your own handler
}
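The conversion also works in the other direction through `Readable.fromWeb`. In this sketch the hand-built `ReadableStream` is only a stand-in for a real Web stream such as a `fetch()` response body, and it assumes a Node.js version where `ReadableStream` is available as a global (18 or later).

```javascript
import { Readable } from 'stream';

// A small Web ReadableStream standing in for something like a fetch() body
const encoder = new TextEncoder();
const web = new ReadableStream({
  start(controller) {
    controller.enqueue(encoder.encode('hello '));
    controller.enqueue(encoder.encode('world'));
    controller.close();
  },
});

// Web ReadableStream → Node Readable
const node = Readable.fromWeb(web);
let text = '';
for await (const chunk of node) {
  text += Buffer.from(chunk).toString();
}
console.log(text); // hello world
```

Once converted, the Node stream can be passed to `pipeline` like any other Readable.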
7. Common Pitfalls
| Pitfall | Consequence | Solution |
|---|---|---|
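| Forgetting to handle error events | An unhandled error event throws and crashes the process | Use pipeline instead of .pipe(), or attach an error listener to every stream |
| Ignoring the return value of write() | OOM | Use pipeline or .pipe() (both apply backpressure), or pause and wait for drain manually |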
| Not calling callback in Transform | Stream hangs | Ensure _transform calls callback |
| Mixing objectMode | Data loss/type errors | Ensure upstream and downstream objectMode match |
| Not implementing _final (or _flush in a Transform) | Last batch of data lost | Implement _final or _flush to handle remaining data |
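The first pitfall is easy to reproduce. In this sketch, with a made-up `'boom'` error, the source fails after being connected with `.pipe()`: the destination never hears about it and is left open, which is the cleanup gap that `pipeline` closes.

```javascript
import { Readable, PassThrough } from 'stream';

const source = new Readable({ read() {} });
const dest = new PassThrough();
source.pipe(dest);

let destError = null;
dest.on('error', (err) => {
  destError = err;
});
// Without this listener the source error would crash the process
source.on('error', () => {});

source.destroy(new Error('boom'));
await new Promise((resolve) => setImmediate(resolve));

// The destination saw no error and was neither ended nor destroyed
console.log(destError, dest.writableEnded, dest.destroyed);
```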
#Node.js #Stream #Backpressure #PerformanceOptimization