Combine tiene un segundo nivel de complejidad que va más allá de los operadores básicos. Crear publishers propios, controlar en qué hilo se ejecuta el trabajo, compartir suscripciones costosas y manejar errores de forma elegante son las habilidades que marcan la diferencia en aplicaciones de producción.
Crear publishers propios: el protocolo Publisher
Cualquier tipo puede conformar Publisher implementando dos requisitos: el tipo Output y el tipo Failure, más el método receive(subscriber:). En la práctica, lo más habitual es envolver lógica en un AnyPublisher usando Deferred o el inicializador de Future:
import Combine
// Publisher que emite el contenido de un fichero de forma lazy
func leerFicheroPublisher(url: URL) -> AnyPublisher {
Deferred {
Future { promise in
do {
let contenido = try String(contentsOf: url, encoding: .utf8)
promise(.success(contenido))
} catch {
promise(.failure(error))
}
}
}
.eraseToAnyPublisher()
}
Deferred garantiza que el trabajo no empieza hasta que hay un suscriptor, evitando efectos secundarios al construir el publisher. Future encapsula una operación que produce un único valor o falla. La combinación de ambos es la forma estándar para trabajo asíncrono one-shot.
Para publishers completamente personalizados que emiten varios valores, es mejor implementar el protocolo directamente:
struct ContadorPublisher: Publisher {
typealias Output = Int
typealias Failure = Never
let hasta: Int
func receive(subscriber: S)
where S.Input == Int, S.Failure == Never {
let subscription = ContadorSubscription(subscriber: subscriber, hasta: hasta)
subscriber.receive(subscription: subscription)
}
}
final class ContadorSubscription: Subscription
where S.Input == Int, S.Failure == Never {
private var subscriber: S?
private var contador = 0
private let hasta: Int
init(subscriber: S, hasta: Int) {
self.subscriber = subscriber
self.hasta = hasta
}
func request(_ demand: Subscribers.Demand) {
guard let subscriber else { return }
while contador < hasta {
_ = subscriber.receive(contador)
contador += 1
}
subscriber.receive(completion: .finished)
}
func cancel() { subscriber = nil }
}
// Uso
ContadorPublisher(hasta: 5)
.sink { print($0) }
// 0, 1, 2, 3, 4
Scheduler: controlar el hilo de ejecución
receive(on:) cambia el hilo en que el subscriber recibe los valores. subscribe(on:) cambia el hilo en que el publisher ejecuta su trabajo. La confusión entre ambos es una fuente frecuente de bugs:
let cancellable = URLSession.shared
.dataTaskPublisher(for: url)
// El trabajo de red ocurre en el hilo de URLSession (correcto por defecto)
.map { $0.data }
.decode(type: [Item].self, decoder: JSONDecoder())
// La actualización de UI siempre en el hilo principal
.receive(on: DispatchQueue.main)
.sink(
receiveCompletion: { _ in },
receiveValue: { self.items = $0 }
)
Para trabajo pesado en background, subscribe(on:) mueve toda la suscripción a otro scheduler:
Just(datosGrandes)
.subscribe(on: DispatchQueue.global(qos: .userInitiated))
.map { procesarDatos($0) } // Ejecuta en global queue
.receive(on: DispatchQueue.main) // UI en hilo principal
.assign(to: .resultado, on: self)
.store(in: &cancellables)
Compartir suscripciones: share() y multicast()
Sin share(), cada suscriptor crea una nueva suscripción al publisher upstream, lo que duplica peticiones de red o trabajo costoso:
let peticion = URLSession.shared
.dataTaskPublisher(for: url)
.map(.data)
.share() // Una sola petición, varios suscriptores
peticion
.decode(type: [Usuario].self, decoder: JSONDecoder())
.sink { self.usuarios = $0 }
.store(in: &cancellables)
peticion
.map { "($0.count) bytes recibidos" }
.sink { self.estado = $0 }
.store(in: &cancellables)
multicast(_:) es más explícito: conecta el upstream a un subject y permite controlar exactamente cuándo empiezan las emisiones con connect():
let subject = PassthroughSubject()
let shared = URLSession.shared
.dataTaskPublisher(for: url)
.map(.data)
.multicast(subject: subject)
shared.sink { self.procesarA($0) }.store(in: &cancellables)
shared.sink { self.procesarB($0) }.store(in: &cancellables)
let connection = shared.connect()
// Ahora empieza la petición de red
Manejo de errores: mapError, catch y retry
mapError convierte el tipo de error del publisher en otro tipo, necesario para unificar pipelines con diferentes fuentes de error:
enum AppError: Error {
case red(URLError)
case decodificacion(DecodingError)
case desconocido(Error)
}
URLSession.shared
.dataTaskPublisher(for: url)
.mapError { AppError.red($0) }
.map(.data)
.decode(type: Respuesta.self, decoder: JSONDecoder())
.mapError { error in
if let decoding = error as? DecodingError {
return AppError.decodificacion(decoding)
}
return AppError.desconocido(error)
}
.sink(
receiveCompletion: { completion in
if case .failure(let error) = completion {
print("Error tipado: (error)")
}
},
receiveValue: { self.respuesta = $0 }
)
.store(in: &cancellables)
catch permite recuperarse de un error sustituyendo el publisher que falló por otro alternativo:
URLSession.shared
.dataTaskPublisher(for: urlPrincipal)
.catch { _ in
// Si falla, intentar con el servidor de respaldo
URLSession.shared.dataTaskPublisher(for: urlRespaldo)
}
.map(.data)
.sink(receiveCompletion: { _ in }, receiveValue: { self.datos = $0 })
.store(in: &cancellables)
retry(_:) reintenta la suscripción upstream un número de veces antes de propagar el error:
URLSession.shared
.dataTaskPublisher(for: url)
.retry(3) // Hasta 3 reintentos antes de fallar
.map(.data)
.sink(receiveCompletion: { _ in }, receiveValue: { self.datos = $0 })
.store(in: &cancellables)
Resumen
El nivel avanzado de Combine se estructura en cuatro pilares: publishers propios con Future, Deferred o el protocolo completo para encapsular lógica; Scheduler con receive(on:) y subscribe(on:) para controlar hilos; share() y multicast() para evitar trabajo duplicado cuando hay varios suscriptores; y mapError, catch y retry para pipelines resilientes. Dominar estos cuatro bloques es suficiente para construir toda la capa de datos de una aplicación iOS con Combine.
