
Introduction to Streams

Note

This chapter covers streams in Node.js - what they are, why they matter, and how to use them. We’re not diving into every edge case here, just building a solid foundation. The chapters after this one will get into the weirdness.

Important

All the code examples mentioned in this chapter can be found here. 

So here’s the thing about Node.js - pretty much everything that moves data around is built on streams. Reading files? Streams. HTTP requests coming into your server? Streams. Even typing into your terminal and seeing output - all streams. Once you start looking for them, they’re everywhere. They’re the invisible infrastructure that makes Node actually work.

The whole point of streams is solving one very specific problem: memory. Say you need to work with a 10-gigabyte video file, but your server only has 8 gigabytes of RAM. The obvious approach - read the whole file into memory, process it, write it back out - just… doesn’t work. Your program dies before it even starts doing anything useful. Not ideal when you’re trying to ship software.

Streams fix this by letting you work with data piece by piece. You read a chunk, maybe a few kilobytes, process just that chunk, pass it along, then grab the next chunk. The full 10GB file never sits in memory all at once - you’re just juggling small pieces as they flow through your program. Your memory usage stays flat and predictable, no matter how massive the file gets.

Now, a stream isn’t the actual data itself. It’s more of an abstraction - an interface for handling data that flows continuously over time. Think about it: whether you’re reading from a file, receiving an HTTP request, or getting input from stdin, the pattern is fundamentally the same. Data comes in chunks, you need to handle those chunks, and you need to deal with what happens when there’s too much data coming too fast. Streams give you a consistent API for all of that, which means you can connect different I/O operations together without having to rewrite everything for each specific case.

The Four Kinds of Streams

Every stream in Node.js is an instance of a class that implements one of four fundamental types. Once you get these four down, everything else in the stream ecosystem starts making sense. They’re called Readable, Writable, Duplex, and Transform. Each one has a specific job in how data moves through your application.

Readable Streams

A Readable stream is a source of data. It represents a place where data originates from. When you’re reading a file from your hard drive, that’s a readable stream. An incoming HTTP request hitting your server? Readable stream. The process.stdin that captures what you type on your keyboard? Also a readable stream. The defining characteristic is that you consume data from it - you don’t write to a readable stream.

Now here’s where it gets a bit tricky. There are two primary ways to consume data from a readable stream, and they correspond to two different “modes” the stream can operate in. They’re called paused mode and flowing mode. This distinction confused the hell out of me when I was learning this stuff, but it’s actually pretty important.

By default, all readable streams start in paused mode. In this mode, you have to explicitly ask for data. Nothing happens automatically. The stream just sits there until you tell it you want some data. The primary mechanism for doing this is the .read() method.

// readable-paused-mode.js
import { writeFile } from "node:fs/promises";
import fs from "node:fs";

// Create a dummy file first (note: using async API)
await writeFile("./my-data.txt", "Here is some data that will be streamed chunk by chunk.");

const readableStream = fs.createReadStream("./my-data.txt", { encoding: "utf8" });

// The stream is in PAUSED mode by default.
// We must listen for the 'readable' event to know when data is available to be read.
readableStream.on("readable", () => {
  console.log("--> Stream is readable");
  let chunk;
  // Use a loop to make sure we read all data currently in the internal buffer.
  // stream.read() will return null when there is no more data in the buffer.
  while (null !== (chunk = readableStream.read())) {
    console.log(`Received chunk of size ${chunk.length}:`);
    console.log(`"${chunk}"\n`);
  }
});

// The 'end' event fires when there is no more data to be consumed from the stream.
readableStream.on("end", () => {
  console.log("--> Reached end of stream.");
});

When you run this code, you’ll see the 'readable' event fires, and inside that event handler, you call .read() to pull chunks of data out of the stream’s internal buffer. The .read() method returns either a chunk of data or null if the buffer is empty. That while loop is there to make sure you drain the buffer completely every time the 'readable' event fires. You keep calling .read() until it gives you null, which means “buffer’s empty, nothing left right now.”

The other mode is flowing mode. A stream automatically switches from paused to flowing mode as soon as you attach a listener for the 'data' event. In flowing mode, you don’t ask for data - the stream pushes data to you as fast as it can read it from the source. You just have to be ready to catch it when it arrives.

// readable-flowing-mode.js
import fs from "node:fs";

const readableStream = fs.createReadStream("./my-data.txt", { encoding: "utf8" });

// By adding a 'data' event listener, we switch the stream to FLOWING mode.
// Data will be pushed to us automatically.
readableStream.on("data", (chunk) => {
  console.log("--> Received a chunk of data:");
  console.log(`"${chunk}"\n`);
});

// The 'end' event still tells us when we're done.
readableStream.on("end", () => {
  console.log("--> Reached end of stream.");
});

// Always, always, always handle errors.
readableStream.on("error", (err) => {
  console.error("An error occurred:", err);
});

Notice there’s no .read() call anywhere in this code. We just tell the stream, “Hey, when you have data, run this function.” The stream then starts reading from the source (in this case the file) and emits 'data' events with chunks as they become available. The code is simpler to read, but there’s a risk here - if your source is producing data faster than you can process it, you could run into trouble. We’ll get to how to solve that problem when we talk about backpressure.

Async Iteration

There’s actually a third way to consume readable streams, and honestly it’s the cleanest one. Modern Node.js lets you treat every readable stream as an async iterable, which means you can use for await...of to consume it:

// readable-async-iteration.js
import fs from "node:fs";

const readableStream = fs.createReadStream("./my-data.txt", { encoding: "utf8" });

try {
  for await (const chunk of readableStream) {
    console.log("Got chunk:", chunk);
  }
  console.log("Stream finished.");
} catch (err) {
  console.error("Error reading stream:", err);
}

This pattern is way cleaner than both of the previous approaches. Error handling happens naturally with try/catch blocks, and it automatically respects backpressure (which we’ll cover soon). This is what I reach for when I’m writing new code. It’s the recommended approach in modern Node.js.

Finally, two other events are absolutely essential for any readable stream consumer. The 'end' event fires exactly once, when there’s no more data to be read from the source. The stream is done, finished, nothing left to give you. The 'error' event fires if something goes wrong during the reading process - maybe the file doesn’t exist, maybe a network connection drops, whatever. You must always listen for the 'error' event. I’m serious about this one. An unhandled error event on a stream will crash your entire Node.js process. I’ve debugged production issues caused by this more times than I want to admit.
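
To make that concrete, here's a small sketch (the filename is made up) showing what a single 'error' listener buys you:

// error-listener-sketch.js (illustrative; "./does-not-exist.txt" is a made-up path)
import fs from "node:fs";

// Without an 'error' listener, the ENOENT error would be thrown as an
// uncaught exception and bring down the whole process.
const risky = fs.createReadStream("./does-not-exist.txt");

// With a listener, we decide what happens instead.
risky.on("error", (err) => {
  console.error("Handled stream error:", err.message);
});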

Writable Streams

A Writable stream is a destination for data. It’s the receiving end. When you’re writing to a file, sending an HTTP response from your server, or printing to the console with process.stdout, you’re working with writable streams. You don’t read from a writable stream - you send data to it.

The main method for sending data is .write(chunk). You call this with whatever data you want to send. When you’re completely finished and have no more data to write, you call .end(). Calling .end() tells the stream that no more data is coming, and once all the buffered data has been written to the underlying destination, it fires a 'finish' event.
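
Here's a minimal sketch of that lifecycle (the output filename is just an example):

// writable-basic-sketch.js (illustrative example)
import fs from "node:fs";

const out = fs.createWriteStream("./greeting.txt");

out.on("finish", () => {
  console.log("All data has been flushed to the file.");
});

out.on("error", (err) => {
  console.error("Write failed:", err);
});

out.write("Hello, ");
out.write("streams!\n");
out.end("That's all.\n"); // .end() can take one final chunk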

Sounds straightforward, right? But there’s a crucial detail hiding in how .write() actually works. What happens when you’re trying to write to a destination that’s slow? Say you’re writing to a network connection that has limited bandwidth, or a hard drive that’s busy with other operations. You might be calling .write() way faster than the destination can actually handle the data. The writable stream starts buffering that data in memory to cope with the mismatch. If you just keep writing without paying attention to anything, that buffer grows and grows, potentially eating up all your available memory and crashing your application. This is exactly the problem streams were designed to solve in the first place.

The solution to this problem is called backpressure.

Here’s how it works: the .write() method doesn’t just send data and forget about it. It returns a boolean value. If it returns true, you’re good to keep writing more data right away. If it returns false, that means the internal buffer has exceeded a certain threshold (this threshold is called the highWaterMark). That false return value is the signal - the backpressure signal. The writable stream is telling you, “Stop sending me data for a bit, I need to catch up and actually write what you’ve already given me.”

When you get a false from .write(), you should stop writing. But how do you know when it’s safe to start again? The writable stream will emit a 'drain' event. That’s your signal to resume. When you hear 'drain', the buffer has been flushed out and it’s safe to start writing again.

Here’s how to handle this correctly. We’ll simulate a slow destination by creating a custom writable stream:

// writable-backpressure.js
import { Writable } from "node:stream";

// A slow destination stream for demonstration
const slowWriteStream = new Writable({
  // Keep the buffer tiny (in bytes) so backpressure kicks in quickly for the demo.
  highWaterMark: 32,
  // We just need to implement the write method
  write(chunk, encoding, callback) {
    console.log(`Writing chunk: "${chunk.toString()}"`);
    // Simulate a slow I/O operation, like a network call or disk write
    setTimeout(() => {
      console.log("...write operation complete.");
      callback(); // Signal that we are ready for the next chunk
    }, 1000); // Takes 1 second to process each chunk
  },
});

let i = 0;
const totalChunks = 5;

function writeLotsOfData() {
  let ok = true;
  // Write as fast as we can until the stream tells us to stop.
  while (i < totalChunks && ok) {
    i++;
    ok = slowWriteStream.write(`This is chunk #${i}`);
    if (!ok) {
      console.log("BACKPRESSURE APPLIED. Waiting for 'drain'...");
      // Resume writing once the internal buffer has been flushed.
      slowWriteStream.once("drain", writeLotsOfData);
    }
  }
  if (i >= totalChunks && ok) {
    // All chunks handed off; signal that no more data is coming.
    slowWriteStream.end();
  }
}

// Start the writing process
writeLotsOfData();

slowWriteStream.on("finish", () => {
  console.log("--> Writable stream finished.");
});

slowWriteStream.on("error", (err) => {
  console.error("Writable stream error:", err);
});

Running this code shows the mechanism in action. The writeLotsOfData function enters a loop, calling .write(). Initially, .write() returns true, and data is buffered internally. The simulated write operation takes a full second. Very quickly, the internal buffer fills up, .write() returns false, and the log shows BACKPRESSURE APPLIED. The writing loop stops. One second later, the slow operation completes, the buffer has space again, the 'drain' event fires, and the writeLotsOfData function is called again to resume writing.

This dance - checking .write() return values and waiting for 'drain' - is the fundamental mechanism of backpressure. It’s how a fast source can communicate gracefully with a slow destination without overwhelming it. Forgetting to respect this signal is honestly one of the most common sources of memory leaks in Node.js applications. I’ve seen production apps eating gigabytes of RAM because someone didn’t check the return value of .write().

Let’s visualize this pattern.

[Diagram, stage 1 of 3: “Normal Write”. A fast source (e.g. CPU) calls write(chunk) on a slow Writable (e.g. a network). While the buffer has space, write() returns true and the source keeps writing.]

This diagram illustrates the flow. The source writes until the writable’s buffer is full, at which point .write() returns false. The source then waits for the 'drain' event before it starts writing again.

Duplex Streams

A Duplex stream is one that is both Readable and Writable, but the two sides operate independently. You can think of it as a readable stream and a writable stream joined together into a single object. What you write to a duplex stream is not necessarily what you get when you read from it.

The classic example is a TCP socket (net.Socket). When you establish a network connection with another computer, you get a socket object. You can write data to that socket to send it to the other computer, and you can read data from that same socket object to receive data from the other computer. These are two separate channels - one for sending, one for receiving - encapsulated in one object.

// duplex-socket-example.js
import net from "node:net";

// Create a simple TCP server
const server = net.createServer((socket) => {
  console.log("Client connected.");

  // The 'socket' object is a Duplex stream.
  // It's readable (we can receive data from the client).
  socket.on("data", (chunk) => {
    console.log(`Server received: "${chunk.toString()}"`);
  });

  // It's also writable (we can send data to the client).
  socket.write("Hello from the server!\n");

  // Handle client disconnection
  socket.on("end", () => {
    console.log("Client disconnected.");
  });

  socket.on("error", (err) => {
    console.log(`Socket error: ${err.message}`);
  });
});

server.listen(8000, () => {
  console.log("Server listening on port 8000");
});

To test this, you can run the server and then connect to it using a command-line tool like netcat or telnet:

telnet localhost 8000

Once connected, you’ll immediately see “Hello from the server!” printed. Anything you type into the telnet session will be sent to the server and logged to its console. You’re writing to the socket from the client, and the server is reading from it. The server writes to the socket, and the client reads from it. It’s a two-way street, a perfect example of a Duplex stream.
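
If you'd rather test from Node itself instead of telnet, here's a minimal client sketch; it assumes the server above is running locally on port 8000:

// duplex-socket-client.js (sketch; assumes the server above is listening on port 8000)
import net from "node:net";

const socket = net.connect(8000, "localhost", () => {
  console.log("Connected to server.");
  // Writable side: send data to the server.
  socket.write("Hello from the client!\n");
});

// Readable side: receive data from the server.
socket.on("data", (chunk) => {
  console.log(`Client received: "${chunk.toString().trim()}"`);
  socket.end(); // Close our side of the connection.
});

socket.on("error", (err) => {
  console.error("Client socket error:", err.message);
});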

Transform Streams

A Transform stream is a special kind of Duplex stream where the output is a direct result of the input. It’s a stream that modifies or transforms the data as it passes through. You write data to its writable side, the stream performs some operation on that data, and then it pushes the result out of its readable side.

Transform streams are the workhorses of stream pipelines. They sit in the middle, connecting a readable source to a writable destination, changing the data along the way.

Node.js has several built-in Transform streams. The zlib streams for compression and decompression are a great example. You can pipe raw data into a zlib.createGzip() stream, and what you read out the other side is compressed gzip data. Another example is the crypto module’s createCipheriv for encryption.

Let’s see this in action by creating a simple program that compresses a file.

// transform-gzip-example.js
import fs from "node:fs";
import zlib from "node:zlib";

// Create a readable stream from a source file.
const source = fs.createReadStream("./my-data.txt");

// Create a writable stream to a destination file.
const destination = fs.createWriteStream("./my-data.txt.gz");

// Create a Gzip transform stream.
const gzip = zlib.createGzip();

// Now, we build a pipeline.
console.log("Starting compression pipeline...");

source
  .pipe(gzip) // The output of the source file goes into the gzip stream.
  .pipe(destination); // The output of the gzip stream goes into the destination file.

// We can listen for the 'finish' event on the final destination stream.
destination.on("finish", () => {
  console.log("File compression complete.");
});

// It's also good practice to handle errors at each stage.
source.on("error", (err) => console.error("Source stream error:", err));
gzip.on("error", (err) => console.error("Gzip stream error:", err));
destination.on("error", (err) => console.error("Destination stream error:", err));

This code is remarkably concise and powerful for what it does. It reads the source file chunk by chunk, passes each chunk to the gzip transform stream which compresses it, and then passes the compressed chunk to the writable file stream. At no point is the entire file held in memory. This is why streams are so powerful for large file operations.

This is what that pipeline looks like conceptually:

[Pipeline diagram: fs.createReadStream (Readable, my-data.txt) → raw data chunks → zlib.createGzip() (Transform) → compressed chunks → fs.createWriteStream (Writable, my-data.txt.gz)]

Each link between the stages is a connection that handles the flow of data and, crucially, the backpressure signals, all managed for us by a single, simple method.

pipe()

We just used it, but it’s worth dedicating a whole section to the .pipe() method. It’s available on all readable streams, and it’s the primary way you should be composing streams together. readable.pipe(writable) is the Node.js way of saying, “Take everything that comes out of this readable stream and send it into this writable stream.”

But it does so much more than that. A naive implementation might look like this:

// This is a naive pipe implementation. DO NOT USE THIS.
readable.on("data", (chunk) => {
  writable.write(chunk);
});

readable.on("end", () => {
  writable.end();
});

This looks reasonable, but it’s dangerously wrong. It completely ignores backpressure. If readable produces data faster than writable can handle it, the writable.write(chunk) call will start buffering, and you’ll run out of memory.

The real .pipe() method is a finely tuned mechanism that handles the entire lifecycle of the stream connection for you.

When you call source.pipe(destination), here’s what actually happens:

  1. It attaches a 'data' event listener to the source, so it automatically puts the source into flowing mode. When a chunk arrives, it calls destination.write(chunk).
  2. It manages backpressure automagically for you. It checks the return value of destination.write(chunk). If it returns false, pipe() automatically calls source.pause() to stop the flow of data. It then internally registers a 'drain' event listener on the destination. When the 'drain' event fires, pipe() calls source.resume() to get the data flowing again. This is the entire backpressure dance, handled for you perfectly and automatically (see the sketch after this list).
  3. It listens for the 'end' event on the source. When that fires, pipe() automatically calls destination.end() to signal that no more data will be written.
  4. Here’s the critical limitation: .pipe() does not automatically propagate error events across the chain, and it doesn’t provide robust cleanup when errors occur. If a stream in the middle of a long chain errors out, .pipe() won’t automatically destroy all the connected streams. This is the main reason stream.pipeline() was created (we’ll cover it shortly).
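
To make those steps concrete, here's a heavily simplified sketch of what a pipe-like helper might do internally; it ignores errors and many edge cases that the real .pipe() handles:

// naive-pipe-with-backpressure.js (a simplified sketch, not the real implementation)
function simplePipe(source, destination) {
  source.on("data", (chunk) => {
    const ok = destination.write(chunk);
    if (!ok) {
      // Destination is overwhelmed: pause the source...
      source.pause();
      // ...and resume once the destination has drained its buffer.
      destination.once("drain", () => source.resume());
    }
  });

  // When the source is done, close the writable side.
  source.on("end", () => destination.end());

  return destination; // Allows chaining, like the real .pipe()
}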

Because .pipe() returns the destination stream, you can chain them together to create powerful processing pipelines, as we saw with the gzip example:

source.pipe(transform1).pipe(transform2).pipe(destination);

In this chain, backpressure propagates all the way back up the chain. If destination gets overwhelmed and applies backpressure, transform2 will be paused. This will cause the internal buffer of transform2 to fill up, so it will stop reading from transform1. This, in turn, causes transform1 to apply backpressure to the original source. The entire pipeline throttles itself automatically based on the speed of its slowest component. It’s a remarkably robust system.

Important: While .pipe() handles backpressure beautifully, for production code you should use stream.pipeline() or the promise-based pipeline from node:stream/promises, which we'll cover next. They provide the error handling and cleanup that .pipe() lacks.

Building Your Own Streams

Using the built-in streams is great, but the real power comes when you start implementing your own custom streams to encapsulate your application’s logic. All four stream types can be extended from the base classes provided by the node:stream module.

To create your own readable stream, you extend the Readable class and implement a single method: _read(size).

The _read method is not one you call yourself. The stream’s internal machinery calls it for you when a consumer is ready for more data. Your job inside _read is to get some data from your underlying source (a database, a hardware device, a number generator, whatever) and push it into the stream’s internal buffer using this.push(data).

When you have no more data to provide, you signal the end of the stream by calling this.push(null).

Important note: this.push(data) returns a boolean. If it returns false, it means the internal buffer is full and you should stop pushing data until _read is called again. This is the internal backpressure mechanism.

Let’s build a simple readable stream that emits random numbers for a limited number of times.

// custom-readable-stream.js
import { Readable } from "node:stream";

class RandomNumberStream extends Readable {
  constructor(maxIterations, options) {
    super(options);
    this.maxIterations = maxIterations;
    this.currentIteration = 0;
  }

  // This method is called by the stream's internals when it's ready for more data.
  _read(size) {
    if (this.currentIteration >= this.maxIterations) {
      // We have no more data to produce. Signal the end of the stream.
      this.push(null);
      return;
    }

    // Generate some data. We'll send a string.
    const randomNumber = Math.floor(Math.random() * 100);
    const dataChunk = `Random Number: ${randomNumber}\n`;

    // Push the data into the internal buffer. The consumer can now read it.
    console.log(`Pushing data to buffer: "${dataChunk.trim()}"`);
    const shouldContinue = this.push(dataChunk);

    // If push() returns false, the buffer is full. We should stop pushing
    // until _read is called again. For this simple example, we push once per _read call.
    this.currentIteration++;
  }
}

// Now let's use our custom stream
const randomNumberStream = new RandomNumberStream(5);

// We can consume it just like any other readable stream.
// Let's pipe it to process.stdout, which is a writable stream.
randomNumberStream.pipe(process.stdout);

When you run this, you’ll see the log messages from inside _read. The _read method is called, it pushes a chunk of data, and pipe reads it and writes it to the console. This happens five times, after which _read pushes null, the stream emits 'end', and the pipe is closed cleanly. The consumer (process.stdout in this case) dictates when _read is called. If the consumer is slow, _read won’t be called as often. The stream automatically handles the pacing.

You can also create readable streams from iterables using Readable.from():

// readable-from-iterable.js
import { Readable } from "node:stream";

// Create a readable stream from an array
const dataStream = Readable.from(["line 1\n", "line 2\n", "line 3\n"]);

dataStream.pipe(process.stdout);

This is incredibly useful for testing and for creating streams from in-memory data structures.
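
Readable.from also accepts generators and async iterables, so you can stream lazily computed data too. A quick sketch:

// readable-from-generator.js (sketch)
import { Readable } from "node:stream";

// An async generator works as a source as well.
async function* countTo(n) {
  for (let i = 1; i <= n; i++) {
    yield `count: ${i}\n`;
  }
}

const counterStream = Readable.from(countTo(3));
counterStream.pipe(process.stdout);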

Implementing a Custom Writable Stream

To create a custom writable stream, you extend the Writable class and implement the _write(chunk, encoding, callback) method.

This is the inverse of _read. The stream internals call _write whenever a producer has written a chunk of data to the stream. Your job is to take that chunk and do something with it - write it to a file, send it over the network, save it to a database.

The callback function is absolutely crucial. You must call it when you are finished processing the chunk. It’s the signal to the stream’s machinery that you are ready for the next chunk of data. If you fail to call the callback, the stream will stall forever. If an error occurred during your processing, you pass an error object to the callback: callback(new Error('Something went wrong')). Otherwise, you call it with no arguments: callback().

Let’s build a simple writable stream that logs incoming data to the console, but with a simulated delay to see backpressure in action.

// custom-writable-stream.js
import { Writable, Readable } from "node:stream";

class LoggingWritable extends Writable {
  constructor(options) {
    super(options);
  }

  // This method is called for every chunk written to the stream.
  _write(chunk, encoding, callback) {
    const data = chunk.toString().trim();
    console.log(`[LoggingWritable] Received: "${data}"`);

    // Simulate a slow asynchronous operation
    setTimeout(() => {
      console.log("[LoggingWritable] ...successfully processed chunk.");
      // VERY IMPORTANT: Call the callback to signal we are ready for the next chunk.
      callback();
    }, 500); // 500ms delay
  }
}

// For a source, let's use a simple readable stream.
class DataSource extends Readable {
  constructor(options) {
    super(options);
    this.index = 0;
  }

  _read(size) {
    this.index++;
    if (this.index > 5) {
      this.push(null);
    } else {
      const chunk = `Data item ${this.index}`;
      this.push(chunk);
    }
  }
}

const source = new DataSource();
const logger = new LoggingWritable();

// Pipe the source to our custom writable stream
source.pipe(logger);

logger.on("finish", () => {
  console.log("LoggingWritable has finished processing all data.");
});

When you run this, you’ll see the source produces data immediately, but the LoggingWritable takes 500ms to process each chunk. Because we’re using .pipe(), the source stream will be automatically paused as soon as the logger stream’s internal buffer fills up. The source will only be resumed once the logger calls its callback, which frees up buffer space and fires the 'drain' event internally. The entire flow is self-regulating.

Implementing a Custom Transform Stream

Implementing a transform stream is often the most useful of the three. It lets you create reusable components for your data processing pipelines. You extend the Transform class and implement a _transform(chunk, encoding, callback) method. You might also implement an optional _flush(callback) method.

The _transform method is a combination of _read and _write. It’s called with a chunk of data from the upstream source. Your job is to process that chunk and then use this.push(processed_chunk) to push the result to the downstream consumer. You can push zero, one, or many times for each input chunk. Once you are done with the input chunk, you must call the callback to signal that you are ready for the next one.

The optional _flush method is called right before the stream is about to end. It’s your last chance to emit any remaining data that might be held in an internal state. For example, if you were creating a transform that groups log lines by day, you would use _flush to push out the final day’s group.
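
To illustrate _flush, here's a rough sketch of a transform that splits incoming text into lines, buffering any partial line until more data (or the end of the stream) arrives:

// line-splitter-transform.js (an illustrative sketch of using _flush)
import { Transform } from "node:stream";

class LineSplitter extends Transform {
  constructor(options) {
    super(options);
    this.leftover = ""; // Holds an incomplete line between chunks
  }

  _transform(chunk, encoding, callback) {
    const text = this.leftover + chunk.toString();
    const lines = text.split("\n");
    // The last element may be a partial line; keep it for the next chunk.
    this.leftover = lines.pop();
    for (const line of lines) {
      this.push(`LINE: ${line}\n`);
    }
    callback();
  }

  // Called just before the stream ends: emit whatever is still buffered.
  _flush(callback) {
    if (this.leftover.length > 0) {
      this.push(`LINE: ${this.leftover}\n`);
    }
    callback();
  }
}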

Let’s create a transform stream that converts incoming text data to uppercase.

// custom-transform-stream.js
import { Transform } from "node:stream";
import fs from "node:fs";
import { writeFile } from "node:fs/promises";

class UppercaseTransform extends Transform {
  constructor(options) {
    super(options);
  }

  _transform(chunk, encoding, callback) {
    // Convert the incoming chunk to its uppercase version.
    const uppercasedChunk = chunk.toString().toUpperCase();

    // Push the processed data to the readable side of the stream.
    this.push(uppercasedChunk);

    // Call the callback to signal we're ready for the next chunk.
    callback();
  }
}

// Let's test it in a pipeline.
// Create our source file
await writeFile("./lowercase-data.txt", "this is a test.\nhello world.\nend of file.");

const source = fs.createReadStream("./lowercase-data.txt");
const uppercaser = new UppercaseTransform();
const destination = process.stdout;

console.log("Starting uppercasing pipeline:");

// source -> uppercaser -> destination
source.pipe(uppercaser).pipe(destination);

This is incredibly clean. We’ve encapsulated the “uppercasing” logic into its own reusable stream component. We can now insert this UppercaseTransform into any stream pipeline that deals with text. It’s composable, memory-efficient, and follows the standard Node.js stream patterns.

Object Mode Streams

So far, every chunk of data we’ve dealt with has been either a Buffer or a string. This is the default behavior of streams. However, streams can be configured to handle any JavaScript object (except null, which always signifies the end of the stream). This is called Object Mode.

To enable object mode, you simply pass { objectMode: true } in the stream’s constructor options.

This is a game-changer for building data processing pipelines that don’t operate on raw bytes but on structured data. You can have a readable stream that pulls records from a database and emits them as JavaScript objects, a transform stream that filters or enriches these objects, and a writable stream that takes the final objects and saves them somewhere else.

Let’s refactor our number generator to be an object stream.

// object-mode-readable.js
import { Readable, Transform, Writable } from "node:stream";

class UserStream extends Readable {
  constructor(options) {
    // Make sure to pass the objectMode flag up to the parent constructor.
    // When accepting options from callers, merge them properly:
    super({ objectMode: true, ...options });
    this.users = [
      { id: 1, name: "Alice", role: "admin" },
      { id: 2, name: "Bob", role: "user" },
      { id: 3, name: "Charlie", role: "user" },
      { id: 4, name: "Diana", role: "admin" },
    ];
    this.index = 0;
  }

  _read(size) {
    if (this.index >= this.users.length) {
      this.push(null);
      return;
    }
    const userObject = this.users[this.index];
    console.log(`[UserStream] Pushing user:`, userObject);
    this.push(userObject);
    this.index++;
  }
}

// Now let's create a Transform stream in object mode to filter for admins.
class AdminFilterTransform extends Transform {
  constructor(options) {
    super({ objectMode: true, ...options });
  }

  _transform(user, encoding, callback) {
    // 'user' is now a JavaScript object, not a Buffer.
    if (user.role === "admin") {
      console.log(`[AdminFilter] Found an admin, passing through:`, user.name);
      // Push the object to the next stage.
      this.push(user);
    } else {
      console.log(`[AdminFilter] Filtering out non-admin:`, user.name);
      // We don't push anything, effectively filtering it out.
    }
    // Signal we're ready for the next user object.
    callback();
  }
}

// And finally, a Writable stream in object mode to "process" the results.
class UserProcessor extends Writable {
  constructor(options) {
    super({ objectMode: true, ...options });
  }

  _write(user, encoding, callback) {
    console.log(`[UserProcessor] Saving user to database:`, user);
    // Simulate async DB save
    setTimeout(callback, 200);
  }
}

// Build the pipeline
const userSource = new UserStream();
const adminFilter = new AdminFilterTransform();
const dbSaver = new UserProcessor();

console.log("--- Starting User Processing Pipeline ---");
userSource.pipe(adminFilter).pipe(dbSaver);

dbSaver.on("finish", () => {
  console.log("--- User Processing Pipeline Finished ---");
});

This example shows the true power of object mode streams. We’ve built a multi-stage data processing pipeline for structured objects. It’s still memory-efficient because we only ever hold one user object at a time in each stage of the pipe. Backpressure still works exactly the same way; if the UserProcessor is slow to save to the database, it will apply backpressure all the way up to the UserStream, causing it to stop reading from its internal array.

A key difference with object mode is the concept of size. For regular streams, the highWaterMark option is measured in bytes (default 16KB). In object mode, it’s measured in the number of objects. The default highWaterMark for object mode streams is 16. This means writable.write() will return false once there are 16 objects in the buffer waiting to be processed.

You can tune the highWaterMark for your use case:

// Higher buffer for better throughput in slow networks
const fastStream = new UserStream({ highWaterMark: 100 });

// Lower buffer for memory-constrained environments
const slowStream = new UserStream({ highWaterMark: 5 });

Modern Stream APIs

While .pipe() is fantastic, it has one historical weakness: error handling in long chains. If a stream late in the pipeline emits an error, that error doesn’t reliably destroy all the earlier streams in the chain. This can lead to dangling file handles or memory leaks.

To solve this, Node.js introduced stream.pipeline(). It’s a module utility function that is now the recommended way to build stream pipelines.

pipeline(stream1, stream2, stream3, ..., callback)

It takes a sequence of streams and pipes them together. Its main advantages are:

  1. Robust Error Handling: If any stream in the pipeline emits an error, pipeline ensures that all streams in the chain are properly destroyed and cleaned up.
  2. Centralized Callback: It provides a single callback that is executed only when the pipeline is completely finished, either successfully or with an error. This is much cleaner than attaching listeners to the last stream.

Here’s our file compression example, rewritten with callback-based pipeline:

// pipeline-callback-example.js
import fs from "node:fs";
import zlib from "node:zlib";
import { pipeline } from "node:stream";

const source = fs.createReadStream("./my-data.txt");
const destination = fs.createWriteStream("./my-data.txt.gz.new");
const gzip = zlib.createGzip();

console.log("Starting compression pipeline with stream.pipeline()...");

pipeline(source, gzip, destination, (err) => {
  if (err) {
    console.error("Pipeline failed:", err);
  } else {
    console.log("Pipeline succeeded.");
  }
});

This is much safer. If destination fails (e.g., disk is full), the pipeline function will catch that error, destroy the gzip and source streams as well, and then call our final callback with the error.

Promise-Based Pipeline

Even better, Node.js provides a Promise-based version of pipeline in the node:stream/promises module. This is the recommended approach for async/await code:

// modern-pipeline-example.js
import fs from "node:fs";
import zlib from "node:zlib";
import { pipeline } from "node:stream/promises";

console.log("Starting compression pipeline...");

try {
  await pipeline(
    fs.createReadStream("./my-data.txt"),
    zlib.createGzip(),
    fs.createWriteStream("./my-data.txt.gz.new"),
  );
  console.log("Pipeline succeeded.");
} catch (err) {
  console.error("Pipeline failed:", err);
}

This integrates beautifully with async/await code and is easier to reason about than callbacks.

Pipeline with Abort Signal

One of the most powerful features of pipeline is support for AbortSignal, which lets you cancel long-running operations:

// pipeline-with-abort.js
import fs from "node:fs";
import zlib from "node:zlib";
import { pipeline } from "node:stream/promises";

const ac = new AbortController();

// Cancel the pipeline after 5 seconds
setTimeout(() => {
  console.log("Aborting pipeline due to timeout...");
  ac.abort();
}, 5000);

try {
  await pipeline(
    fs.createReadStream("./large-file.txt"),
    zlib.createGzip(),
    fs.createWriteStream("./large-file.txt.gz"),
    { signal: ac.signal }, // Pass the abort signal
  );
  console.log("Pipeline succeeded.");
} catch (err) {
  if (err.name === "AbortError") {
    console.error("Pipeline was aborted (timeout)");
  } else {
    console.error("Pipeline failed:", err);
  }
}

You can also use AbortSignal.timeout() for a more concise timeout pattern:

await pipeline(
  source,
  transform,
  destination,
  { signal: AbortSignal.timeout(5000) }, // 5 second timeout
);

This is essential for building robust production applications that need to handle timeouts, client disconnections, or user cancellations.

The finished Utility

Related to pipeline is stream.finished(). This utility provides a reliable way to get a callback when a stream is no longer readable or writable, regardless of the reason - whether it finished successfully ('end', 'finish'), errored ('error'), or was prematurely closed ('close').

// finished-example.js
import fs from "node:fs";
import { finished } from "node:stream";

const writable = fs.createWriteStream("./temp-file.txt");

// Instead of listening for 'finish', 'error', 'close' separately...
finished(writable, (err) => {
  if (err) {
    console.error("Stream failed to finish:", err);
  } else {
    console.log("Stream has finished successfully.");
  }
});

writable.write("Some data\n");
writable.end("Last bit of data.");

There’s also a Promise-based version:

// finished-promise-example.js
import fs from "node:fs";
import { finished } from "node:stream/promises";

const writable = fs.createWriteStream("./temp-file.txt");

writable.write("Some data\n");
writable.end("Last bit of data.");

try {
  await finished(writable);
  console.log("Stream has finished successfully.");
} catch (err) {
  console.error("Stream failed to finish:", err);
}

Think of pipeline and finished from node:stream/promises as your best buddies for building robust, leak-free I/O operations. They codify the best practices that evolved over many years of using streams in production. While understanding how .pipe() and the individual events work is essential for building custom streams, you should prefer the Promise-based pipeline when composing them.

Web Streams and Interoperability

Node.js also implements the WHATWG Web Streams API in the node:stream/web module. This is the same streams API used in browsers and other JavaScript runtimes like Deno.

The Web Streams API provides three main types: ReadableStream, WritableStream, and TransformStream, which are analogous to Node's Readable, Writable, and Transform, respectively.

While Node streams and Web Streams serve the same purpose, they have different APIs. Node.js provides utilities to convert between them:

// web-streams-interop.js
import { Readable } from "node:stream";
import { ReadableStream } from "node:stream/web";

// Convert a Node readable to a Web ReadableStream
const nodeStream = Readable.from(["chunk 1\n", "chunk 2\n"]);
const webStream = Readable.toWeb(nodeStream);

// Convert a Web ReadableStream to a Node readable
const backToNode = Readable.fromWeb(webStream);

for await (const chunk of backToNode) {
  console.log("Chunk:", chunk);
}

When to use each? Use Node streams when working with Node.js-specific APIs (fs, net, http, etc.) and when you need object mode or advanced features like highWaterMark tuning. Use Web Streams when you need cross-runtime compatibility (code that runs in browsers, Deno, etc.) or when working with modern Web APIs like fetch.

For most Node.js-specific work, stick with Node streams. For libraries that need to work across multiple runtimes, consider Web Streams.
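
As a taste of that interop (this sketch assumes Node 18+ where fetch is global, and the URL is just a placeholder), you can convert a fetch response body into a Node readable and stream it to disk:

// fetch-to-file-sketch.js (assumes Node 18+; the URL is a placeholder)
import fs from "node:fs";
import { Readable } from "node:stream";
import { pipeline } from "node:stream/promises";

const response = await fetch("https://example.com/some-large-file");

// response.body is a Web ReadableStream; convert it to a Node readable
// and stream it straight to disk without buffering the whole body.
await pipeline(
  Readable.fromWeb(response.body),
  fs.createWriteStream("./downloaded-file"),
);
console.log("Download complete.");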

Summary

Streams are one of the most powerful and fundamental patterns in programming. Here’s what to remember about streams as far as Node is concerned -

  1. Use streams to process large amounts of data without loading it all into memory.
  2. Always check the return value of .write() and wait for 'drain' events to handle backpressure.
  3. Prefer modern APIs like pipeline from node:stream/promises over .pipe() for production code.
  4. Use pipeline for automatic error cleanup, or handle errors on every stream individually.
  5. for await...of is the cleanest way to consume readable streams.
  6. Pass an AbortSignal to pipeline for timeout and cancellation support.
  7. Use object mode for processing structured data in memory-efficient pipelines.
  8. Tune highWaterMark based on your use case (throughput vs. memory).