gRPC streaming en Go: server streaming, client streaming y bidireccional

gRPC soporta cuatro tipos de llamadas: unaria (un mensaje de ida y vuelta), server streaming (el servidor envía varios mensajes), client streaming (el cliente envía varios mensajes) y bidireccional. Los tres últimos usan streams y en Go se implementan con métodos que reciben o devuelven una interfaz de stream.

Server streaming: el servidor envía múltiples mensajes

Define el método con stream en la respuesta del .proto:

service DataService {
  rpc ListPrices (PriceRequest) returns (stream PriceReply);
}

message PriceRequest { string symbol = 1; }
message PriceReply   { double price = 1; string ts = 2; }

En el servidor, el método recibe el stream y llama a Send() para cada mensaje. Cuando el método retorna, el stream se cierra automáticamente:

func (s *server) ListPrices(req *pb.PriceRequest, stream pb.DataService_ListPricesServer) error {
    precios := []float64{142.3, 143.1, 141.8, 144.5, 143.9}
    for _, p := range precios {
        if err := stream.Send(&pb.PriceReply{
            Price: p,
            Ts:    time.Now().Format(time.RFC3339),
        }); err != nil {
            return err
        }
        time.Sleep(200 * time.Millisecond)
    }
    return nil
}

El cliente llama a Recv() en un bucle y sale cuando recibe io.EOF:

stream, err := c.ListPrices(ctx, &pb.PriceRequest{Symbol: "AAPL"})
if err != nil {
    log.Fatal(err)
}
for {
    msg, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatalf("error recv: %v", err)
    }
    fmt.Printf("precio: %.2f @ %sn", msg.Price, msg.Ts)
}

Client streaming: el cliente envía múltiples mensajes

service DataService {
  rpc UploadReadings (stream Reading) returns (UploadSummary);
}

message Reading     { string sensor_id = 1; double value = 2; }
message UploadSummary { int32 count = 1; double average = 2; }

El servidor acumula los mensajes con Recv() y responde una sola vez con SendAndClose():

func (s *server) UploadReadings(stream pb.DataService_UploadReadingsServer) error {
    var total float64
    var count int32
    for {
        reading, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.UploadSummary{
                Count:   count,
                Average: total / float64(count),
            })
        }
        if err != nil {
            return err
        }
        total += reading.Value
        count++
    }
}

El cliente envía con Send() y cierra con CloseAndRecv():

stream, err := c.UploadReadings(ctx)
if err != nil {
    log.Fatal(err)
}
readings := []float64{21.3, 22.1, 20.8, 23.4}
for _, v := range readings {
    if err := stream.Send(&pb.Reading{SensorId: "sensor-1", Value: v}); err != nil {
        log.Fatal(err)
    }
}
summary, err := stream.CloseAndRecv()
if err != nil {
    log.Fatal(err)
}
fmt.Printf("subidos %d registros, media=%.2fn", summary.Count, summary.Average)

Streaming bidireccional: ambos lados envían al mismo tiempo

service ChatService {
  rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}

En el servidor, el patrón más común es leer y escribir desde goroutines independientes. El stream es seguro para uso concurrente entre una goroutine lectora y una escritora:

func (s *server) Chat(stream pb.ChatService_ChatServer) error {
    errCh := make(chan error, 1)

    // goroutine lectora
    go func() {
        for {
            msg, err := stream.Recv()
            if err == io.EOF {
                errCh <- nil
                return
            }
            if err != nil {
                errCh <- err
                return
            }
            // procesar y responder
            resp := &pb.ChatMessage{
                User:    "servidor",
                Content: "eco: " + msg.Content,
            }
            if err := stream.Send(resp); err != nil {
                errCh <- err
                return
            }
        }
    }()

    return <-errCh
}

Manejo correcto de io.EOF

io.EOF en gRPC no es un error: significa que el otro extremo cerró el stream de forma limpia. El error real de red llega con un código de estado diferente. Siempre comprueba io.EOF antes de tratar el error como fatal:

msg, err := stream.Recv()
if err == io.EOF {
    break // cierre limpio, sal del bucle
}
if err != nil {
    return status.Errorf(codes.Internal, "error de stream: %v", err)
}
// procesar msg

Interceptores de streaming con go-grpc-middleware

El paquete github.com/grpc-ecosystem/go-grpc-middleware/v2 facilita encadenar interceptores tanto para llamadas unarias como para streams:

import (
    "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
    "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
)

s := grpc.NewServer(
    grpc.ChainUnaryInterceptor(
        logging.UnaryServerInterceptor(logger),
        recovery.UnaryServerInterceptor(),
    ),
    grpc.ChainStreamInterceptor(
        logging.StreamServerInterceptor(logger),
        recovery.StreamServerInterceptor(),
    ),
)

El interceptor de recovery captura panics en los handlers y los convierte en errores gRPC con código Internal, lo que evita que un panic derribe todo el proceso servidor.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP