Canales avanzados en Go: done channel, tee, merge con cierre correcto y timeouts

Los canales básicos de Go ya permiten hacer mucho, pero hay patrones avanzados que se repiten en casi cualquier sistema concurrente real: señalizar cancelación a múltiples goroutines, duplicar streams, fusionar varios canales en uno o imponer un rate limit. Aquí están los principales con ejemplos listos para producción.

Done channel: cancelar varias goroutines con una sola señal

Cerrar un canal desbloquea a todos los receptores al mismo tiempo. Eso lo hace ideal para señalizar cancelación sin usar context:

func procesarConCancel(done <-chan struct{}, items <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case item, ok := <-items:
                if !ok {
                    return
                }
                select {
                case out <- item * 2:
                case <-done:
                    return
                }
            case <-done:
                return
            }
        }
    }()
    return out
}

done := make(chan struct{})
items := make(chan int, 10)

resultados := procesarConCancel(done, items)

// Cancelar todas las goroutines de una vez
close(done)

Cierre correcto: solo el productor cierra el canal

La regla de oro en Go es que solo la goroutine productora debe cerrar el canal. Cerrar un canal desde el consumidor causa panic. Si hay varios productores, usa un sync.WaitGroup para cerrar al terminar todos:

func fusionar(canales ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    reenviar := func(c <-chan int) {
        defer wg.Done()
        for v := range c {
            merged <- v
        }
    }

    wg.Add(len(canales))
    for _, c := range canales {
        go reenviar(c)
    }

    go func() {
        wg.Wait()
        close(merged) // solo se cierra cuando todos los productores terminan
    }()

    return merged
}

Tee channel: duplicar un stream

func tee(in <-chan int) (<-chan int, <-chan int) {
    out1 := make(chan int)
    out2 := make(chan int)
    go func() {
        defer close(out1)
        defer close(out2)
        for v := range in {
            // ambas escrituras deben completarse antes de leer el siguiente valor
            var o1, o2 = out1, out2
            for i := 0; i < 2; i++ {
                select {
                case o1 <- v:
                    o1 = nil // marcar como enviado
                case o2 <- v:
                    o2 = nil
                }
            }
        }
    }()
    return out1, out2
}

// Uso: dos consumidores reciben exactamente los mismos valores
c1, c2 := tee(fuente)
go func() { for v := range c1 { fmt.Println("consumidor 1:", v) } }()
go func() { for v := range c2 { fmt.Println("consumidor 2:", v) } }

Nil channels: pausar selectivamente

Asignar nil a una variable de canal dentro de un select hace que esa rama se ignore permanentemente hasta que se le asigne un canal válido de nuevo. Es un truco útil para pausar la lectura de un canal sin eliminar el case:

func fusionarConPausa(a, b <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for a != nil || b != nil {
            select {
            case v, ok := <-a:
                if !ok {
                    a = nil // ignorar este case de ahora en adelante
                    continue
                }
                out <- v
            case v, ok := <-b:
                if !ok {
                    b = nil
                    continue
                }
                out <- v
            }
        }
    }()
    return out
}

Timeout con time.After

func conTimeout(ch <-chan string, timeout time.Duration) (string, bool) {
    select {
    case v := <-ch:
        return v, true
    case <-time.After(timeout):
        return "", false
    }
}

resultado, ok := conTimeout(resultados, 2*time.Second)
if !ok {
    fmt.Println("timeout esperando resultado")
}

Or-done: wrap para salir antes del cierre

func orDone(done, c <-chan any) <-chan any {
    valStream := make(chan any)
    go func() {
        defer close(valStream)
        for {
            select {
            case <-done:
                return
            case v, ok := <-c:
                if !ok {
                    return
                }
                select {
                case valStream <- v:
                case <-done:
                }
            }
        }
    }()
    return valStream
}

Rate limiting con time.Tick

func rateLimitado(requests <-chan int) {
    limiter := time.NewTicker(100 * time.Millisecond) // max 10 req/s
    defer limiter.Stop()

    for req := range requests {
        <-limiter.C
        go procesarRequest(req)
    }
}

// Rate limiting en ráfaga: permite hasta N inmediatas, luego limita
func rateLimitadoConBurst(requests <-chan int) {
    limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 5) // 5 en ráfaga
    for req := range requests {
        limiter.Wait(context.Background())
        go procesarRequest(req)
    }
}

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP