Patrones de concurrencia en Go: worker pool, fan-out/fan-in y pipeline

Go facilita la concurrencia con goroutines y canales, pero sin patrones claros los programas se vuelven difíciles de razonar. Tres patrones cubren la mayoría de situaciones reales: worker pool para limitar goroutines activas, fan-out/fan-in para distribuir trabajo y recoger resultados, y pipeline para encadenar etapas de transformación.

Worker pool: limitar la concurrencia

Un worker pool lanza un número fijo de goroutines que consumen trabajos de un canal compartido. Evita saturar la CPU o los recursos externos con miles de goroutines simultáneas:

package main

import (
    "fmt"
    "sync"
)

func workerPool(numWorkers int, jobs <-chan int, results chan<- int) {
    var wg sync.WaitGroup
    for w := 0; w < numWorkers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := range jobs {
                results <- j * j // trabajo: calcular el cuadrado
            }
        }()
    }
    go func() {
        wg.Wait()
        close(results)
    }()
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    go workerPool(5, jobs, results)

    for i := 1; i <= 20; i++ {
        jobs <- i
    }
    close(jobs)

    for r := range results {
        fmt.Println(r)
    }
}

El truco es cerrar jobs para que los workers salgan del range y llamar a close(results) solo después de que todos terminen, con un sync.WaitGroup en una goroutine separada.

Fan-out/fan-in: distribuir y recolectar

Fan-out toma un canal de entrada y distribuye su trabajo entre varias goroutines. Fan-in combina varios canales de salida en uno solo para que el consumidor solo necesite escuchar en un sitio:

func fanOut(entrada <-chan int, n int) []<-chan int {
    salidas := make([]<-chan int, n)
    for i := 0; i < n; i++ {
        ch := make(chan int)
        salidas[i] = ch
        go func(out chan<- int) {
            for v := range entrada {
                out <- v * 2
            }
            close(out)
        }(ch)
    }
    return salidas
}

func fanIn(canales ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)
    reenviar := func(c <-chan int) {
        defer wg.Done()
        for v := range c {
            merged <- v
        }
    }
    wg.Add(len(canales))
    for _, c := range canales {
        go reenviar(c)
    }
    go func() {
        wg.Wait()
        close(merged)
    }()
    return merged
}

Pipeline: encadenar etapas

Un pipeline conecta etapas donde cada una lee de un canal, transforma el dato y escribe en el siguiente. Cada etapa es una función que devuelve un canal:

func generar(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func cuadrado(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func filtrar(in <-chan int, minimo int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            if n >= minimo {
                out <- n
            }
        }
        close(out)
    }()
    return out
}

func main() {
    // Pipeline: generar ? cuadrado ? filtrar(?25)
    c := generar(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    c = cuadrado(c)
    c = filtrar(c, 25)

    for v := range c {
        fmt.Println(v) // 25, 36, 49, 64, 81, 100
    }
}

Error típico: goroutine leak por canal no cerrado

Si el consumidor abandona un pipeline antes de que la fuente termine, las goroutines productoras quedan bloqueadas enviando a un canal que nadie lee. La solución es un canal done que señaliza la cancelación:

func generarConCancel(done <-chan struct{}, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case out <- n:
            case <-done: // consumidor ha cancelado
                return
            }
        }
    }()
    return out
}

// Al terminar de consumir, cierra done:
done := make(chan struct{})
defer close(done)
c := generarConCancel(done, 1, 2, 3, 4, 5)
fmt.Println(<-c) // consume solo el primero
// Al salir, defer close(done) desbloquea la goroutine productora

Para pipelines en producción, sustituye el canal done por un context.Context y usa ctx.Done(): obtendrás cancelación con timeout y propagación entre goroutines sin coste adicional.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP