Kafka en Go con confluent-kafka-go: producer, consumer, grupos de consumo y offsets

confluent-kafka-go es el cliente oficial de Confluent para Apache Kafka en Go. Está basado en librdkafka, lo que le da un rendimiento excelente pero añade una dependencia de C (necesita CGO_ENABLED=1). Para instalaciones sin C o con Alpine existe la alternativa pura Go segmentio/kafka-go.

Instalar y configurar

go get github.com/confluentinc/confluent-kafka-go/v2/kafka

Producer: enviar mensajes

package main

import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func crearProducer() (*kafka.Producer, error) {
    return kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "acks":              "all",    // esperar confirmación de todos los líderes
        "retries":           3,
        "linger.ms":         5,       // acumular mensajes durante 5ms para batching
        "batch.size":        16384,
    })
}

func publicarEvento(p *kafka.Producer, topic, clave string, datos []byte) error {
    return p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &topic,
            Partition: kafka.PartitionAny,
        },
        Key:   []byte(clave),
        Value: datos,
        Headers: []kafka.Header{
            {Key: "content-type", Value: []byte("application/json")},
        },
    }, nil)
}

Canal de eventos: confirmaciones de entrega

La entrega de mensajes en Kafka es asíncrona. El producer devuelve inmediatamente y el cliente notifica el resultado (éxito o error) a través de un canal de eventos:

func main() {
    p, err := crearProducer()
    if err != nil {
        panic(err)
    }
    defer p.Close()

    // Goroutine que procesa confirmaciones de entrega
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Error entregando mensaje: %vn", ev.TopicPartition.Error)
                } else {
                    fmt.Printf("Mensaje entregado a %s [%d] offset %vn",
                        *ev.TopicPartition.Topic,
                        ev.TopicPartition.Partition,
                        ev.TopicPartition.Offset,
                    )
                }
            }
        }
    }()

    // Publicar varios mensajes
    for i := 0; i < 10; i++ {
        topic := "pedidos"
        publicarEvento(p, topic,
            fmt.Sprintf("pedido-%d", i),
            []byte(fmt.Sprintf(`{"id":%d,"estado":"nuevo"}`, i)),
        )
    }

    // Vaciar el buffer antes de cerrar
    p.Flush(15 * 1000) // timeout en ms
}

Consumer con grupos de consumo

func crearConsumer(grupoid string) (*kafka.Consumer, error) {
    return kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":  "localhost:9092",
        "group.id":           grupoid,
        "auto.offset.reset":  "earliest",
        "enable.auto.commit": false, // commit manual para control total
    })
}

func consumir(groupID string, topics []string) {
    c, err := crearConsumer(groupID)
    if err != nil {
        panic(err)
    }
    defer c.Close()

    c.SubscribeTopics(topics, nil)

    for {
        msg, err := c.ReadMessage(-1) // -1 = bloquear indefinidamente
        if err != nil {
            if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrUnknownTopicOrPart {
                fmt.Println("Topic no encontrado:", err)
                continue
            }
            fmt.Println("Error en consumer:", err)
            break
        }

        fmt.Printf("Mensaje en %s [%d]@%v: %s = %sn",
            *msg.TopicPartition.Topic,
            msg.TopicPartition.Partition,
            msg.TopicPartition.Offset,
            string(msg.Key),
            string(msg.Value),
        )

        // Procesar el mensaje...
        if err := procesarMensaje(msg); err != nil {
            fmt.Println("Error procesando, no hacemos commit:", err)
            continue
        }

        // Commit manual del offset
        c.CommitMessage(msg)
    }
}

Gestión de offsets: garantías de procesamiento

// Commit de offset específico
topicPartition := kafka.TopicPartition{
    Topic:     &topic,
    Partition: 0,
    Offset:    kafka.Offset(1234),
}
c.CommitOffsets([]kafka.TopicPartition{topicPartition})

// Consultar offset actual
offsets, _ := c.Committed([]kafka.TopicPartition{topicPartition}, 5000)
for _, o := range offsets {
    fmt.Printf("Offset confirmado para %s[%d]: %vn",
        *o.Topic, o.Partition, o.Offset)
}

// Resetear al principio del topic
c.SeekPartitions([]kafka.TopicPartition{{
    Topic:     &topic,
    Partition: 0,
    Offset:    kafka.OffsetBeginning,
}})

Con enable.auto.commit: false y commit manual controlas exactamente qué mensajes se marcan como procesados. Si tu proceso cae antes del commit, Kafka los re-entregará al reiniciar. Esto te da semántica at-least-once, que es lo que necesitas cuando no puedes permitirte perder mensajes.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP