confluent-kafka-go es el cliente oficial de Confluent para Apache Kafka en Go. Está basado en librdkafka, lo que le da un rendimiento excelente pero añade una dependencia de C (necesita CGO_ENABLED=1). Para instalaciones sin C o con Alpine existe la alternativa pura Go segmentio/kafka-go.
Instalar y configurar
go get github.com/confluentinc/confluent-kafka-go/v2/kafka
Producer: enviar mensajes
package main
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
func crearProducer() (*kafka.Producer, error) {
return kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"acks": "all", // esperar confirmación de todos los líderes
"retries": 3,
"linger.ms": 5, // acumular mensajes durante 5ms para batching
"batch.size": 16384,
})
}
func publicarEvento(p *kafka.Producer, topic, clave string, datos []byte) error {
return p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Partition: kafka.PartitionAny,
},
Key: []byte(clave),
Value: datos,
Headers: []kafka.Header{
{Key: "content-type", Value: []byte("application/json")},
},
}, nil)
}
Canal de eventos: confirmaciones de entrega
La entrega de mensajes en Kafka es asíncrona. El producer devuelve inmediatamente y el cliente notifica el resultado (éxito o error) a través de un canal de eventos:
func main() {
p, err := crearProducer()
if err != nil {
panic(err)
}
defer p.Close()
// Goroutine que procesa confirmaciones de entrega
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Error entregando mensaje: %vn", ev.TopicPartition.Error)
} else {
fmt.Printf("Mensaje entregado a %s [%d] offset %vn",
*ev.TopicPartition.Topic,
ev.TopicPartition.Partition,
ev.TopicPartition.Offset,
)
}
}
}
}()
// Publicar varios mensajes
for i := 0; i < 10; i++ {
topic := "pedidos"
publicarEvento(p, topic,
fmt.Sprintf("pedido-%d", i),
[]byte(fmt.Sprintf(`{"id":%d,"estado":"nuevo"}`, i)),
)
}
// Vaciar el buffer antes de cerrar
p.Flush(15 * 1000) // timeout en ms
}
Consumer con grupos de consumo
func crearConsumer(grupoid string) (*kafka.Consumer, error) {
return kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": grupoid,
"auto.offset.reset": "earliest",
"enable.auto.commit": false, // commit manual para control total
})
}
func consumir(groupID string, topics []string) {
c, err := crearConsumer(groupID)
if err != nil {
panic(err)
}
defer c.Close()
c.SubscribeTopics(topics, nil)
for {
msg, err := c.ReadMessage(-1) // -1 = bloquear indefinidamente
if err != nil {
if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrUnknownTopicOrPart {
fmt.Println("Topic no encontrado:", err)
continue
}
fmt.Println("Error en consumer:", err)
break
}
fmt.Printf("Mensaje en %s [%d]@%v: %s = %sn",
*msg.TopicPartition.Topic,
msg.TopicPartition.Partition,
msg.TopicPartition.Offset,
string(msg.Key),
string(msg.Value),
)
// Procesar el mensaje...
if err := procesarMensaje(msg); err != nil {
fmt.Println("Error procesando, no hacemos commit:", err)
continue
}
// Commit manual del offset
c.CommitMessage(msg)
}
}
Gestión de offsets: garantías de procesamiento
// Commit de offset específico
topicPartition := kafka.TopicPartition{
Topic: &topic,
Partition: 0,
Offset: kafka.Offset(1234),
}
c.CommitOffsets([]kafka.TopicPartition{topicPartition})
// Consultar offset actual
offsets, _ := c.Committed([]kafka.TopicPartition{topicPartition}, 5000)
for _, o := range offsets {
fmt.Printf("Offset confirmado para %s[%d]: %vn",
*o.Topic, o.Partition, o.Offset)
}
// Resetear al principio del topic
c.SeekPartitions([]kafka.TopicPartition{{
Topic: &topic,
Partition: 0,
Offset: kafka.OffsetBeginning,
}})
Con enable.auto.commit: false y commit manual controlas exactamente qué mensajes se marcan como procesados. Si tu proceso cae antes del commit, Kafka los re-entregará al reiniciar. Esto te da semántica at-least-once, que es lo que necesitas cuando no puedes permitirte perder mensajes.
