golang.org/x/sync en Go: errgroup, semaphore y singleflight para concurrencia avanzada

El módulo golang.org/x/sync amplía la librería estándar con primitivas de concurrencia que cubren tres situaciones muy comunes: esperar varias goroutines con propagación de errores, limitar la concurrencia a un número fijo de goroutines simultáneas y colapsar peticiones duplicadas en vuelo.

errgroup: goroutines con propagación de errores

sync.WaitGroup espera goroutines pero no propaga errores. errgroup resuelve eso y además cancela el contexto cuando la primera goroutine falla:

package main

import (
    "context"
    "fmt"
    "net/http"

    "golang.org/x/sync/errgroup"
)

func obtenerURLs(urls []string) error {
    g, ctx := errgroup.WithContext(context.Background())

    for _, url := range urls {
        url := url // captura de variable de bucle
        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
            if err != nil {
                return fmt.Errorf("crear request %s: %w", url, err)
            }
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return fmt.Errorf("fetch %s: %w", url, err)
            }
            defer resp.Body.Close()
            if resp.StatusCode != http.StatusOK {
                return fmt.Errorf("%s devolvió %d", url, resp.StatusCode)
            }
            fmt.Println("OK:", url)
            return nil
        })
    }

    return g.Wait() // nil si todas OK, primer error si alguna falla
}

func main() {
    err := obtenerURLs([]string{
        "https://go.dev",
        "https://pkg.go.dev",
        "https://blog.golang.org",
    })
    if err != nil {
        fmt.Println("Error:", err)
    }
}

Si quieres limitar el número de goroutines simultáneas, usa g.SetLimit(n) antes de los g.Go. Con el límite puesto, g.Go bloquea hasta que haya un slot libre.

semaphore.Weighted: limitar la concurrencia con pesos

semaphore.Weighted es más flexible que el semáforo clásico porque permite adquirir pesos variables. Útil para limitar recursos que no son uniformes (por ejemplo, el tamaño de los ficheros que procesas en paralelo):

import "golang.org/x/sync/semaphore"

const maxConcurrente = 5

func procesarFicheros(ficheros []string) error {
    sem := semaphore.NewWeighted(maxConcurrente)
    ctx := context.Background()
    g := new(errgroup.Group)

    for _, f := range ficheros {
        f := f
        if err := sem.Acquire(ctx, 1); err != nil {
            return err
        }
        g.Go(func() error {
            defer sem.Release(1)
            return procesarFichero(f)
        })
    }
    return g.Wait()
}

// Con pesos: ficheros grandes consumen más slots
func procesarConPeso(fichero string, tamanoMB int64) error {
    sem := semaphore.NewWeighted(100) // 100 MB total
    if err := sem.Acquire(ctx, tamanoMB); err != nil {
        return err
    }
    defer sem.Release(tamanoMB)
    return procesarFichero(fichero)
}

singleflight: evitar peticiones duplicadas bajo carga

singleflight colapsa llamadas concurrentes con la misma clave: si dos goroutines piden el mismo dato al mismo tiempo, solo una hace la petición real y la otra espera y recibe el mismo resultado. Ideal para caché con thundering herd:

import "golang.org/x/sync/singleflight"

var grupo singleflight.Group

func obtenerDatosUsuario(ctx context.Context, userID string) (*Usuario, error) {
    v, err, _ := grupo.Do("usuario:"+userID, func() (any, error) {
        // esta función solo se ejecuta una vez aunque haya 100
        // goroutines pidiendo el mismo userID al mismo tiempo
        return consultarBBDD(ctx, userID)
    })
    if err != nil {
        return nil, err
    }
    return v.(*Usuario), nil
}

// El tercer retorno (shared bool) indica si el resultado fue
// compartido con otras goroutines que llamaron con la misma clave

El antipatrón más común con singleflight es usarlo sin caché: si las peticiones no llegan en la misma ventana temporal, cada una generará una llamada nueva. singleflight colapsa peticiones en vuelo, no cachea el resultado. Para un caché completo combínalo con sync.Map o una caché con TTL:

var (
    cache sync.Map
    grupo singleflight.Group
)

func obtenerConCache(key string) (string, error) {
    // 1. Primero la caché local
    if v, ok := cache.Load(key); ok {
        return v.(string), nil
    }
    // 2. Si no hay caché, colapsar peticiones duplicadas
    v, err, _ := grupo.Do(key, func() (any, error) {
        resultado, err := obtenerDeAPI(key)
        if err == nil {
            cache.Store(key, resultado)
        }
        return resultado, err
    })
    if err != nil {
        return "", err
    }
    return v.(string), nil
}

Antipatrones y errores frecuentes

Con errgroup, el error cancelado del contexto puede enmascarar el error real. Siempre comprueba el error de g.Wait(), no el del contexto:

// MAL: el contexto cancelado devuelve context.Canceled en lugar del error real
if ctx.Err() != nil {
    return ctx.Err()
}

// BIEN: el error de g.Wait() es el primero real que ocurrió
if err := g.Wait(); err != nil {
    return err
}

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP