Go facilita la concurrencia con goroutines y canales, pero sin patrones claros los programas se vuelven difíciles de razonar. Tres patrones cubren la mayoría de situaciones reales: worker pool para limitar goroutines activas, fan-out/fan-in para distribuir trabajo y recoger resultados, y pipeline para encadenar etapas de transformación.
Worker pool: limitar la concurrencia
Un worker pool lanza un número fijo de goroutines que consumen trabajos de un canal compartido. Evita saturar la CPU o los recursos externos con miles de goroutines simultáneas:
package main
import (
"fmt"
"sync"
)
func workerPool(numWorkers int, jobs <-chan int, results chan<- int) {
var wg sync.WaitGroup
for w := 0; w < numWorkers; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := range jobs {
results <- j * j // trabajo: calcular el cuadrado
}
}()
}
go func() {
wg.Wait()
close(results)
}()
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
go workerPool(5, jobs, results)
for i := 1; i <= 20; i++ {
jobs <- i
}
close(jobs)
for r := range results {
fmt.Println(r)
}
}
El truco es cerrar jobs para que los workers salgan del range y llamar a close(results) solo después de que todos terminen, con un sync.WaitGroup en una goroutine separada.
Fan-out/fan-in: distribuir y recolectar
Fan-out toma un canal de entrada y distribuye su trabajo entre varias goroutines. Fan-in combina varios canales de salida en uno solo para que el consumidor solo necesite escuchar en un sitio:
func fanOut(entrada <-chan int, n int) []<-chan int {
salidas := make([]<-chan int, n)
for i := 0; i < n; i++ {
ch := make(chan int)
salidas[i] = ch
go func(out chan<- int) {
for v := range entrada {
out <- v * 2
}
close(out)
}(ch)
}
return salidas
}
func fanIn(canales ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
reenviar := func(c <-chan int) {
defer wg.Done()
for v := range c {
merged <- v
}
}
wg.Add(len(canales))
for _, c := range canales {
go reenviar(c)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
Pipeline: encadenar etapas
Un pipeline conecta etapas donde cada una lee de un canal, transforma el dato y escribe en el siguiente. Cada etapa es una función que devuelve un canal:
func generar(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func cuadrado(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func filtrar(in <-chan int, minimo int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
if n >= minimo {
out <- n
}
}
close(out)
}()
return out
}
func main() {
// Pipeline: generar ? cuadrado ? filtrar(?25)
c := generar(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
c = cuadrado(c)
c = filtrar(c, 25)
for v := range c {
fmt.Println(v) // 25, 36, 49, 64, 81, 100
}
}
Error típico: goroutine leak por canal no cerrado
Si el consumidor abandona un pipeline antes de que la fuente termine, las goroutines productoras quedan bloqueadas enviando a un canal que nadie lee. La solución es un canal done que señaliza la cancelación:
func generarConCancel(done <-chan struct{}, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-done: // consumidor ha cancelado
return
}
}
}()
return out
}
// Al terminar de consumir, cierra done:
done := make(chan struct{})
defer close(done)
c := generarConCancel(done, 1, 2, 3, 4, 5)
fmt.Println(<-c) // consume solo el primero
// Al salir, defer close(done) desbloquea la goroutine productora
Para pipelines en producción, sustituye el canal done por un context.Context y usa ctx.Done(): obtendrás cancelación con timeout y propagación entre goroutines sin coste adicional.
