Protocol Buffers y gRPC en Python: definir servicios, generar código y streaming bidireccional

Protocol Buffers (protobuf) es el sistema de serialización binaria de Google, y gRPC es el framework RPC que se construye sobre él. Juntos permiten definir la interfaz de un servicio en un fichero .proto y generar automáticamente el código cliente y servidor en Python (o en cualquier otro lenguaje soportado). El resultado es comunicación entre servicios más rápida y menos verbosa que REST+JSON, con contratos tipados y verificados en compilación.

Instalación

# pip install grpcio grpcio-tools
# El compilador protoc viene incluido en grpcio-tools

Definir mensajes y servicios en .proto

# calculadora.proto
syntax = "proto3";
package calculadora;

// Mensajes de datos
message OperacionRequest {
    double a = 1;
    double b = 2;
    string operacion = 3;  // "sumar", "restar", "multiplicar", "dividir"
}

message ResultadoResponse {
    double resultado = 1;
    bool exito = 2;
    string error = 3;
}

message NumeroRequest {
    int32 n = 1;
}

message SecuenciaResponse {
    repeated int64 numeros = 1;   // lista de enteros
}

message EstadisticasResponse {
    double media = 1;
    double maximo = 2;
    double minimo = 3;
    map<string, int32> frecuencias = 4;   // map de string a int
}

// Definición del servicio con los cuatro tipos de RPC
service Calculadora {
    // Unary: petición-respuesta simple
    rpc Calcular (OperacionRequest) returns (ResultadoResponse);

    // Server streaming: el servidor envía múltiples respuestas
    rpc GenerarFibonacci (NumeroRequest) returns (stream SecuenciaResponse);

    // Client streaming: el cliente envía múltiples datos
    rpc CalcularEstadisticas (stream OperacionRequest) returns (EstadisticasResponse);

    // Bidirectional streaming: ambos envían múltiples mensajes
    rpc ChatCalculadora (stream OperacionRequest) returns (stream ResultadoResponse);
}

Generar stubs con protoc

# Desde la terminal:
# python -m grpc_tools.protoc 
#     -I. 
#     --python_out=. 
#     --grpc_python_out=. 
#     calculadora.proto

# Genera:
# calculadora_pb2.py        — clases de mensajes
# calculadora_pb2_grpc.py   — Stub (cliente) y Servicer (servidor)

Implementar el servidor

# servidor.py
import grpc
from concurrent import futures
import calculadora_pb2
import calculadora_pb2_grpc


class CalculadoraServicer(calculadora_pb2_grpc.CalculadoraServicer):

    def Calcular(self, request, context):
        """Unary RPC."""
        try:
            if request.operacion == 'sumar':
                resultado = request.a + request.b
            elif request.operacion == 'restar':
                resultado = request.a - request.b
            elif request.operacion == 'multiplicar':
                resultado = request.a * request.b
            elif request.operacion == 'dividir':
                if request.b == 0:
                    context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                    context.set_details('División por cero')
                    return calculadora_pb2.ResultadoResponse(exito=False, error='División por cero')
                resultado = request.a / request.b
            else:
                context.set_code(grpc.StatusCode.UNIMPLEMENTED)
                context.set_details(f"Operación '{request.operacion}' no soportada")
                return calculadora_pb2.ResultadoResponse(exito=False, error='Operación desconocida')
            return calculadora_pb2.ResultadoResponse(resultado=resultado, exito=True)
        except Exception as e:
            return calculadora_pb2.ResultadoResponse(exito=False, error=str(e))

    def GenerarFibonacci(self, request, context):
        """Server streaming RPC: envía n números de Fibonacci uno a uno."""
        a, b = 0, 1
        for _ in range(request.n):
            yield calculadora_pb2.SecuenciaResponse(numeros=[a])
            a, b = b, a + b

    def CalcularEstadisticas(self, request_iterator, context):
        """Client streaming RPC: recibe operaciones y devuelve estadísticas."""
        valores = []
        for req in request_iterator:
            valores.append(req.a)
        if not valores:
            return calculadora_pb2.EstadisticasResponse()
        return calculadora_pb2.EstadisticasResponse(
            media=sum(valores) / len(valores),
            maximo=max(valores),
            minimo=min(valores),
        )

    def ChatCalculadora(self, request_iterator, context):
        """Bidirectional streaming RPC."""
        for request in request_iterator:
            try:
                if request.operacion == 'sumar':
                    resultado = request.a + request.b
                else:
                    resultado = 0
                yield calculadora_pb2.ResultadoResponse(resultado=resultado, exito=True)
            except Exception as e:
                yield calculadora_pb2.ResultadoResponse(exito=False, error=str(e))


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    calculadora_pb2_grpc.add_CalculadoraServicer_to_server(CalculadoraServicer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print("Servidor gRPC escuchando en el puerto 50051")
    server.wait_for_termination()


if __name__ == '__main__':
    serve()

Implementar el cliente

# cliente.py
import grpc
import calculadora_pb2
import calculadora_pb2_grpc


def main():
    with grpc.insecure_channel('localhost:50051') as canal:
        stub = calculadora_pb2_grpc.CalculadoraStub(canal)

        # Unary RPC
        req = calculadora_pb2.OperacionRequest(a=10, b=3, operacion='dividir')
        resp = stub.Calcular(req)
        print(f"10 / 3 = {resp.resultado:.4f}")

        # Server streaming
        fib_req = calculadora_pb2.NumeroRequest(n=10)
        print("Fibonacci:", end=' ')
        for resp in stub.GenerarFibonacci(fib_req):
            print(resp.numeros[0], end=' ')
        print()

        # Client streaming
        def generar_valores():
            for valor in [1.5, 3.0, 2.5, 4.0, 2.0]:
                yield calculadora_pb2.OperacionRequest(a=valor, b=0, operacion='sumar')

        stats = stub.CalcularEstadisticas(generar_valores())
        print(f"Media: {stats.media}, Max: {stats.maximo}, Min: {stats.minimo}")


if __name__ == '__main__':
    main()

Interceptores: logging y autenticación

import grpc
import time


class LoggingInterceptor(grpc.ServerInterceptor):
    def intercept_service(self, continuation, handler_call_details):
        inicio = time.perf_counter()
        handler = continuation(handler_call_details)
        metodo = handler_call_details.method
        print(f"[{time.strftime('%H:%M:%S')}] RPC: {metodo}")
        return handler


class AuthInterceptor(grpc.ServerInterceptor):
    TOKEN_VALIDO = "mi-token-secreto"

    def intercept_service(self, continuation, handler_call_details):
        metadata = dict(handler_call_details.invocation_metadata)
        token = metadata.get('authorization', '')
        if not token.startswith('Bearer ') or token[7:] != self.TOKEN_VALIDO:
            def abortar(ignored_request, context):
                context.abort(grpc.StatusCode.UNAUTHENTICATED, 'Token inválido')
            return grpc.unary_unary_rpc_method_handler(abortar)
        return continuation(handler_call_details)

betterproto: stubs con dataclasses

# pip install betterproto grpcio
# python -m grpc_tools.protoc -I. --python_betterproto_out=. calculadora.proto

# Con betterproto los mensajes son dataclasses de Python:
from calculadora import OperacionRequest, ResultadoResponse

req = OperacionRequest(a=10.0, b=3.0, operacion="sumar")
print(req.a + req.b)   # 13.0

# Serializar/deserializar
datos = bytes(req)
req2 = OperacionRequest().parse(datos)
print(req2.operacion)   # sumar

gRPC brilla en comunicaciones entre microservicios donde la latencia importa, el contrato entre servicios debe ser estricto o necesitas streaming bidireccional. Para APIs públicas expuestas a navegadores, REST o GraphQL siguen siendo más convenientes, aunque herramientas como grpc-gateway permiten exponer un endpoint REST que traduce peticiones a gRPC internamente.

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP