Construye un mini framework de agentes en Python para entender LangGraph por dentro

LangGraph, CrewAI y AutoGen son herramientas estupendas. Pero tienen un problema: cuando algo falla, no sabes por dónde empezar a buscar. El agente se queda en bucle, llama a la herramienta equivocada o simplemente devuelve una respuesta sin sentido, y tú miras los logs sin entender qué ha pasado.

La solución no es leer más documentación. Es construir el tuyo propio, aunque sea pequeño. Cuando escribes el loop de razonamiento a mano, entiendes qué hace cada parte y por qué el orden de los mensajes importa. Después, cuando vuelves a LangGraph, lo que antes parecía magia empieza a tener sentido.

Este artículo va al grano: un mini framework funcional en menos de 100 líneas de Python puro, con soporte para herramientas, historial de mensajes y límite de iteraciones. Sin dependencias externas salvo el cliente de OpenAI.

El loop central de un agente ReAct

ReAct (Reasoning + Acting) es el patrón que siguen la mayoría de agentes con herramientas. La idea es sencilla: el LLM razona, decide si necesita una herramienta, la ejecuta y vuelve a razonar con el resultado. El ciclo se repite hasta que el modelo produce una respuesta final sin invocar ninguna herramienta.

Los pasos son siempre estos cuatro:

  1. Llamar al LLM con el historial completo de mensajes.
  2. Comprobar si la respuesta incluye una llamada a herramienta (tool_calls).
  3. Si hay herramienta, ejecutarla y añadir el resultado al historial.
  4. Volver al paso 1. Si no hay herramienta, el agente ha terminado.

La condición de salida es que el LLM devuelva un mensaje normal, sin tool_calls. Eso indica que ya tiene suficiente información para responder.

def run(self, user_input: str) -> str:
    self.messages.append({"role": "user", "content": user_input})

    for _ in range(self.max_iterations):
        response = self.client.chat.completions.create(
            model=self.model,
            messages=self.messages,
            tools=self.tool_schemas or None,
        )
        msg = response.choices[0].message

        if not msg.tool_calls:
            self.messages.append({"role": "assistant", "content": msg.content})
            return msg.content

        self.messages.append(msg)
        self._execute_tool_calls(msg.tool_calls)

    return "El agente alcanzó el límite de iteraciones sin producir una respuesta."

El gestor de herramientas

Una herramienta tiene tres partes: un nombre, una descripción en lenguaje natural y la función Python que la implementa. El LLM nunca ejecuta código directamente; solo decide qué herramienta llamar y con qué parámetros. Tú eres quien ejecuta la función real.

Para registrar herramientas, un diccionario simple es suficiente:

self.tools: dict[str, callable] = {}
self.tool_schemas: list[dict] = []

def register_tool(self, name: str, description: str, func: callable, parameters: dict):
    self.tools[name] = func
    self.tool_schemas.append({
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": parameters,
        }
    })

Cuando el LLM decide usar una herramienta, te llega un objeto tool_call con el nombre y los argumentos en JSON. Para ejecutarla:

def _execute_tool_calls(self, tool_calls):
    for tc in tool_calls:
        name = tc.function.name
        args = json.loads(tc.function.arguments)

        if name not in self.tools:
            result = f"Error: herramienta '{name}' no registrada."
        else:
            try:
                result = str(self.tools[name](**args))
            except Exception as e:
                result = f"Error al ejecutar '{name}': {e}"

        self.messages.append({
            "role": "tool",
            "tool_call_id": tc.id,
            "content": result,
        })

Un ejemplo práctico: una herramienta que consulta el tiempo. En producción llamarías a una API real, pero para ver el mecanismo funciona igual con un valor fijo:

def get_weather(city: str) -> str:
    return f"El tiempo en {city}: 22°C, parcialmente nublado."

agent.register_tool(
    name="get_weather",
    description="Consulta el tiempo actual en una ciudad.",
    func=get_weather,
    parameters={
        "type": "object",
        "properties": {
            "city": {"type": "string", "description": "Nombre de la ciudad"}
        },
        "required": ["city"]
    }
)

El estado del agente: la lista de mensajes

El estado de un agente ReAct es simplemente una lista de mensajes con roles. No hay nada más sofisticado que eso. Los roles que usarás son system, user, assistant y tool.

El orden importa mucho. Cuando el LLM invoca una herramienta, añades el mensaje del asistente (con el tool_call) y justo después el resultado con rol tool. Si los separas o los desordenás, la API te devuelve un error o el modelo se confunde.

# Estructura correcta del historial tras una llamada a herramienta:
[
    {"role": "system", "content": "Eres un asistente útil."},
    {"role": "user", "content": "¿Qué tiempo hace en Madrid?"},
    {"role": "assistant", "content": None, "tool_calls": [...]},  # respuesta del LLM
    {"role": "tool", "tool_call_id": "call_abc123", "content": "22°C, nublado."},
    # aquí el LLM vuelve a responder con la respuesta final
]

Fíjate en que el mensaje del asistente con el tool_call tiene content: None. Eso es normal y esperado cuando el modelo decide usar una herramienta en lugar de responder directamente.

Límites y seguridad básica

Sin límites, un agente mal configurado puede entrar en bucle indefinido y consumir tokens sin parar. Dos medidas mínimas que debes poner siempre.

Máximo de iteraciones

El parámetro max_iterations del constructor limita cuántas veces puede girar el loop. Un valor razonable para la mayoría de tareas es entre 5 y 10. Si el agente lleva 10 iteraciones sin terminar, algo va mal y es mejor cortar que dejarlo correr.

Timeout por herramienta

Si una herramienta llama a una API externa, puede bloquearse. Usa ThreadPoolExecutor para poner un límite de tiempo:

from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

def _run_with_timeout(self, func, kwargs, timeout=10):
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(func, **kwargs)
        try:
            return future.result(timeout=timeout)
        except FuturesTimeout:
            return "Error: la herramienta tardó demasiado y se canceló."

Devolver el error al LLM en lugar de lanzar una excepción es la mejor estrategia: el modelo puede reconsiderar su plan o pedir al usuario más información. Si simplemente crasheas, pierdes todo el historial.

Memoria entre sesiones

Por defecto, cada vez que creas un agente nuevo empiezas con el historial vacío. Si quieres que recuerde conversaciones anteriores, guarda la lista de mensajes en disco al terminar y cárgala al inicio:

import json
from pathlib import Path

def save_history(self, path: str):
    data = []
    for m in self.messages:
        if hasattr(m, 'model_dump'):
            data.append(m.model_dump())
        else:
            data.append(m)
    Path(path).write_text(json.dumps(data, ensure_ascii=False, indent=2))

def load_history(self, path: str):
    if Path(path).exists():
        self.messages = json.loads(Path(path).read_text())

Un detalle importante: los historiales crecen. Si acumulas cientos de mensajes, el contexto se satura y la API te devuelve un error. Trunca el historial cuando supere un umbral, pero conserva siempre el mensaje de sistema y los últimos N pares usuario/asistente para no perder el hilo:

def _trim_history(self, max_messages=20):
    system = [m for m in self.messages if m.get("role") == "system"]
    rest = [m for m in self.messages if m.get("role") != "system"]
    if len(rest) > max_messages:
        rest = rest[-max_messages:]
    self.messages = system + rest

Qué tiene LangGraph que tu mini-framework no tiene

Con todo lo anterior tienes un agente funcional. Pero LangGraph existe por buenas razones, y conviene saber cuáles son antes de decidir qué usar.

Lo que LangGraph añade sobre este código:

  • Estado tipado con Pydantic: en lugar de una lista de diccionarios libre, el estado tiene esquema y validación.
  • Checkpointing automático: el estado se guarda en cada paso, lo que permite reanudar un agente interrumpido.
  • Grafos con ramas: puedes definir flujos condicionales donde diferentes nodos se ejecutan según el resultado anterior.
  • Human-in-the-loop: el agente puede pausar y esperar confirmación humana antes de continuar.
  • Ramas paralelas: varios nodos ejecutándose a la vez con sincronización automática.
  • Retry policies: reintentos con backoff configurables por herramienta.

Tu mini-framework no tiene nada de eso. Y no pasa nada. Para prototipos, scripts internos y casos donde quieres control total sobre lo que ocurre, 80 líneas de Python tuyo son más fáciles de mantener que una abstracción compleja. Cuando el proyecto crezca y necesites persistencia automática o flujos ramificados, ahí tiene sentido dar el salto a LangGraph. Puedes leer más sobre interfaces para aplicaciones Python con IA o sobre automatización avanzada con Python para complementar lo que construyas aquí.

El código completo del mini-framework

Aquí está todo junto, menos de 100 líneas:

import json
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout
from pathlib import Path
from openai import OpenAI


class Agent:
    def __init__(
        self,
        system_prompt: str = "Eres un asistente útil.",
        model: str = "gpt-4o-mini",
        max_iterations: int = 8,
        tool_timeout: int = 10,
    ):
        self.client = OpenAI()
        self.model = model
        self.max_iterations = max_iterations
        self.tool_timeout = tool_timeout
        self.tools: dict[str, callable] = {}
        self.tool_schemas: list[dict] = []
        self.messages: list[dict] = [{"role": "system", "content": system_prompt}]

    def register_tool(self, name: str, description: str, func: callable, parameters: dict):
        self.tools[name] = func
        self.tool_schemas.append({
            "type": "function",
            "function": {"name": name, "description": description, "parameters": parameters},
        })

    def _run_with_timeout(self, func, kwargs):
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(func, **kwargs)
            try:
                return future.result(timeout=self.tool_timeout)
            except FuturesTimeout:
                return "Error: la herramienta tardó demasiado y se canceló."

    def _execute_tool_calls(self, tool_calls):
        for tc in tool_calls:
            name = tc.function.name
            args = json.loads(tc.function.arguments)
            if name not in self.tools:
                result = f"Error: herramienta '{name}' no registrada."
            else:
                result = str(self._run_with_timeout(self.tools[name], args))
            self.messages.append({
                "role": "tool",
                "tool_call_id": tc.id,
                "content": result,
            })

    def _trim_history(self, max_messages: int = 20):
        system = [m for m in self.messages if isinstance(m, dict) and m.get("role") == "system"]
        rest = [m for m in self.messages if not (isinstance(m, dict) and m.get("role") == "system")]
        if len(rest) > max_messages:
            rest = rest[-max_messages:]
        self.messages = system + rest

    def save_history(self, path: str):
        data = [m.model_dump() if hasattr(m, "model_dump") else m for m in self.messages]
        Path(path).write_text(json.dumps(data, ensure_ascii=False, indent=2))

    def load_history(self, path: str):
        if Path(path).exists():
            self.messages = json.loads(Path(path).read_text())

    def run(self, user_input: str) -> str:
        self.messages.append({"role": "user", "content": user_input})
        self._trim_history()

        for _ in range(self.max_iterations):
            response = self.client.chat.completions.create(
                model=self.model,
                messages=self.messages,
                tools=self.tool_schemas or None,
            )
            msg = response.choices[0].message

            if not msg.tool_calls:
                self.messages.append({"role": "assistant", "content": msg.content})
                return msg.content

            self.messages.append(msg)
            self._execute_tool_calls(msg.tool_calls)

        return "El agente alcanzó el límite de iteraciones sin producir una respuesta."


# --- Ejemplo de uso ---
if __name__ == "__main__":
    def get_weather(city: str) -> str:
        return f"El tiempo en {city}: 22°C, parcialmente nublado."

    agent = Agent(system_prompt="Eres un asistente que puede consultar el tiempo.")
    agent.register_tool(
        name="get_weather",
        description="Consulta el tiempo actual en una ciudad.",
        func=get_weather,
        parameters={
            "type": "object",
            "properties": {"city": {"type": "string", "description": "Nombre de la ciudad"}},
            "required": ["city"],
        },
    )

    respuesta = agent.run("¿Qué tiempo hace en Barcelona?")
    print(respuesta)

Con esto tienes un agente que razona, usa herramientas, gestiona el historial y no se queda colgado. A partir de aquí puedes añadir lo que necesites: más herramientas, persistencia en base de datos, logging detallado o cualquier cosa que tu caso concreto requiera. Lo importante es que ahora sabes exactamente qué pasa en cada paso.

Imagen: Pexels / Al Nahian

COMPARTE ESTE ARTÍCULO

COMPARTIR EN FACEBOOK
COMPARTIR EN TWITTER
COMPARTIR EN LINKEDIN
COMPARTIR EN WHATSAPP