El patrón Observable en JavaScript: reactividad, streams de eventos y mini-RxJS desde cero

El patrón Observable es el modelo mental detrás de librerías como RxJS y del proposal Observable que avanza hacia el estándar. Mientras que una Promesa resuelve un único valor, un Observable puede emitir múltiples valores a lo largo del tiempo y permite cancelar la suscripción para limpiar recursos. Entender cómo funciona por dentro hace que RxJS sea mucho más fácil de usar.

Implementación mínima desde cero

Un Observable es esencialmente una función que, cuando se suscribe, empieza a emitir valores. La suscripción devuelve una forma de cancelar:

class Observable {
  #fn;

  constructor(fn) {
    this.#fn = fn;
  }

  subscribe(observador) {
    // Normalizar: aceptar función o { next, error, complete }
    const obs = typeof observador === 'function'
      ? { next: observador }
      : observador;

    let activo = true;

    const suscriptor = {
      next: (valor) => { if (activo) obs.next?.(valor); },
      error: (err)  => { if (activo) { activo = false; obs.error?.(err); } },
      complete: ()  => { if (activo) { activo = false; obs.complete?.(); } },
    };

    // Ejecutar la función productora y obtener la limpieza
    const limpieza = this.#fn(suscriptor);

    return {
      unsubscribe: () => {
        activo = false;
        limpieza?.();
      },
    };
  }
}

// Observable de eventos DOM
function fromEvent(elemento, evento) {
  return new Observable(({ next }) => {
    elemento.addEventListener(evento, next);
    return () => elemento.removeEventListener(evento, next);
  });
}

// Observable de interval
function interval(ms) {
  return new Observable(({ next, complete }) => {
    let contador = 0;
    const id = setInterval(() => next(contador++), ms);
    return () => clearInterval(id);
  });
}

// Uso
const suscripcion = interval(500).subscribe(n => console.log('tick:', n));
setTimeout(() => suscripcion.unsubscribe(), 3000); // Para a los 3 segundos

Operadores: transformar el stream

// Añadir operadores básicos a nuestra implementación
class Observable {
  // ... constructor y subscribe anteriores ...

  map(fn) {
    return new Observable(({ next, error, complete }) => {
      const sub = this.subscribe({ next: v => next(fn(v)), error, complete });
      return () => sub.unsubscribe();
    });
  }

  filter(pred) {
    return new Observable(({ next, error, complete }) => {
      const sub = this.subscribe({ next: v => { if (pred(v)) next(v); }, error, complete });
      return () => sub.unsubscribe();
    });
  }

  take(n) {
    return new Observable((obs) => {
      let cuenta = 0;
      const sub = this.subscribe({
        next: v => {
          obs.next(v);
          if (++cuenta >= n) { obs.complete(); sub.unsubscribe(); }
        },
        error: obs.error,
        complete: obs.complete,
      });
      return () => sub.unsubscribe();
    });
  }
}

// Encadenar operadores
interval(100)
  .filter(n => n % 2 === 0)
  .map(n => n * n)
  .take(5)
  .subscribe(v => console.log(v));
// 0, 4, 16, 36, 64

RxJS en el mundo real: buscador con debounce

Uno de los casos de uso más conocidos de RxJS es implementar un buscador que espera a que el usuario deje de escribir antes de lanzar la petición:

import { fromEvent } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap, catchError } from 'rxjs/operators';
import { from, of, EMPTY } from 'rxjs';

const inputBusqueda = document.getElementById('busqueda');

const busqueda$ = fromEvent(inputBusqueda, 'input').pipe(
  // Extraer el valor del input
  map(evento => evento.target.value.trim()),

  // Esperar 300ms desde el último keystroke
  debounceTime(300),

  // No lanzar si el valor es el mismo que el anterior
  distinctUntilChanged(),

  // Cancelar la petición anterior si llega una nueva (switchMap)
  switchMap(termino => {
    if (termino.length < 2) return EMPTY; // No buscar con menos de 2 caracteres

    return from(
      fetch(`/api/buscar?q=${encodeURIComponent(termino)}`).then(r => r.json())
    ).pipe(
      catchError(err => {
        console.error('Error en búsqueda:', err);
        return of([]); // Emitir array vacío en caso de error
      })
    );
  })
);

const sub = busqueda$.subscribe(resultados => {
  mostrarResultados(resultados);
});

// Limpiar al desmontar el componente
// sub.unsubscribe();

BehaviorSubject: estado compartido reactivo

Un BehaviorSubject es un Observable especial que guarda el último valor emitido y lo entrega inmediatamente a cualquier nuevo suscriptor. Es la base para gestión de estado reactiva:

import { BehaviorSubject, combineLatest } from 'rxjs';
import { map } from 'rxjs/operators';

// Estado de la aplicación como streams reactivos
const usuario$ = new BehaviorSubject(null);
const carrito$ = new BehaviorSubject([]);
const cargando$ = new BehaviorSubject(false);

// Stream derivado: número de items en el carrito
const totalItems$ = carrito$.pipe(
  map(items => items.reduce((acc, item) => acc + item.cantidad, 0))
);

// Combinar streams para UI
const estadoUI$ = combineLatest([usuario$, totalItems$, cargando$]).pipe(
  map(([usuario, total, cargando]) => ({
    nombreUsuario: usuario?.nombre ?? 'Invitado',
    itemsCarrito: total,
    mostrarSpinner: cargando,
  }))
);

estadoUI$.subscribe(({ nombreUsuario, itemsCarrito }) => {
  document.getElementById('usuario').textContent = nombreUsuario;
  document.getElementById('carrito-count').textContent = itemsCarrito;
});

// Actualizar el estado
function iniciarSesion(datosUsuario) {
  usuario$.next(datosUsuario);
}

function añadirAlCarrito(producto) {
  const actual = carrito$.getValue();
  const existente = actual.find(i => i.id === producto.id);
  if (existente) {
    carrito$.next(actual.map(i =>
      i.id === producto.id ? { ...i, cantidad: i.cantidad + 1 } : i
    ));
  } else {
    carrito$.next([...actual, { ...producto, cantidad: 1 }]);
  }
}

Evitar fugas de memoria al desuscribirse

El problema más común con RxJS en componentes de UI es olvidar desuscribirse, lo que crea memory leaks:

import { Subject, takeUntil } from 'rxjs';

class ComponenteReact {
  // Subject que emite cuando el componente se desmonta
  #destruir$ = new Subject();

  montado() {
    // Todos los subscriptions se cancelan automáticamente al desmontar
    intervalo$ .pipe(takeUntil(this.#destruir$))
      .subscribe(n => this.setState({ contador: n }));

    busqueda$.pipe(takeUntil(this.#destruir$))
      .subscribe(resultados => this.setState({ resultados }));
  }

  desmontado() {
    this.#destruir$.next();    // Señal de destrucción
    this.#destruir$.complete(); // Limpiar el Subject
    // Todas las suscripciones con takeUntil se cancelan automáticamente
  }
}

// Alternativa: Subscription.add para agrupar
import { Subscription } from 'rxjs';

class ComponenteClase {
  #subs = new Subscription();

  montado() {
    this.#subs.add(intervalo$.subscribe(n => console.log(n)));
    this.#subs.add(eventos$.subscribe(e => console.log(e)));
  }

  desmontado() {
    this.#subs.unsubscribe(); // Cancela todas las suscripciones añadidas
  }
}

El patrón Observable brilla en aplicaciones donde los datos cambian con el tiempo de forma asíncrona: eventos de usuario, websockets, polling de APIs, animaciones. RxJS proporciona más de 100 operadores para transformar, combinar y controlar estos streams. La clave para aprovecharlos bien es entender el modelo mental: cada Observable es una función que describe cómo producir valores, y la suscripción es lo que pone esa función en marcha.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP