Cuando las tareas paralelas con async let no bastan porque el número de operaciones es dinámico, o cuando necesitas procesar flujos de datos que llegan de forma asíncrona a lo largo del tiempo, Swift ofrece dos herramientas complementarias: TaskGroup para paralelismo coordinado y AsyncSequence/AsyncStream para flujos de datos asíncronos.
TaskGroup: paralelismo dinámico con resultados recopilados
withTaskGroup permite lanzar un número arbitrario de tareas en paralelo y recopilar sus resultados a medida que van completándose:
func buscarEnVariasFuentes(termino: String) async -> [Resultado] {
let fuentes: [FuenteBusqueda] = [
.wikipedia, .stackoverflow, .github, .docs
]
return await withTaskGroup(of: [Resultado].self) { group in
for fuente in fuentes {
group.addTask {
await fuente.buscar(termino)
}
}
var todosLosResultados: [Resultado] = []
for await resultados in group {
todosLosResultados.append(contentsOf: resultados)
}
return todosLosResultados
}
}
Los resultados llegan en el orden en que las tareas finalizan, no necesariamente el orden en que se lanzaron. Si necesitas preservar el orden, debes incluir el índice en el resultado:
func procesarOrdenado(items: [T], proceso: (T) async -> String) async -> [String] {
await withTaskGroup(of: (Int, String).self) { group in
for (idx, item) in items.enumerated() {
group.addTask {
let resultado = await proceso(item)
return (idx, resultado)
}
}
var resultados = Array(repeating: "", count: items.count)
for await (idx, valor) in group {
resultados[idx] = valor
}
return resultados
}
}
withThrowingTaskGroup: grupos que pueden fallar
Cuando las tareas del grupo pueden lanzar errores, usa withThrowingTaskGroup. Si cualquier tarea falla, el grupo cancela el resto y propaga el error:
func descargarTodos(urls: [URL]) async throws -> [Data] {
try await withThrowingTaskGroup(of: (Int, Data).self) { group in
for (idx, url) in urls.enumerated() {
group.addTask {
let (data, respuesta) = try await URLSession.shared.data(from: url)
guard let http = respuesta as? HTTPURLResponse,
http.statusCode == 200 else {
throw DescargaError.respuestaInvalida
}
return (idx, data)
}
}
var resultados = Array(repeating: Data(), count: urls.count)
for try await (idx, data) in group {
resultados[idx] = data
}
return resultados
}
}
Limitar el número de tareas concurrentes
Lanzar cientos de tareas a la vez puede saturar la red o el servidor. Para limitar la concurrencia máxima, usa un semáforo de actor o un enfoque por lotes:
func procesarConLimite(
items: [T],
limite: Int,
proceso: (T) async throws -> String
) async throws -> [String] {
try await withThrowingTaskGroup(of: (Int, String).self) { group in
var resultados = Array(repeating: "", count: items.count)
var indiceActual = 0
// Lanza las primeras 'limite' tareas
while indiceActual < min(limite, items.count) {
let idx = indiceActual
group.addTask { (idx, try await proceso(items[idx])) }
indiceActual += 1
}
// Por cada tarea completada, lanza la siguiente
for try await (idx, valor) in group {
resultados[idx] = valor
if indiceActual < items.count {
let nextIdx = indiceActual
group.addTask { (nextIdx, try await proceso(items[nextIdx])) }
indiceActual += 1
}
}
return resultados
}
}
AsyncSequence: iteración asíncrona
AsyncSequence es el equivalente asíncrono de Sequence: un tipo que produce sus elementos uno a uno, pero potencialmente con esperas entre elementos. Se itera con for await:
// URLSession.bytes retorna un AsyncSequence de bytes
let (bytes, _) = try await URLSession.shared.bytes(from: url)
var lineas: [String] = []
var lineaActual = ""
for try await byte in bytes {
if byte == UInt8(ascii: "n") {
lineas.append(lineaActual)
lineaActual = ""
} else {
lineaActual.append(Character(UnicodeScalar(byte)))
}
}
La librería estándar incluye AsyncSequence en varios lugares: URLSession.bytes, lectura línea a línea con FileHandle.bytes, y cualquier tipo que implemente el protocolo.
Operadores sobre AsyncSequence
Puedes usar map, filter, compactMap, prefix y reduce directamente sobre secuencias asíncronas:
let (bytes, respuesta) = try await URLSession.shared.bytes(from: apiURL)
let lineas = bytes.lines // AsyncSequence de String (líneas de texto)
let resultados = try await lineas
.compactMap { linea -> Usuario? in
guard !linea.isEmpty else { return nil }
return try? JSONDecoder().decode(Usuario.self, from: Data(linea.utf8))
}
.prefix(100) // Solo los primeros 100
for try await usuario in resultados {
await guardar(usuario)
}
AsyncStream: crear tu propia secuencia asíncrona
AsyncStream es la forma más sencilla de crear una AsyncSequence propia. Es especialmente útil para adaptar APIs de callbacks o delegados al modelo async/await:
func notificacionesDeUbicacion() -> AsyncStream {
AsyncStream { continuation in
let manager = CLLocationManager()
let delegado = DelegadoUbicacion { ubicacion in
continuation.yield(ubicacion)
}
manager.delegate = delegado
manager.startUpdatingLocation()
continuation.onTermination = { _ in
manager.stopUpdatingLocation()
}
}
}
// Uso
for await ubicacion in notificacionesDeUbicacion() {
await actualizarMapa(con: ubicacion)
}
continuation.yield(_:) emite un elemento. continuation.finish()` termina el stream. onTermination se llama cuando el consumidor cancela la iteración, permitiendo limpiar recursos.
AsyncThrowingStream: streams que pueden fallar
Cuando el productor puede encontrar errores, usa AsyncThrowingStream:
func leerLogEnTiempoReal(archivo: URL) -> AsyncThrowingStream {
AsyncThrowingStream { continuation in
let fuente = DispatchSource.makeFileSystemObjectSource(
fileDescriptor: try! FileHandle(forReadingFrom: archivo).fileDescriptor,
eventMask: .write
)
fuente.setEventHandler {
// Leer nuevas líneas del archivo
if let linea = leerNuevasLineas() {
continuation.yield(linea)
}
}
fuente.setCancelHandler {
continuation.finish()
}
fuente.resume()
continuation.onTermination = { _ in
fuente.cancel()
}
}
}
Ejemplo completo: descarga con progreso en tiempo real
struct ProgresoDescarga {
let bytesDescargados: Int64
let bytesTotales: Int64
var porcentaje: Double { Double(bytesDescargados) / Double(bytesTotales) }
}
func descargarConProgreso(url: URL) -> AsyncThrowingStream {
AsyncThrowingStream { continuation in
Task {
do {
let (bytes, respuesta) = try await URLSession.shared.bytes(from: url)
let total = respuesta.expectedContentLength
var descargados: Int64 = 0
for try await _ in bytes {
descargados += 1
if descargados % 1024 == 0 { // Actualizar cada KB
continuation.yield(ProgresoDescarga(
bytesDescargados: descargados,
bytesTotales: total
))
}
}
continuation.finish()
} catch {
continuation.finish(throwing: error)
}
}
}
}
// Uso en SwiftUI
for try await progreso in descargarConProgreso(url: archivoURL) {
await MainActor.run {
barraProgreso.progress = Float(progreso.porcentaje)
}
}
Implementar AsyncSequence desde cero
Para casos más sofisticados, puedes implementar el protocolo AsyncSequence directamente:
struct SecuenciaFibonacci: AsyncSequence {
typealias Element = Int
let limite: Int
struct AsyncIterator: AsyncIteratorProtocol {
var a = 0, b = 1
var contador = 0
let limite: Int
mutating func next() async -> Int? {
guard contador < limite else { return nil }
// Simula trabajo asíncrono entre elementos
try? await Task.sleep(for: .milliseconds(100))
let resultado = a
(a, b) = (b, a + b)
contador += 1
return resultado
}
}
func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(limite: limite)
}
}
for await numero in SecuenciaFibonacci(limite: 10) {
print(numero)
}
Resumen
TaskGroup resuelve el paralelismo dinámico de forma segura y con cancelación automática. AsyncSequence convierte la iteración sobre datos asíncronos en un for await tan legible como un bucle normal. AsyncStream y AsyncThrowingStream son el puente que conecta APIs antiguas de callbacks y delegados con el mundo moderno de la concurrencia estructurada. Juntos cubren prácticamente todos los patrones de concurrencia que aparecen en aplicaciones reales.
