Streams en Node.js: Readable, Writable, Transform y pipeline para I/O eficiente

Leer un fichero de 2 GB en Node.js con fs.readFileSync o fs.promises.readFile carga todo el contenido en memoria antes de procesarlo. Para ficheros grandes, logs, respuestas de API o datos de base de datos, los streams permiten procesar datos en fragmentos a medida que llegan, sin sobrecargar el heap.

Readable streams: leer datos en fragmentos

Un Readable stream emite eventos data con fragmentos del contenido y un evento end cuando termina. La forma más moderna es usar for await...of, que maneja la presión de vuelta (backpressure) automáticamente:

import { createReadStream } from 'fs';

// Leer un fichero grande línea a línea (modo eventos)
const stream = createReadStream('./logs.txt', { encoding: 'utf8', highWaterMark: 64 * 1024 });

let total = 0;
for await (const fragmento of stream) {
  total += fragmento.length;
  process.stdout.write('.'); // indicador de progreso
}
console.log(`nLeídos: ${total} caracteres`);

Para procesar líneas individuales sin cargar el fichero completo, usa el módulo readline que crea una interfaz de línea por línea sobre cualquier Readable:

import { createReadStream } from 'fs';
import { createInterface } from 'readline';

const rl = createInterface({
  input: createReadStream('./datos.csv'),
  crlfDelay: Infinity,
});

let lineas = 0;
for await (const linea of rl) {
  lineas++;
  // Procesar cada línea sin cargar todo el CSV en memoria
  const campos = linea.split(',');
  procesarFila(campos);
}
console.log(`Total de líneas: ${lineas}`);

Writable streams: escribir datos en fragmentos

Un Writable acepta datos con .write() y señala el final con .end(). Si .write() devuelve false, debes esperar al evento drain antes de seguir escribiendo (backpressure):

import { createWriteStream } from 'fs';

const escritura = createWriteStream('./salida.txt');

// Escritura simple
escritura.write('Primera línean');
escritura.write('Segunda línean');
escritura.end('Finn'); // última escritura y cierre

// Esperar a que termine
await new Promise((resolve, reject) => {
  escritura.on('finish', resolve);
  escritura.on('error', reject);
});
console.log('Escritura completada');

Transform streams: transformar datos en el flujo

Un Transform es un Duplex que recibe datos, los transforma y los emite. Útil para compresión, cifrado, parsing, conversión de formatos, etc.:

import { Transform } from 'stream';

// Transform que convierte texto a mayúsculas
class MayusculasTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

// Transform que filtra líneas vacías en un CSV
class FiltrarVaciosTransform extends Transform {
  #buffer = '';

  _transform(chunk, encoding, callback) {
    this.#buffer += chunk.toString();
    const lineas = this.#buffer.split('n');
    this.#buffer = lineas.pop(); // última línea incompleta

    lineas
      .filter(l => l.trim() !== '')
      .forEach(l => this.push(l + 'n'));

    callback();
  }

  _flush(callback) {
    if (this.#buffer.trim()) this.push(this.#buffer);
    callback();
  }
}

stream.pipeline: encadenar streams de forma segura

stream.pipeline encadena streams y garantiza que todos se cierren y limpien correctamente si se produce un error en cualquier punto. Desde Node.js 15, el módulo stream/promises ofrece una versión con async/await:

import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

// Leer fichero ? comprimir con gzip ? escribir resultado
await pipeline(
  createReadStream('./datos.txt'),
  createGzip(),
  createWriteStream('./datos.txt.gz')
);
console.log('Compresión completada');

// Con transform personalizado
await pipeline(
  createReadStream('./entrada.csv'),
  new FiltrarVaciosTransform(),
  new MayusculasTransform(),
  createWriteStream('./salida.csv')
);

Web Streams API: streams estándar desde Node.js 18

Desde Node.js 18, la Web Streams API (ReadableStream, WritableStream, TransformStream) está disponible globalmente, igual que en los navegadores. Es compatible con la Fetch API y permite pasar streams directamente como body de una petición o respuesta:

// Crear un ReadableStream de Node.js desde una fuente
const readable = new ReadableStream({
  async start(controller) {
    const datos = ['chunk1', 'chunk2', 'chunk3'];
    for (const d of datos) {
      controller.enqueue(new TextEncoder().encode(d));
      await new Promise(r => setTimeout(r, 100));
    }
    controller.close();
  },
});

// Subir el stream directamente como body de fetch
const res = await fetch('https://api.ejemplo.com/subir', {
  method: 'POST',
  body: readable,
  headers: { 'Content-Type': 'application/octet-stream' },
  duplex: 'half', // necesario para body streaming en fetch
});

// Convertir entre Node.js streams y Web Streams
import { Readable } from 'stream';
const nodeReadable = createReadStream('./datos.bin');
const webReadable = Readable.toWeb(nodeReadable);

La regla práctica para elegir entre streams de Node.js y Web Streams: usa los de Node.js cuando trabajes con el sistema de ficheros o interoperes con módulos de Node.js que los usen (zlib, crypto, http); usa Web Streams cuando trabajes con fetch, Service Workers, o quieras código portable entre navegador y servidor.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP