Combine avanzado en Swift: publishers propios, Scheduler, share(), multicast y error handling

Combine tiene un segundo nivel de complejidad que va más allá de los operadores básicos. Crear publishers propios, controlar en qué hilo se ejecuta el trabajo, compartir suscripciones costosas y manejar errores de forma elegante son las habilidades que marcan la diferencia en aplicaciones de producción.

Crear publishers propios: el protocolo Publisher

Cualquier tipo puede conformar Publisher implementando dos requisitos: el tipo Output y el tipo Failure, más el método receive(subscriber:). En la práctica, lo más habitual es envolver lógica en un AnyPublisher usando Deferred o el inicializador de Future:

import Combine

// Publisher que emite el contenido de un fichero de forma lazy
func leerFicheroPublisher(url: URL) -> AnyPublisher {
    Deferred {
        Future { promise in
            do {
                let contenido = try String(contentsOf: url, encoding: .utf8)
                promise(.success(contenido))
            } catch {
                promise(.failure(error))
            }
        }
    }
    .eraseToAnyPublisher()
}

Deferred garantiza que el trabajo no empieza hasta que hay un suscriptor, evitando efectos secundarios al construir el publisher. Future encapsula una operación que produce un único valor o falla. La combinación de ambos es la forma estándar para trabajo asíncrono one-shot.

Para publishers completamente personalizados que emiten varios valores, es mejor implementar el protocolo directamente:

struct ContadorPublisher: Publisher {
    typealias Output = Int
    typealias Failure = Never

    let hasta: Int

    func receive(subscriber: S)
        where S.Input == Int, S.Failure == Never {
        let subscription = ContadorSubscription(subscriber: subscriber, hasta: hasta)
        subscriber.receive(subscription: subscription)
    }
}

final class ContadorSubscription: Subscription
    where S.Input == Int, S.Failure == Never {

    private var subscriber: S?
    private var contador = 0
    private let hasta: Int

    init(subscriber: S, hasta: Int) {
        self.subscriber = subscriber
        self.hasta = hasta
    }

    func request(_ demand: Subscribers.Demand) {
        guard let subscriber else { return }
        while contador < hasta {
            _ = subscriber.receive(contador)
            contador += 1
        }
        subscriber.receive(completion: .finished)
    }

    func cancel() { subscriber = nil }
}

// Uso
ContadorPublisher(hasta: 5)
    .sink { print($0) }
// 0, 1, 2, 3, 4

Scheduler: controlar el hilo de ejecución

receive(on:) cambia el hilo en que el subscriber recibe los valores. subscribe(on:) cambia el hilo en que el publisher ejecuta su trabajo. La confusión entre ambos es una fuente frecuente de bugs:

let cancellable = URLSession.shared
    .dataTaskPublisher(for: url)
    // El trabajo de red ocurre en el hilo de URLSession (correcto por defecto)
    .map { $0.data }
    .decode(type: [Item].self, decoder: JSONDecoder())
    // La actualización de UI siempre en el hilo principal
    .receive(on: DispatchQueue.main)
    .sink(
        receiveCompletion: { _ in },
        receiveValue: { self.items = $0 }
    )

Para trabajo pesado en background, subscribe(on:) mueve toda la suscripción a otro scheduler:

Just(datosGrandes)
    .subscribe(on: DispatchQueue.global(qos: .userInitiated))
    .map { procesarDatos($0) }           // Ejecuta en global queue
    .receive(on: DispatchQueue.main)     // UI en hilo principal
    .assign(to: .resultado, on: self)
    .store(in: &cancellables)

Compartir suscripciones: share() y multicast()

Sin share(), cada suscriptor crea una nueva suscripción al publisher upstream, lo que duplica peticiones de red o trabajo costoso:

let peticion = URLSession.shared
    .dataTaskPublisher(for: url)
    .map(.data)
    .share()   // Una sola petición, varios suscriptores

peticion
    .decode(type: [Usuario].self, decoder: JSONDecoder())
    .sink { self.usuarios = $0 }
    .store(in: &cancellables)

peticion
    .map { "($0.count) bytes recibidos" }
    .sink { self.estado = $0 }
    .store(in: &cancellables)

multicast(_:) es más explícito: conecta el upstream a un subject y permite controlar exactamente cuándo empiezan las emisiones con connect():

let subject = PassthroughSubject()
let shared = URLSession.shared
    .dataTaskPublisher(for: url)
    .map(.data)
    .multicast(subject: subject)

shared.sink { self.procesarA($0) }.store(in: &cancellables)
shared.sink { self.procesarB($0) }.store(in: &cancellables)

let connection = shared.connect()
// Ahora empieza la petición de red

Manejo de errores: mapError, catch y retry

mapError convierte el tipo de error del publisher en otro tipo, necesario para unificar pipelines con diferentes fuentes de error:

enum AppError: Error {
    case red(URLError)
    case decodificacion(DecodingError)
    case desconocido(Error)
}

URLSession.shared
    .dataTaskPublisher(for: url)
    .mapError { AppError.red($0) }
    .map(.data)
    .decode(type: Respuesta.self, decoder: JSONDecoder())
    .mapError { error in
        if let decoding = error as? DecodingError {
            return AppError.decodificacion(decoding)
        }
        return AppError.desconocido(error)
    }
    .sink(
        receiveCompletion: { completion in
            if case .failure(let error) = completion {
                print("Error tipado: (error)")
            }
        },
        receiveValue: { self.respuesta = $0 }
    )
    .store(in: &cancellables)

catch permite recuperarse de un error sustituyendo el publisher que falló por otro alternativo:

URLSession.shared
    .dataTaskPublisher(for: urlPrincipal)
    .catch { _ in
        // Si falla, intentar con el servidor de respaldo
        URLSession.shared.dataTaskPublisher(for: urlRespaldo)
    }
    .map(.data)
    .sink(receiveCompletion: { _ in }, receiveValue: { self.datos = $0 })
    .store(in: &cancellables)

retry(_:) reintenta la suscripción upstream un número de veces antes de propagar el error:

URLSession.shared
    .dataTaskPublisher(for: url)
    .retry(3)   // Hasta 3 reintentos antes de fallar
    .map(.data)
    .sink(receiveCompletion: { _ in }, receiveValue: { self.datos = $0 })
    .store(in: &cancellables)

Resumen

El nivel avanzado de Combine se estructura en cuatro pilares: publishers propios con Future, Deferred o el protocolo completo para encapsular lógica; Scheduler con receive(on:) y subscribe(on:) para controlar hilos; share() y multicast() para evitar trabajo duplicado cuando hay varios suscriptores; y mapError, catch y retry para pipelines resilientes. Dominar estos cuatro bloques es suficiente para construir toda la capa de datos de una aplicación iOS con Combine.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP