TaskGroup, AsyncSequence y AsyncStream en Swift: concurrencia avanzada y streams async

Cuando las tareas paralelas con async let no bastan porque el número de operaciones es dinámico, o cuando necesitas procesar flujos de datos que llegan de forma asíncrona a lo largo del tiempo, Swift ofrece dos herramientas complementarias: TaskGroup para paralelismo coordinado y AsyncSequence/AsyncStream para flujos de datos asíncronos.

TaskGroup: paralelismo dinámico con resultados recopilados

withTaskGroup permite lanzar un número arbitrario de tareas en paralelo y recopilar sus resultados a medida que van completándose:

func buscarEnVariasFuentes(termino: String) async -> [Resultado] {
    let fuentes: [FuenteBusqueda] = [
        .wikipedia, .stackoverflow, .github, .docs
    ]

    return await withTaskGroup(of: [Resultado].self) { group in
        for fuente in fuentes {
            group.addTask {
                await fuente.buscar(termino)
            }
        }

        var todosLosResultados: [Resultado] = []
        for await resultados in group {
            todosLosResultados.append(contentsOf: resultados)
        }
        return todosLosResultados
    }
}

Los resultados llegan en el orden en que las tareas finalizan, no necesariamente el orden en que se lanzaron. Si necesitas preservar el orden, debes incluir el índice en el resultado:

func procesarOrdenado(items: [T], proceso: (T) async -> String) async -> [String] {
    await withTaskGroup(of: (Int, String).self) { group in
        for (idx, item) in items.enumerated() {
            group.addTask {
                let resultado = await proceso(item)
                return (idx, resultado)
            }
        }

        var resultados = Array(repeating: "", count: items.count)
        for await (idx, valor) in group {
            resultados[idx] = valor
        }
        return resultados
    }
}

withThrowingTaskGroup: grupos que pueden fallar

Cuando las tareas del grupo pueden lanzar errores, usa withThrowingTaskGroup. Si cualquier tarea falla, el grupo cancela el resto y propaga el error:

func descargarTodos(urls: [URL]) async throws -> [Data] {
    try await withThrowingTaskGroup(of: (Int, Data).self) { group in
        for (idx, url) in urls.enumerated() {
            group.addTask {
                let (data, respuesta) = try await URLSession.shared.data(from: url)
                guard let http = respuesta as? HTTPURLResponse,
                      http.statusCode == 200 else {
                    throw DescargaError.respuestaInvalida
                }
                return (idx, data)
            }
        }

        var resultados = Array(repeating: Data(), count: urls.count)
        for try await (idx, data) in group {
            resultados[idx] = data
        }
        return resultados
    }
}

Limitar el número de tareas concurrentes

Lanzar cientos de tareas a la vez puede saturar la red o el servidor. Para limitar la concurrencia máxima, usa un semáforo de actor o un enfoque por lotes:

func procesarConLimite(
    items: [T],
    limite: Int,
    proceso: (T) async throws -> String
) async throws -> [String] {
    try await withThrowingTaskGroup(of: (Int, String).self) { group in
        var resultados = Array(repeating: "", count: items.count)
        var indiceActual = 0

        // Lanza las primeras 'limite' tareas
        while indiceActual < min(limite, items.count) {
            let idx = indiceActual
            group.addTask { (idx, try await proceso(items[idx])) }
            indiceActual += 1
        }

        // Por cada tarea completada, lanza la siguiente
        for try await (idx, valor) in group {
            resultados[idx] = valor
            if indiceActual < items.count {
                let nextIdx = indiceActual
                group.addTask { (nextIdx, try await proceso(items[nextIdx])) }
                indiceActual += 1
            }
        }

        return resultados
    }
}

AsyncSequence: iteración asíncrona

AsyncSequence es el equivalente asíncrono de Sequence: un tipo que produce sus elementos uno a uno, pero potencialmente con esperas entre elementos. Se itera con for await:

// URLSession.bytes retorna un AsyncSequence de bytes
let (bytes, _) = try await URLSession.shared.bytes(from: url)
var lineas: [String] = []
var lineaActual = ""

for try await byte in bytes {
    if byte == UInt8(ascii: "n") {
        lineas.append(lineaActual)
        lineaActual = ""
    } else {
        lineaActual.append(Character(UnicodeScalar(byte)))
    }
}

La librería estándar incluye AsyncSequence en varios lugares: URLSession.bytes, lectura línea a línea con FileHandle.bytes, y cualquier tipo que implemente el protocolo.

Operadores sobre AsyncSequence

Puedes usar map, filter, compactMap, prefix y reduce directamente sobre secuencias asíncronas:

let (bytes, respuesta) = try await URLSession.shared.bytes(from: apiURL)
let lineas = bytes.lines // AsyncSequence de String (líneas de texto)

let resultados = try await lineas
    .compactMap { linea -> Usuario? in
        guard !linea.isEmpty else { return nil }
        return try? JSONDecoder().decode(Usuario.self, from: Data(linea.utf8))
    }
    .prefix(100) // Solo los primeros 100

for try await usuario in resultados {
    await guardar(usuario)
}

AsyncStream: crear tu propia secuencia asíncrona

AsyncStream es la forma más sencilla de crear una AsyncSequence propia. Es especialmente útil para adaptar APIs de callbacks o delegados al modelo async/await:

func notificacionesDeUbicacion() -> AsyncStream {
    AsyncStream { continuation in
        let manager = CLLocationManager()
        let delegado = DelegadoUbicacion { ubicacion in
            continuation.yield(ubicacion)
        }
        manager.delegate = delegado
        manager.startUpdatingLocation()

        continuation.onTermination = { _ in
            manager.stopUpdatingLocation()
        }
    }
}

// Uso
for await ubicacion in notificacionesDeUbicacion() {
    await actualizarMapa(con: ubicacion)
}

continuation.yield(_:) emite un elemento. continuation.finish()` termina el stream. onTermination se llama cuando el consumidor cancela la iteración, permitiendo limpiar recursos.

AsyncThrowingStream: streams que pueden fallar

Cuando el productor puede encontrar errores, usa AsyncThrowingStream:

func leerLogEnTiempoReal(archivo: URL) -> AsyncThrowingStream {
    AsyncThrowingStream { continuation in
        let fuente = DispatchSource.makeFileSystemObjectSource(
            fileDescriptor: try! FileHandle(forReadingFrom: archivo).fileDescriptor,
            eventMask: .write
        )

        fuente.setEventHandler {
            // Leer nuevas líneas del archivo
            if let linea = leerNuevasLineas() {
                continuation.yield(linea)
            }
        }

        fuente.setCancelHandler {
            continuation.finish()
        }

        fuente.resume()

        continuation.onTermination = { _ in
            fuente.cancel()
        }
    }
}

Ejemplo completo: descarga con progreso en tiempo real

struct ProgresoDescarga {
    let bytesDescargados: Int64
    let bytesTotales: Int64
    var porcentaje: Double { Double(bytesDescargados) / Double(bytesTotales) }
}

func descargarConProgreso(url: URL) -> AsyncThrowingStream {
    AsyncThrowingStream { continuation in
        Task {
            do {
                let (bytes, respuesta) = try await URLSession.shared.bytes(from: url)
                let total = respuesta.expectedContentLength
                var descargados: Int64 = 0

                for try await _ in bytes {
                    descargados += 1
                    if descargados % 1024 == 0 { // Actualizar cada KB
                        continuation.yield(ProgresoDescarga(
                            bytesDescargados: descargados,
                            bytesTotales: total
                        ))
                    }
                }
                continuation.finish()
            } catch {
                continuation.finish(throwing: error)
            }
        }
    }
}

// Uso en SwiftUI
for try await progreso in descargarConProgreso(url: archivoURL) {
    await MainActor.run {
        barraProgreso.progress = Float(progreso.porcentaje)
    }
}

Implementar AsyncSequence desde cero

Para casos más sofisticados, puedes implementar el protocolo AsyncSequence directamente:

struct SecuenciaFibonacci: AsyncSequence {
    typealias Element = Int
    let limite: Int

    struct AsyncIterator: AsyncIteratorProtocol {
        var a = 0, b = 1
        var contador = 0
        let limite: Int

        mutating func next() async -> Int? {
            guard contador < limite else { return nil }
            // Simula trabajo asíncrono entre elementos
            try? await Task.sleep(for: .milliseconds(100))
            let resultado = a
            (a, b) = (b, a + b)
            contador += 1
            return resultado
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(limite: limite)
    }
}

for await numero in SecuenciaFibonacci(limite: 10) {
    print(numero)
}

Resumen

TaskGroup resuelve el paralelismo dinámico de forma segura y con cancelación automática. AsyncSequence convierte la iteración sobre datos asíncronos en un for await tan legible como un bucle normal. AsyncStream y AsyncThrowingStream son el puente que conecta APIs antiguas de callbacks y delegados con el mundo moderno de la concurrencia estructurada. Juntos cubren prácticamente todos los patrones de concurrencia que aparecen en aplicaciones reales.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP