stream
The stream module is one of Node.js’s most powerful built-in modules. It provides APIs for handling streaming data efficiently, processing large amounts of data without loading everything into memory.
Overview
Streams are a fundamental concept in Node.js that allow you to read data piece by piece or write data incrementally. They are memory-efficient because they don’t require loading entire datasets into memory.
Types of Streams
Readable Streams
Readable streams produce data that can be consumed. Examples include:
- fs.createReadStream() — reading files
- process.stdin — standard input
- HTTP responses on the client side
const fs = require('fs');

// Stream the file as UTF-8 text, delivered in 1 KiB chunks.
const fileStream = fs.createReadStream('file.txt', {
  encoding: 'utf8',
  highWaterMark: 1024, // maximum bytes buffered per chunk
});

// Attaching a 'data' listener switches the stream into flowing mode.
fileStream.on('data', (chunk) => {
  console.log('Received chunk:', chunk.length, 'bytes');
});

// 'end' fires once the underlying file is exhausted.
fileStream.on('end', () => {
  console.log('No more data');
});

// Always handle 'error' — an unhandled stream error crashes the process.
fileStream.on('error', (err) => {
  console.error('Error:', err);
});
Writable Streams
Writable streams receive and process data. Examples include:
- fs.createWriteStream() — writing files
- process.stdout — standard output
- HTTP requests on the client side
const fs = require('fs');

// Open (or truncate) output.txt for writing.
const outStream = fs.createWriteStream('output.txt');

// Queue several chunks; end() writes a final chunk and closes the stream.
outStream.write('Hello, ');
outStream.write('World!\n');
outStream.end('Done writing');

// 'finish' fires after end() has been called and every chunk is flushed.
outStream.on('finish', () => {
  console.log('All data written');
});

// Surface write/open failures instead of crashing on an unhandled error.
outStream.on('error', (err) => {
  console.error('Error:', err);
});
Transform Streams
Transform streams modify data as it passes through. They are both readable and writable.
const { Transform } = require('stream');

// A Transform that upper-cases every chunk flowing through it.
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    // callback(err, data) is shorthand for this.push(data); callback(err).
    callback(null, chunk.toString().toUpperCase());
  },
});

upperCase.on('data', (chunk) => {
  console.log(chunk.toString()); // logs 'HELLO ' then 'WORLD'
});

// Writing into the transform produces upper-cased output on the readable side.
upperCase.write('hello ');
upperCase.write('world');
upperCase.end();
Duplex Streams
Duplex streams implement both readable and writable interfaces independently. A good example is a TCP socket.
const { Duplex } = require('stream');

// A Duplex has independent readable and writable sides: what you write to it
// is NOT what you read from it (unlike a Transform).
const duplex = new Duplex({
  // Readable side: supply data on demand, then signal EOF with push(null).
  read(size) {
    this.push('Hello from duplex ');
    this.push(null);
  },
  // Writable side: consume incoming chunks, then ack via the callback.
  write(chunk, encoding, callback) {
    console.log('Received:', chunk.toString());
    callback();
  },
});

// Consume the readable side.
duplex.on('data', (chunk) => {
  console.log('Read:', chunk.toString());
});

// Feed the writable side.
duplex.write('Writing to duplex ');
duplex.end();
Pipeline
The stream.pipeline() function pipes data through a chain of streams, forwarding errors and destroying all of the streams on failure automatically — cleanup that manual .pipe() chains do not perform.
const fs = require('fs');
const { pipeline } = require('stream/promises');

/**
 * Copy a file by piping a read stream into a write stream.
 * pipeline() propagates errors from either stream and destroys both on failure.
 *
 * @param {string} source - path of the file to read
 * @param {string} destination - path of the file to create/overwrite
 */
async function copyFile(source, destination) {
  await pipeline(
    fs.createReadStream(source),
    fs.createWriteStream(destination),
  );
  console.log('Pipeline succeeded');
}

copyFile('input.txt', 'output.txt').catch((err) => {
  console.error('Pipeline failed:', err);
});
Finished
The stream.finished() function (here, its promise-based form from stream/promises) resolves once a stream is no longer readable or writable — it has ended, been closed, or errored.
const fs = require('fs');
const { finished } = require('stream/promises');

// Top-level await is only valid in ES modules; since this file uses
// require() (CommonJS), wrap the await in an async IIFE.
(async () => {
  const readStream = fs.createReadStream('file.txt');
  // The stream must actually be consumed, otherwise it never ends and
  // finished() never settles; resume() discards the data as it arrives.
  readStream.resume();
  await finished(readStream);
  console.log('Stream finished');
})().catch((err) => {
  // finished() rejects if the stream errors (e.g. file.txt is missing).
  console.error('Stream failed:', err);
});
Backpressure
When writing to a stream, you must handle backpressure: if writable.write() returns false, the internal buffer is full, so pause further writes until the 'drain' event fires.
const fs = require('fs');
const writable = fs.createWriteStream('output.txt');

/**
 * Write one chunk to the stream, respecting backpressure.
 *
 * write() returns false when the internal buffer is over its highWaterMark;
 * the caller should stop writing until 'drain' fires. (The original example
 * referenced an undefined `moreData` variable inside the 'drain' handler,
 * which would throw a ReferenceError at runtime.)
 *
 * @param {string|Buffer} data - chunk to write
 * @param {() => void} [onDrain] - optional callback invoked once the buffer
 *   has drained and it is safe to write again
 * @returns {boolean} true if more data can be written immediately
 */
function writeData(data, onDrain) {
  const canContinue = writable.write(data);
  if (!canContinue) {
    // Buffer is full: resume writing only after 'drain'.
    writable.once('drain', () => {
      onDrain?.();
    });
  }
  return canContinue;
}
Object Mode
Streams can work with JavaScript objects instead of strings or buffers by enabling object mode.
const { Readable } = require('stream');

// In object mode, each chunk is an arbitrary JavaScript value
// rather than a string or Buffer.
const people = [
  { name: 'Alice', age: 30 },
  { name: 'Bob', age: 25 },
];

const objectStream = new Readable({
  objectMode: true,
  read() {
    for (const person of people) {
      this.push(person);
    }
    this.push(null); // no more objects — end the stream
  },
});

objectStream.on('data', (obj) => {
  console.log(obj); // { name: 'Alice', age: 30 }
});