gRPC soporta cuatro tipos de llamadas: unaria (un mensaje de ida y vuelta), server streaming (el servidor envía varios mensajes), client streaming (el cliente envía varios mensajes) y bidireccional. Los tres últimos usan streams y en Go se implementan con métodos que reciben o devuelven una interfaz de stream.
Server streaming: el servidor envía múltiples mensajes
Define el método con stream en la respuesta del .proto:
service DataService {
rpc ListPrices (PriceRequest) returns (stream PriceReply);
}
message PriceRequest { string symbol = 1; }
message PriceReply { double price = 1; string ts = 2; }
En el servidor, el método recibe el stream y llama a Send() para cada mensaje. Cuando el método retorna, el stream se cierra automáticamente:
func (s *server) ListPrices(req *pb.PriceRequest, stream pb.DataService_ListPricesServer) error {
precios := []float64{142.3, 143.1, 141.8, 144.5, 143.9}
for _, p := range precios {
if err := stream.Send(&pb.PriceReply{
Price: p,
Ts: time.Now().Format(time.RFC3339),
}); err != nil {
return err
}
time.Sleep(200 * time.Millisecond)
}
return nil
}
El cliente llama a Recv() en un bucle y sale cuando recibe io.EOF:
stream, err := c.ListPrices(ctx, &pb.PriceRequest{Symbol: "AAPL"})
if err != nil {
log.Fatal(err)
}
for {
msg, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("error recv: %v", err)
}
fmt.Printf("precio: %.2f @ %sn", msg.Price, msg.Ts)
}
Client streaming: el cliente envía múltiples mensajes
service DataService {
rpc UploadReadings (stream Reading) returns (UploadSummary);
}
message Reading { string sensor_id = 1; double value = 2; }
message UploadSummary { int32 count = 1; double average = 2; }
El servidor acumula los mensajes con Recv() y responde una sola vez con SendAndClose():
func (s *server) UploadReadings(stream pb.DataService_UploadReadingsServer) error {
var total float64
var count int32
for {
reading, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.UploadSummary{
Count: count,
Average: total / float64(count),
})
}
if err != nil {
return err
}
total += reading.Value
count++
}
}
El cliente envía con Send() y cierra con CloseAndRecv():
stream, err := c.UploadReadings(ctx)
if err != nil {
log.Fatal(err)
}
readings := []float64{21.3, 22.1, 20.8, 23.4}
for _, v := range readings {
if err := stream.Send(&pb.Reading{SensorId: "sensor-1", Value: v}); err != nil {
log.Fatal(err)
}
}
summary, err := stream.CloseAndRecv()
if err != nil {
log.Fatal(err)
}
fmt.Printf("subidos %d registros, media=%.2fn", summary.Count, summary.Average)
Streaming bidireccional: ambos lados envían al mismo tiempo
service ChatService {
rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}
En el servidor, el patrón más común es leer y escribir desde goroutines independientes. El stream es seguro para uso concurrente entre una goroutine lectora y una escritora:
func (s *server) Chat(stream pb.ChatService_ChatServer) error {
errCh := make(chan error, 1)
// goroutine lectora
go func() {
for {
msg, err := stream.Recv()
if err == io.EOF {
errCh <- nil
return
}
if err != nil {
errCh <- err
return
}
// procesar y responder
resp := &pb.ChatMessage{
User: "servidor",
Content: "eco: " + msg.Content,
}
if err := stream.Send(resp); err != nil {
errCh <- err
return
}
}
}()
return <-errCh
}
Manejo correcto de io.EOF
io.EOF en gRPC no es un error: significa que el otro extremo cerró el stream de forma limpia. El error real de red llega con un código de estado diferente. Siempre comprueba io.EOF antes de tratar el error como fatal:
msg, err := stream.Recv()
if err == io.EOF {
break // cierre limpio, sal del bucle
}
if err != nil {
return status.Errorf(codes.Internal, "error de stream: %v", err)
}
// procesar msg
Interceptores de streaming con go-grpc-middleware
El paquete github.com/grpc-ecosystem/go-grpc-middleware/v2 facilita encadenar interceptores tanto para llamadas unarias como para streams:
import (
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
)
s := grpc.NewServer(
grpc.ChainUnaryInterceptor(
logging.UnaryServerInterceptor(logger),
recovery.UnaryServerInterceptor(),
),
grpc.ChainStreamInterceptor(
logging.StreamServerInterceptor(logger),
recovery.StreamServerInterceptor(),
),
)
El interceptor de recovery captura panics en los handlers y los convierte en errores gRPC con código Internal, lo que evita que un panic derribe todo el proceso servidor.
