go-redis en Go: comandos, pipelines, Pub/Sub, Streams y conexión con context

go-redis es el cliente Redis oficial para Go. Su punto fuerte es el uso de context.Context en todas las operaciones, lo que te da timeouts y cancelación de forma nativa, y el soporte completo de las funcionalidades modernas de Redis: pipelines, transacciones, Pub/Sub y Redis Streams.

Conexión y comandos básicos

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
)

var rdb *redis.Client

func init() {
    rdb = redis.NewClient(&redis.Options{
        Addr:         "localhost:6379",
        Password:     "",
        DB:           0,
        PoolSize:     10,
        MinIdleConns: 3,
        DialTimeout:  5 * time.Second,
        ReadTimeout:  3 * time.Second,
    })
}

func main() {
    ctx := context.Background()

    // Ping para verificar la conexión
    if err := rdb.Ping(ctx).Err(); err != nil {
        panic(err)
    }

    // SET con TTL
    if err := rdb.Set(ctx, "usuario:1:nombre", "Ana García", 24*time.Hour).Err(); err != nil {
        fmt.Println("Error SET:", err)
    }

    // GET
    val, err := rdb.Get(ctx, "usuario:1:nombre").Result()
    if err == redis.Nil {
        fmt.Println("clave no encontrada")
    } else if err != nil {
        fmt.Println("Error GET:", err)
    } else {
        fmt.Println("Valor:", val)
    }

    // INCR atómico para contadores
    visitas, _ := rdb.Incr(ctx, "pagina:home:visitas").Result()
    fmt.Println("Visitas:", visitas)
}

Pipelines: agrupar comandos en una sola ida/vuelta

pipe := rdb.Pipeline()
ctx := context.Background()

// Encolar comandos sin ejecutarlos todavía
incr := pipe.Incr(ctx, "contador:a")
pipe.Expire(ctx, "contador:a", time.Hour)
get := pipe.Get(ctx, "clave:existente")

// Ejecutar todos de una vez
_, err := pipe.Exec(ctx)
if err != nil && err != redis.Nil {
    fmt.Println("Error pipeline:", err)
}

fmt.Println("Contador:", incr.Val())
fmt.Println("Clave:", get.Val())

TxPipeline: transacciones atómicas con MULTI/EXEC

// TxPipeline envuelve los comandos en MULTI/EXEC: o todos o ninguno
_, err := rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
    pipe.Set(ctx, "cuenta:saldo", 1000, 0)
    pipe.DecrBy(ctx, "cuenta:saldo", 200)
    pipe.IncrBy(ctx, "cuenta:destino", 200)
    return nil
})
if err != nil {
    fmt.Println("Transacción fallida:", err)
}

Pub/Sub: mensajes en tiempo real

// Suscriptor
func suscribirse(canal string) {
    sub := rdb.Subscribe(context.Background(), canal)
    defer sub.Close()

    ch := sub.Channel()
    for msg := range ch {
        fmt.Printf("[%s] %sn", msg.Channel, msg.Payload)
    }
}

// Publicador
func publicar(canal, mensaje string) error {
    return rdb.Publish(context.Background(), canal, mensaje).Err()
}

// En tu main:
go suscribirse("notificaciones")
publicar("notificaciones", "nuevo usuario registrado: Ana")

Redis Streams: colas persistentes

ctx := context.Background()

// Añadir mensaje al stream
id, err := rdb.XAdd(ctx, &redis.XAddArgs{
    Stream: "eventos",
    Values: map[string]any{
        "tipo":    "compra",
        "usuario": "user-123",
        "importe": "49.99",
    },
}).Result()
fmt.Println("ID del mensaje:", id)

// Leer mensajes desde el principio
msgs, err := rdb.XRead(ctx, &redis.XReadArgs{
    Streams: []string{"eventos", "0"},
    Count:   10,
    Block:   0,
}).Result()
for _, stream := range msgs {
    for _, msg := range stream.Messages {
        fmt.Printf("ID: %s | Datos: %vn", msg.ID, msg.Values)
    }
}

// Grupos de consumo para procesamiento distribuido
rdb.XGroupCreate(ctx, "eventos", "procesadores", "0")
mensajes, _ := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
    Group:    "procesadores",
    Consumer: "worker-1",
    Streams:  []string{"eventos", ">"},
    Count:    5,
}).Result()

Locks distribuidos con SetNX

func adquirirLock(ctx context.Context, recurso string, ttl time.Duration) (bool, error) {
    return rdb.SetNX(ctx, "lock:"+recurso, "1", ttl).Result()
}

func liberarLock(ctx context.Context, recurso string) error {
    return rdb.Del(ctx, "lock:"+recurso).Err()
}

// Uso:
adquirido, err := adquirirLock(ctx, "proceso-facturacion", 30*time.Second)
if err != nil {
    return err
}
if !adquirido {
    return fmt.Errorf("proceso de facturación ya en ejecución")
}
defer liberarLock(ctx, "proceso-facturacion")
// ejecutar el proceso crítico

Para locks distribuidos en producción en entornos con múltiples instancias de Redis considera usar el algoritmo Redlock implementado en el paquete go-redis/redislock. El SetNX básico solo es fiable con una sola instancia Redis o con Sentinel.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP