Introduction to Streams
This chapter covers streams in Node.js - what they are, why they matter, and how to use them. We’re not diving into every edge case here, just building a solid foundation. The chapters after this one will get into the weirdness.
All the code examples mentioned in this chapter can be found here.
So here’s the thing about Node.js - pretty much everything that moves data around is built on streams. Reading files? Streams. HTTP requests coming into your server? Streams. Even typing into your terminal and seeing output - all streams. Once you start looking for them, they’re everywhere. They’re the invisible infrastructure that makes Node actually work.
The whole point of streams is solving one very specific problem: memory. Say you need to work with a 10-gigabyte video file, but your server only has 8 gigabytes of RAM. The obvious approach - read the whole file into memory, process it, write it back out - just… doesn’t work. Your program dies before it even starts doing anything useful. Not ideal when you’re trying to ship software.
Streams fix this by letting you work with data piece by piece. You read a chunk, maybe a few kilobytes, process just that chunk, pass it along, then grab the next chunk. The full 10GB file never sits in memory all at once - you’re just juggling small pieces as they flow through your program. Your memory usage stays flat and predictable, no matter how massive the file gets.
Now, a stream isn’t the actual data itself. It’s more of an abstraction - an interface for handling data that flows continuously over time. Think about it: whether you’re reading from a file, receiving an HTTP request, or getting input from stdin, the pattern is fundamentally the same. Data comes in chunks, you need to handle those chunks, and you need to deal with what happens when there’s too much data coming too fast. Streams give you a consistent API for all of that, which means you can connect different I/O operations together without having to rewrite everything for each specific case.
The Four Kinds of Streams
Every stream in Node.js is an instance of a class that implements one of four fundamental types. Once you get these four down, everything else in the stream ecosystem starts making sense. They’re called Readable, Writable, Duplex, and Transform. Each one has a specific job in how data moves through your application.
Readable Streams
A Readable stream is a source of data. It represents a place where data originates from. When you’re reading a file from your hard drive, that’s a readable stream. An incoming HTTP request hitting your server? Readable stream. The process.stdin
that captures what you type on your keyboard? Also a readable stream. The defining characteristic is that you consume data from it - you don’t write to a readable stream.
Now here’s where it gets a bit tricky. There are two primary ways to consume data from a readable stream, and they correspond to two different “modes” the stream can operate in. They’re called paused mode and flowing mode. This distinction confused the hell out of me when I was learning this stuff, but it’s actually pretty important.
By default, all readable streams start in paused mode. In this mode, you have to explicitly ask for data. Nothing happens automatically. The stream just sits there until you tell it you want some data. The primary mechanism for doing this is the .read()
method.
// readable-paused-mode.js
import { writeFile } from "node:fs/promises";
import fs from "node:fs";
// Create a dummy file first (note: using async API)
await writeFile("./my-data.txt", "Here is some data that will be streamed chunk by chunk.");
const readableStream = fs.createReadStream("./my-data.txt", { encoding: "utf8" });
// The stream is in PAUSED mode by default.
// We must listen for the 'readable' event to know when data is available to be read.
readableStream.on("readable", () => {
console.log("--> Stream is readable");
let chunk;
// Use a loop to make sure we read all data currently in the internal buffer.
// stream.read() will return null when there is no more data in the buffer.
while (null !== (chunk = readableStream.read())) {
console.log(`Received chunk of size ${chunk.length}:`);
console.log(`"${chunk}"\n`);
}
});
// The 'end' event fires when there is no more data to be consumed from the stream.
readableStream.on("end", () => {
console.log("--> Reached end of stream.");
});
When you run this code, you’ll see the 'readable'
event fires, and inside that event handler, you call .read()
to pull chunks of data out of the stream’s internal buffer. The .read()
method returns either a chunk of data or null
if the buffer is empty. That while
loop is there to make sure you drain the buffer completely every time the 'readable'
event fires. You keep calling .read()
until it gives you null
, which means “buffer’s empty, nothing left right now.”
The other mode is flowing mode. A stream automatically switches from paused to flowing mode as soon as you attach a listener for the 'data'
event. In flowing mode, you don’t ask for data - the stream pushes data to you as fast as it can read it from the source. You just have to be ready to catch it when it arrives.
// readable-flowing-mode.js
import fs from "node:fs";
const readableStream = fs.createReadStream("./my-data.txt", { encoding: "utf8" });
// By adding a 'data' event listener, we switch the stream to FLOWING mode.
// Data will be pushed to us automatically.
readableStream.on("data", (chunk) => {
console.log("--> Received a chunk of data:");
console.log(`"${chunk}"\n`);
});
// The 'end' event still tells us when we're done.
readableStream.on("end", () => {
console.log("--> Reached end of stream.");
});
// Always, always, always handle errors.
readableStream.on("error", (err) => {
console.error("An error occurred:", err);
});
Notice there’s no .read()
call anywhere in this code. We just tell the stream, “Hey, when you have data, run this function.” The stream then starts reading from the source (in this case the file) and emits 'data'
events with chunks as they become available. The code is simpler to read, but there’s a risk here - if your source is producing data faster than you can process it, you could run into trouble. We’ll get to how to solve that problem when we talk about backpressure.
Async Iteration
There’s actually a third way to consume readable streams, and honestly it’s the cleanest one. Modern Node.js lets you treat every readable stream as an async iterable, which means you can use for await...of
to consume it:
// readable-async-iteration.js
import fs from "node:fs";
const readableStream = fs.createReadStream("./my-data.txt", { encoding: "utf8" });
try {
for await (const chunk of readableStream) {
console.log("Got chunk:", chunk);
}
console.log("Stream finished.");
} catch (err) {
console.error("Error reading stream:", err);
}
This pattern is way cleaner than both of the previous approaches. Error handling happens naturally with try/catch blocks, and it automatically respects backpressure (which we’ll cover soon). This is what I reach for when I’m writing new code. It’s the recommended approach in modern Node.js.
Finally, two other events are absolutely essential for any readable stream consumer. The 'end'
event fires exactly once, when there’s no more data to be read from the source. The stream is done, finished, nothing left to give you. The 'error'
event fires if something goes wrong during the reading process - maybe the file doesn’t exist, maybe a network connection drops, whatever. You must always listen for the 'error'
event. I’m serious about this one. An unhandled error event on a stream will crash your entire Node.js process. I’ve debugged production issues caused by this more times than I want to admit.
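To see why this matters, here’s a minimal sketch (the file path is deliberately one that doesn’t exist, so the stream emits an ENOENT error):
// handle-stream-errors.js
import fs from "node:fs";
// "missing.txt" is just a placeholder for a file that isn't there.
const stream = fs.createReadStream("./missing.txt");
stream.on("data", (chunk) => console.log("Got", chunk.length, "bytes"));
// Without this handler, the 'error' event becomes an uncaught exception
// and takes down the whole process.
stream.on("error", (err) => {
  console.error("Handled stream error:", err.message);
});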
Writable Streams
A Writable stream is a destination for data. It’s the receiving end. When you’re writing to a file, sending an HTTP response from your server, or printing to the console with process.stdout
, you’re working with writable streams. You don’t read from a writable stream - you send data to it.
The main method for sending data is .write(chunk)
. You call this with whatever data you want to send. When you’re completely finished and have no more data to write, you call .end()
. Calling .end()
tells the stream that no more data is coming, and once all the buffered data has been written to the underlying destination, it fires a 'finish'
event.
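In its simplest form, that lifecycle looks like this (a small sketch; the file name is just an example):
// writable-basics.js
import fs from "node:fs";
const out = fs.createWriteStream("./notes.txt");
out.write("first line\n");
out.write("second line\n");
// .end() can take one final chunk; after this, no more writes are allowed.
out.end("final line\n");
// 'finish' fires once everything has been flushed to the underlying file.
out.on("finish", () => console.log("All data written."));
out.on("error", (err) => console.error("Write failed:", err));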
Sounds straightforward, right? But there’s a crucial detail hiding in how .write()
actually works. What happens when you’re trying to write to a destination that’s slow? Say you’re writing to a network connection that has limited bandwidth, or a hard drive that’s busy with other operations. You might be calling .write()
way faster than the destination can actually handle the data. The writable stream starts buffering that data in memory to cope with the mismatch. If you just keep writing without paying attention to anything, that buffer grows and grows, potentially eating up all your available memory and crashing your application. This is exactly the problem streams were designed to solve in the first place.
The solution to this problem is called backpressure.
Here’s how it works: the .write()
method doesn’t just send data and forget about it. It returns a boolean value. If it returns true
, you’re good to keep writing more data right away. If it returns false
, that means the internal buffer has exceeded a certain threshold (this threshold is called the highWaterMark
). That false
return value is the signal - the backpressure signal. The writable stream is telling you, “Stop sending me data for a bit, I need to catch up and actually write what you’ve already given me.”
When you get a false
from .write()
, you should stop writing. But how do you know when it’s safe to start again? The writable stream will emit a 'drain'
event. That’s your signal to resume. When you hear 'drain'
, the buffer has been flushed out and it’s safe to start writing again.
Here’s how to handle this correctly. We’ll simulate a slow destination by creating a custom writable stream:
// writable-backpressure.js
import { Writable } from "node:stream";
// A slow destination stream for demonstration
const slowWriteStream = new Writable({
  // A small highWaterMark (in bytes) so the internal buffer fills after just a few chunks
  highWaterMark: 64,
// We just need to implement the write method
write(chunk, encoding, callback) {
console.log(`Writing chunk: "${chunk.toString()}"`);
// Simulate a slow I/O operation, like a network call or disk write
setTimeout(() => {
console.log("...write operation complete.");
callback(); // Signal that we are ready for the next chunk
}, 1000); // Takes 1 second to process each chunk
},
});
let i = 0;
function writeLotsOfData() {
  // Keep writing until we run out of data or the stream signals backpressure.
  while (i < 10) {
    i++;
    const canWrite = slowWriteStream.write(`This is chunk #${i}\n`);
    if (!canWrite) {
      console.log("BACKPRESSURE APPLIED: waiting for 'drain' before writing more.");
      // Resume writing once the internal buffer has been flushed.
      slowWriteStream.once("drain", writeLotsOfData);
      return;
    }
  }
  // All chunks written: signal that no more data is coming so 'finish' can fire.
  slowWriteStream.end();
}
// Start the writing process
writeLotsOfData();
slowWriteStream.on("finish", () => {
console.log("--> Writable stream finished.");
});
slowWriteStream.on("error", (err) => {
console.error("Writable stream error:", err);
});
Running this code shows the mechanism in action. The writeLotsOfData
function enters a loop, calling .write()
. Initially, .write()
returns true
, and data is buffered internally. The simulated write
operation takes a full second. Very quickly, the internal buffer fills up, .write()
returns false
, and the log shows BACKPRESSURE APPLIED
. The writing loop stops. As the slow write operations complete and the internal buffer empties, the 'drain'
event fires, and the writeLotsOfData
function is called again to resume writing.
This dance - checking .write()
return values and waiting for 'drain'
- is the fundamental mechanism of backpressure. It’s how a fast source can communicate gracefully with a slow destination without overwhelming it. Forgetting to respect this signal is honestly one of the most common sources of memory leaks in Node.js applications. I’ve seen production apps eating gigabytes of RAM because someone didn’t check the return value of .write()
.
Let’s visualize this pattern, phase by phase. In the normal write phase, write() returns true: the buffer has space, so the source keeps writing. Once the writable’s buffer fills up, .write() returns false and the source stops. It then waits for the 'drain' event before it starts writing again.
Duplex Streams
A Duplex stream is one that is both Readable and Writable, but the two sides operate independently. You can think of it as a readable stream and a writable stream joined together into a single object. What you write to a duplex stream is not necessarily what you get when you read from it.
The classic example is a TCP socket (net.Socket
). When you establish a network connection with another computer, you get a socket object. You can write data to that socket to send it to the other computer, and you can read data from that same socket object to receive data from the other computer. These are two separate channels - one for sending, one for receiving - encapsulated in one object.
// duplex-socket-example.js
import net from "node:net";
// Create a simple TCP server
const server = net.createServer((socket) => {
console.log("Client connected.");
// The 'socket' object is a Duplex stream.
// It's readable (we can receive data from the client).
socket.on("data", (chunk) => {
console.log(`Server received: "${chunk.toString()}"`);
});
// It's also writable (we can send data to the client).
socket.write("Hello from the server!\n");
// Handle client disconnection
socket.on("end", () => {
console.log("Client disconnected.");
});
socket.on("error", (err) => {
console.log(`Socket error: ${err.message}`);
});
});
server.listen(8000, () => {
console.log("Server listening on port 8000");
});
To test this, you can run the server and then connect to it using a command-line tool like netcat
or telnet
:
telnet localhost 8000
Once connected, you’ll immediately see “Hello from the server!” printed. Anything you type into the telnet session will be sent to the server and logged to its console. You’re writing to the socket from the client, and the server is reading from it. The server writes to the socket, and the client reads from it. It’s a two-way street, a perfect example of a Duplex stream.
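If you’d rather test it from Node itself instead of telnet or netcat, a small client sketch (assuming the server above is listening on port 8000) shows the same two-way behavior from the other side:
// duplex-socket-client.js
import net from "node:net";
const client = net.connect(8000, "localhost", () => {
  console.log("Connected to server.");
  // The client's socket is a Duplex stream too: write to send...
  client.write("Hello from the client!\n");
});
// ...and read to receive.
client.on("data", (chunk) => {
  console.log(`Client received: "${chunk.toString().trim()}"`);
  client.end(); // close our side once we've heard back
});
client.on("error", (err) => console.error("Client error:", err.message));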
Transform Streams
A Transform stream is a special kind of Duplex stream where the output is a direct result of the input. It’s a stream that modifies or transforms the data as it passes through. You write data to its writable side, the stream performs some operation on that data, and then it pushes the result out of its readable side.
Transform streams are the workhorses of stream pipelines. They sit in the middle, connecting a readable source to a writable destination, changing the data along the way.
Node.js has several built-in Transform streams. The zlib
streams for compression and decompression are a great example. You can pipe raw data into a zlib.createGzip()
stream, and what you read out the other side is compressed gzip data. Another example is the crypto
module’s createCipheriv
for encryption.
Let’s see this in action by creating a simple program that compresses a file.
// transform-gzip-example.js
import fs from "node:fs";
import zlib from "node:zlib";
// Create a readable stream from a source file.
const source = fs.createReadStream("./my-data.txt");
// Create a writable stream to a destination file.
const destination = fs.createWriteStream("./my-data.txt.gz");
// Create a Gzip transform stream.
const gzip = zlib.createGzip();
// Now, we build a pipeline.
console.log("Starting compression pipeline...");
source
.pipe(gzip) // The output of the source file goes into the gzip stream.
.pipe(destination); // The output of the gzip stream goes into the destination file.
// We can listen for the 'finish' event on the final destination stream.
destination.on("finish", () => {
console.log("File compression complete.");
});
// It's also good practice to handle errors at each stage.
source.on("error", (err) => console.error("Source stream error:", err));
gzip.on("error", (err) => console.error("Gzip stream error:", err));
destination.on("error", (err) => console.error("Destination stream error:", err));
This code is remarkably concise and powerful for what it does. It reads the source file chunk by chunk, passes each chunk to the gzip
transform stream which compresses it, and then passes the compressed chunk to the writable file stream. At no point is the entire file held in memory. This is why streams are so powerful for large file operations.
This is what that pipeline looks like conceptually:
my-data.txt → fs.createReadStream (Readable) → zlib.createGzip() (Transform, compressing) → fs.createWriteStream (Writable) → my-data.txt.gz
Each link between the stages is a connection that handles the flow of data and, crucially, the backpressure signals, all managed for us by a single, simple method.
pipe()
We just used it, but it’s worth dedicating a whole section to the .pipe()
method. It’s available on all readable streams, and it’s the primary way you should be composing streams together. readable.pipe(writable)
is the Node.js way of saying, “Take everything that comes out of this readable stream and send it into this writable stream.”
But it does so much more than that. A naive implementation might look like this:
// This is a naive pipe implementation. DO NOT USE THIS.
readable.on("data", (chunk) => {
writable.write(chunk);
});
readable.on("end", () => {
writable.end();
});
This looks reasonable, but it’s dangerously wrong. It completely ignores backpressure. If readable
produces data faster than writable
can handle it, the writable.write(chunk)
call will start buffering, and you’ll run out of memory.
The real .pipe()
method is a finely tuned mechanism that handles the entire lifecycle of the stream connection for you.
When you call source.pipe(destination)
, here’s what actually happens:
- It attaches a 'data' event listener to the source, so it automatically puts the source into flowing mode. When a chunk arrives, it calls destination.write(chunk).
- It manages backpressure automagically for you. It checks the return value of destination.write(chunk). If it returns false, pipe() automatically calls source.pause() to stop the flow of data. It then internally registers a 'drain' event listener on the destination. When the 'drain' event fires, pipe() calls source.resume() to get the data flowing again. This is the entire backpressure dance, handled for you perfectly and automatically.
- It listens for the 'end' event on the source. When that fires, pipe() automatically calls destination.end() to signal that no more data will be written.
- Here’s the critical limitation: .pipe() does not automatically propagate error events across the chain, and it doesn’t provide robust cleanup when errors occur. If a stream in the middle of a long chain errors out, .pipe() won’t automatically destroy all the connected streams. This is the main reason stream.pipeline() was created (we’ll cover it shortly).
Because .pipe()
returns the destination stream, you can chain them together to create powerful processing pipelines, as we saw with the gzip
example:
source.pipe(transform1).pipe(transform2).pipe(destination);
In this chain, backpressure propagates all the way back up the chain. If destination
gets overwhelmed and applies backpressure, transform2
will be paused. This will cause the internal buffer of transform2
to fill up, so it will stop reading from transform1
. This, in turn, causes transform1
to apply backpressure to the original source
. The entire pipeline throttles itself automatically based on the speed of its slowest component. It’s a remarkably robust system.
Important: While .pipe()
handles backpressure beautifully, for production code you should use stream.pipeline()
or stream/promises
pipeline, which we’ll cover next. They provide the error handling and cleanup that .pipe()
lacks.
Building Your Own Streams
Using the built-in streams is great, but the real power comes when you start implementing your own custom streams to encapsulate your application’s logic. You can implement all four stream types by extending the base classes provided by the node:stream
module.
To create your own readable stream, you extend the Readable
class and implement a single method: _read(size)
.
The _read
method is not one you call yourself. The stream’s internal machinery calls it for you when a consumer is ready for more data. Your job inside _read
is to get some data from your underlying source (a database, a hardware device, a number generator, whatever) and push it into the stream’s internal buffer using this.push(data)
.
When you have no more data to provide, you signal the end of the stream by calling this.push(null)
.
Important note: this.push(data)
returns a boolean. If it returns false
, it means the internal buffer is full and you should stop pushing data until _read
is called again. This is the internal backpressure mechanism.
Let’s build a simple readable stream that emits random numbers for a limited number of times.
// custom-readable-stream.js
import { Readable } from "node:stream";
class RandomNumberStream extends Readable {
constructor(maxIterations, options) {
super(options);
this.maxIterations = maxIterations;
this.currentIteration = 0;
}
// This method is called by the stream's internals when it's ready for more data.
_read(size) {
if (this.currentIteration >= this.maxIterations) {
// We have no more data to produce. Signal the end of the stream.
this.push(null);
return;
}
// Generate some data. We'll send a string.
const randomNumber = Math.floor(Math.random() * 100);
const dataChunk = `Random Number: ${randomNumber}\n`;
// Push the data into the internal buffer. The consumer can now read it.
console.log(`Pushing data to buffer: "${dataChunk.trim()}"`);
const shouldContinue = this.push(dataChunk);
// If push() returns false, the buffer is full. We should stop pushing
// until _read is called again. For this simple example, we push once per _read call.
this.currentIteration++;
}
}
// Now let's use our custom stream
const randomNumberStream = new RandomNumberStream(5);
// We can consume it just like any other readable stream.
// Let's pipe it to process.stdout, which is a writable stream.
randomNumberStream.pipe(process.stdout);
When you run this, you’ll see the log messages from inside _read
. The _read
method is called, it pushes a chunk of data, and pipe
reads it and writes it to the console. This happens five times, after which _read
pushes null
, the stream emits 'end'
, and the pipe is closed cleanly. The consumer (process.stdout
in this case) dictates when _read
is called. If the consumer is slow, _read
won’t be called as often. The stream automatically handles the pacing.
You can also create readable streams from iterables using Readable.from()
:
// readable-from-iterable.js
import { Readable } from "node:stream";
// Create a readable stream from an array
const dataStream = Readable.from(["line 1\n", "line 2\n", "line 3\n"]);
dataStream.pipe(process.stdout);
This is incredibly useful for testing and for creating streams from in-memory data structures.
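Readable.from() also accepts async iterables, so a generator function can stand in for a streaming source. A small sketch (the generator and its artificial delay are purely illustrative):
// readable-from-async-generator.js
import { Readable } from "node:stream";
import { setTimeout as delay } from "node:timers/promises";
// Each value the generator yields becomes a chunk in the stream.
async function* slowLines() {
  for (let i = 1; i <= 3; i++) {
    await delay(100); // pretend we're waiting on I/O
    yield `generated line ${i}\n`;
  }
}
Readable.from(slowLines()).pipe(process.stdout);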
Implementing a Custom Writable Stream
To create a custom writable stream, you extend the Writable
class and implement the _write(chunk, encoding, callback)
method.
This is the inverse of _read
. The stream internals call _write
whenever a producer has written a chunk of data to the stream. Your job is to take that chunk
and do something with it - write it to a file, send it over the network, save it to a database.
The callback
function is absolutely crucial. You must call it when you are finished processing the chunk. It’s the signal to the stream’s machinery that you are ready for the next chunk of data. If you fail to call the callback, the stream will stall forever. If an error occurred during your processing, you pass an error object to the callback: callback(new Error('Something went wrong'))
. Otherwise, you call it with no arguments: callback()
.
Let’s build a simple writable stream that logs incoming data to the console, but with a simulated delay to see backpressure in action.
// custom-writable-stream.js
import { Writable, Readable } from "node:stream";
class LoggingWritable extends Writable {
constructor(options) {
super(options);
}
// This method is called for every chunk written to the stream.
_write(chunk, encoding, callback) {
const data = chunk.toString().trim();
console.log(`[LoggingWritable] Received: "${data}"`);
// Simulate a slow asynchronous operation
setTimeout(() => {
console.log("[LoggingWritable] ...successfully processed chunk.");
// VERY IMPORTANT: Call the callback to signal we are ready for the next chunk.
callback();
}, 500); // 500ms delay
}
}
// For a source, let's use a simple readable stream.
class DataSource extends Readable {
constructor(options) {
super(options);
this.index = 0;
}
_read(size) {
this.index++;
if (this.index > 5) {
this.push(null);
} else {
const chunk = `Data item ${this.index}`;
this.push(chunk);
}
}
}
const source = new DataSource();
const logger = new LoggingWritable();
// Pipe the source to our custom writable stream
source.pipe(logger);
logger.on("finish", () => {
console.log("LoggingWritable has finished processing all data.");
});
When you run this, you’ll see the source produces data immediately, but the LoggingWritable
takes 500ms to process each chunk. Because we’re using .pipe()
, the source
stream will be automatically paused as soon as the logger
stream’s internal buffer fills up. The source
will only be resumed once the logger
calls its callback
, which frees up buffer space and fires the 'drain'
event internally. The entire flow is self-regulating.
Implementing a Custom Transform Stream
Implementing a transform stream is often the most useful of the three. It lets you create reusable components for your data processing pipelines. You extend the Transform
class and implement a _transform(chunk, encoding, callback)
method. You might also implement an optional _flush(callback)
method.
The _transform
method is a combination of _read
and _write
. It’s called with a chunk
of data from the upstream source. Your job is to process that chunk and then use this.push(processed_chunk)
to push the result to the downstream consumer. You can push zero, one, or many times for each input chunk. Once you are done with the input chunk
, you must call the callback
to signal that you are ready for the next one.
The optional _flush
method is called right before the stream is about to end. It’s your last chance to emit any remaining data that might be held in an internal state. For example, if you were creating a transform that groups log lines by day, you would use _flush
to push out the final day’s group.
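To make _flush concrete, here’s a small sketch (the LineSplitter class is a made-up example, not something built into Node): it carries partial lines between chunks and uses _flush to emit whatever is left when the input ends.
// transform-flush-sketch.js
import { Transform } from "node:stream";
class LineSplitter extends Transform {
  constructor(options) {
    super(options);
    this.remainder = ""; // holds an incomplete line between chunks
  }
  _transform(chunk, encoding, callback) {
    const lines = (this.remainder + chunk.toString()).split("\n");
    // The last element may be an incomplete line; keep it for the next chunk.
    this.remainder = lines.pop();
    for (const line of lines) {
      this.push(line + "\n");
    }
    callback();
  }
  _flush(callback) {
    // Last chance to emit buffered state before the stream ends.
    if (this.remainder) {
      this.push(this.remainder + "\n");
    }
    callback();
  }
}
// Example usage: re-chunk stdin into whole lines.
process.stdin.pipe(new LineSplitter()).pipe(process.stdout);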
Let’s create a transform stream that converts incoming text data to uppercase.
// custom-transform-stream.js
import { Transform } from "node:stream";
import fs from "node:fs";
import { writeFile } from "node:fs/promises";
class UppercaseTransform extends Transform {
constructor(options) {
super(options);
}
_transform(chunk, encoding, callback) {
// Convert the incoming chunk to its uppercase version.
const uppercasedChunk = chunk.toString().toUpperCase();
// Push the processed data to the readable side of the stream.
this.push(uppercasedChunk);
// Call the callback to signal we're ready for the next chunk.
callback();
}
}
// Let's test it in a pipeline.
// Create our source file
await writeFile("./lowercase-data.txt", "this is a test.\nhello world.\nend of file.");
const source = fs.createReadStream("./lowercase-data.txt");
const uppercaser = new UppercaseTransform();
const destination = process.stdout;
console.log("Starting uppercasing pipeline:");
// source -> uppercaser -> destination
source.pipe(uppercaser).pipe(destination);
This is incredibly clean. We’ve encapsulated the “uppercasing” logic into its own reusable stream component. We can now insert this UppercaseTransform
into any stream pipeline that deals with text. It’s composable, memory-efficient, and follows the standard Node.js stream patterns.
Object Mode Streams
So far, every chunk of data we’ve dealt with has been either a Buffer or a string. This is the default behavior of streams. However, streams can be configured to handle any JavaScript object (except null
, which always signifies the end of the stream). This is called Object Mode.
To enable object mode, you simply pass { objectMode: true }
in the stream’s constructor options.
This is a game-changer for building data processing pipelines that don’t operate on raw bytes but on structured data. You can have a readable stream that pulls records from a database and emits them as JavaScript objects, a transform stream that filters or enriches these objects, and a writable stream that takes the final objects and saves them somewhere else.
Let’s refactor our number generator to be an object stream.
// object-mode-readable.js
import { Readable, Transform, Writable } from "node:stream";
class UserStream extends Readable {
constructor(options) {
// Make sure to pass the objectMode flag up to the parent constructor.
// When accepting options from callers, merge them properly:
super({ objectMode: true, ...options });
this.users = [
{ id: 1, name: "Alice", role: "admin" },
{ id: 2, name: "Bob", role: "user" },
{ id: 3, name: "Charlie", role: "user" },
{ id: 4, name: "Diana", role: "admin" },
];
this.index = 0;
}
_read(size) {
if (this.index >= this.users.length) {
this.push(null);
return;
}
const userObject = this.users[this.index];
console.log(`[UserStream] Pushing user:`, userObject);
this.push(userObject);
this.index++;
}
}
// Now let's create a Transform stream in object mode to filter for admins.
class AdminFilterTransform extends Transform {
constructor(options) {
super({ objectMode: true, ...options });
}
_transform(user, encoding, callback) {
// 'user' is now a JavaScript object, not a Buffer.
if (user.role === "admin") {
console.log(`[AdminFilter] Found an admin, passing through:`, user.name);
// Push the object to the next stage.
this.push(user);
} else {
console.log(`[AdminFilter] Filtering out non-admin:`, user.name);
// We don't push anything, effectively filtering it out.
}
// Signal we're ready for the next user object.
callback();
}
}
// And finally, a Writable stream in object mode to "process" the results.
class UserProcessor extends Writable {
constructor(options) {
super({ objectMode: true, ...options });
}
_write(user, encoding, callback) {
console.log(`[UserProcessor] Saving user to database:`, user);
// Simulate async DB save
setTimeout(callback, 200);
}
}
// Build the pipeline
const userSource = new UserStream();
const adminFilter = new AdminFilterTransform();
const dbSaver = new UserProcessor();
console.log("--- Starting User Processing Pipeline ---");
userSource.pipe(adminFilter).pipe(dbSaver);
dbSaver.on("finish", () => {
console.log("--- User Processing Pipeline Finished ---");
});
This example shows the true power of object mode streams. We’ve built a multi-stage data processing pipeline for structured objects. It’s still memory-efficient because we only ever hold one user object at a time in each stage of the pipe. Backpressure still works exactly the same way; if the UserProcessor
is slow to save to the database, it will apply backpressure all the way up to the UserStream
, causing it to stop reading from its internal array.
A key difference with object mode is the concept of size. For regular streams, the highWaterMark
option is measured in bytes (default 16KB). In object mode, it’s measured in the number of objects. The default highWaterMark
for object mode streams is 16. This means writable.write()
will return false
once there are 16 objects in the buffer waiting to be processed.
You can tune the highWaterMark
for your use case:
// Higher buffer for better throughput in slow networks
const fastStream = new UserStream({ highWaterMark: 100 });
// Lower buffer for memory-constrained environments
const slowStream = new UserStream({ highWaterMark: 5 });
Modern Stream APIs
While .pipe()
is fantastic, it has one historical weakness: error handling in long chains. If a stream late in the pipeline emits an error, that error doesn’t reliably destroy all the earlier streams in the chain. This can lead to dangling file handles or memory leaks.
To solve this, Node.js introduced stream.pipeline()
. It’s a module utility function that is now the recommended way to build stream pipelines.
pipeline(stream1, stream2, stream3, ..., callback)
It takes a sequence of streams and pipes them together. Its main advantages are:
- Robust Error Handling: If any stream in the pipeline emits an error, pipeline ensures that all streams in the chain are properly destroyed and cleaned up.
- Centralized Callback: It provides a single callback that is executed only when the pipeline is completely finished, either successfully or with an error. This is much cleaner than attaching listeners to the last stream.
Here’s our file compression example, rewritten with callback-based pipeline
:
// pipeline-callback-example.js
import fs from "node:fs";
import zlib from "node:zlib";
import { pipeline } from "node:stream";
const source = fs.createReadStream("./my-data.txt");
const destination = fs.createWriteStream("./my-data.txt.gz.new");
const gzip = zlib.createGzip();
console.log("Starting compression pipeline with stream.pipeline()...");
pipeline(source, gzip, destination, (err) => {
if (err) {
console.error("Pipeline failed:", err);
} else {
console.log("Pipeline succeeded.");
}
});
This is much safer. If destination
fails (e.g., disk is full), the pipeline
function will catch that error, destroy the gzip
and source
streams as well, and then call our final callback with the error.
Promise-Based Pipeline
Even better, Node.js provides a Promise-based version of pipeline
in the node:stream/promises
module. This is the recommended approach for async/await code:
// modern-pipeline-example.js
import fs from "node:fs";
import zlib from "node:zlib";
import { pipeline } from "node:stream/promises";
console.log("Starting compression pipeline...");
try {
await pipeline(fs.createReadStream("./my-data.txt"), zlib.createGzip(), fs.createWriteStream("./my-data.txt.gz.new"));
console.log("Pipeline succeeded.");
} catch (err) {
console.error("Pipeline failed:", err);
}
This integrates beautifully with async/await code and is easier to reason about than callbacks.
Pipeline with Abort Signal
One of the most powerful features of pipeline is support for AbortSignal
, which lets you cancel long-running operations:
// pipeline-with-abort.js
import fs from "node:fs";
import zlib from "node:zlib";
import { pipeline } from "node:stream/promises";
const ac = new AbortController();
// Cancel the pipeline after 5 seconds
setTimeout(() => {
console.log("Aborting pipeline due to timeout...");
ac.abort();
}, 5000);
try {
await pipeline(
fs.createReadStream("./large-file.txt"),
zlib.createGzip(),
fs.createWriteStream("./large-file.txt.gz"),
{ signal: ac.signal }, // Pass the abort signal
);
console.log("Pipeline succeeded.");
} catch (err) {
if (err.name === "AbortError") {
console.error("Pipeline was aborted (timeout)");
} else {
console.error("Pipeline failed:", err);
}
}
You can also use AbortSignal.timeout()
for a more concise timeout pattern:
await pipeline(
source,
transform,
destination,
{ signal: AbortSignal.timeout(5000) }, // 5 second timeout
);
This is essential for building robust production applications that need to handle timeouts, client disconnections, or user cancellations.
The finished Utility
Related to pipeline
is stream.finished()
. This utility provides a reliable way to get a callback when a stream is no longer readable or writable, regardless of the reason - whether it finished successfully ('end'
, 'finish'
), errored ('error'
), or was prematurely closed ('close'
).
// finished-example.js
import fs from "node:fs";
import { finished } from "node:stream";
const writable = fs.createWriteStream("./temp-file.txt");
// Instead of listening for 'finish', 'error', 'close' separately...
finished(writable, (err) => {
if (err) {
console.error("Stream failed to finish:", err);
} else {
console.log("Stream has finished successfully.");
}
});
writable.write("Some data\n");
writable.end("Last bit of data.");
There’s also a Promise-based version:
// finished-promise-example.js
import fs from "node:fs";
import { finished } from "node:stream/promises";
const writable = fs.createWriteStream("./temp-file.txt");
writable.write("Some data\n");
writable.end("Last bit of data.");
try {
await finished(writable);
console.log("Stream has finished successfully.");
} catch (err) {
console.error("Stream failed to finish:", err);
}
You may think of pipeline
and finished
from node:stream/promises
as your best buddies for building robust, leak-free I/O operations. They codify the best practices that evolved over many years of using streams in production. While understanding how .pipe()
and the individual events work is essential for building custom streams, you should prefer the Promise-based pipeline
when composing them.
Web Streams and Interoperability
Node.js also implements the WHATWG Web Streams API in the node:stream/web
module. This is the same streams API used in browsers and other JavaScript runtimes like Deno.
The Web Streams API provides three main types: ReadableStream, WritableStream, and TransformStream. They are similar to Node’s Readable, Writable, and Transform, respectively.
While Node streams and Web Streams serve the same purpose, they have different APIs. Node.js provides utilities to convert between them:
// web-streams-interop.js
import { Readable } from "node:stream";
import { ReadableStream } from "node:stream/web";
// Convert a Node readable to a Web ReadableStream
const nodeStream = Readable.from(["chunk 1\n", "chunk 2\n"]);
const webStream = Readable.toWeb(nodeStream);
// Convert a Web ReadableStream to a Node readable
const backToNode = Readable.fromWeb(webStream);
for await (const chunk of backToNode) {
console.log("Chunk:", chunk);
}
When to use each? Use Node streams when working with Node.js-specific APIs (fs, net, http, etc.) and when you need object mode or advanced features like highWaterMark
tuning. Use Web Streams when you need cross-runtime compatibility (code that runs in browsers, Deno, etc.) or when working with modern Web APIs like fetch.
For most Node.js-specific work, stick with Node streams. For libraries that need to work across multiple runtimes, consider Web Streams.
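As a quick illustration of bridging the two worlds, here’s a sketch that streams a fetch response body (a Web ReadableStream in Node 18+) straight to disk through pipeline; the URL and output filename are placeholders:
// fetch-to-file.js
import fs from "node:fs";
import { Readable } from "node:stream";
import { pipeline } from "node:stream/promises";
const res = await fetch("https://example.com/"); // placeholder URL
if (!res.ok) throw new Error(`Request failed: ${res.status}`);
// res.body is a Web ReadableStream; convert it to a Node readable and stream it to a file.
await pipeline(Readable.fromWeb(res.body), fs.createWriteStream("./example.html"));
console.log("Response body saved.");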
Summary
Streams are one of the most powerful and fundamental patterns in programming. Here’s what to remember about streams as far as Node is concerned:
- Use streams to process large amounts of data without loading it all into memory.
- Always check the return value of .write() and wait for 'drain' events to take care of backpressure.
- Prefer modern APIs like pipeline from node:stream/promises over .pipe() for production code.
- Use pipeline for automatic error cleanup, or handle errors on every stream individually.
- for await...of is the cleanest way to consume readable streams.
- Pass an AbortSignal to pipeline for timeout and cancellation support.
- Use object mode for processing structured data in memory-efficient pipelines.
- Tune highWaterMark based on your use case (throughput vs memory).