Agente de IA duradero con Gemini y Temporal

En este instructivo, se explica cómo compilar un bucle de agente de estilo ReAct que usa la API de Gemini para el razonamiento y Temporal para la durabilidad. El código fuente completo de este instructivo está disponible en GitHub.

El agente puede llamar a herramientas, como buscar alertas meteorológicas o geolocalizar una dirección IP, y se repetirá hasta que tenga suficiente información para responder.

Lo que diferencia a esta demostración de agente de una típica es la durabilidad. Temporal conserva cada llamada al LLM, cada invocación de herramienta y cada paso del bucle del agente. Si el proceso falla, se interrumpe la red o se agota el tiempo de espera de una API, Temporal vuelve a intentarlo automáticamente y reanuda el proceso desde el último paso completado. No se pierde el historial de conversaciones ni se repiten llamadas a herramientas de forma incorrecta.

Arquitectura

La arquitectura consta de tres partes:

  • Flujo de trabajo: Es el bucle de agente que organiza la lógica de ejecución.
  • Actividades: Son unidades individuales de trabajo (llamadas a LLM, llamadas a herramientas) que Temporal hace duraderas.
  • Trabajador: Es el proceso que ejecuta los flujos de trabajo y las actividades.

En este ejemplo, colocarás las tres partes en un solo archivo (durable_agent_worker.py). En una implementación real, las separarías para permitir varias ventajas de implementación y escalabilidad. Colocarás el código que proporciona una instrucción al agente en un segundo archivo (start_workflow.py).

Requisitos previos

Para completar esta guía, necesitarás lo siguiente:

Configuración

Antes de comenzar, asegúrate de tener un servidor de desarrollo temporal ejecutándose de forma local:

temporal server start-dev

A continuación, instala las dependencias requeridas:

pip install temporalio google-genai httpx pydantic python-dotenv

Crea un archivo .env en el directorio de tu proyecto con tu clave de la API de Gemini. Puedes obtener una clave de API en Google AI Studio.

echo "GOOGLE_API_KEY=your-api-key-here" > .env

Implementación

En el resto de este instructivo, se explica durable_agent_worker.py de arriba abajo, y se compila el agente pieza por pieza. Crea el archivo y sigue las instrucciones.

Importaciones y configuración de la zona de pruebas

Comienza con las importaciones que se deben definir por adelantado. El bloque workflow.unsafe.imports_passed_through() le indica al entorno de pruebas de flujo de trabajo de Temporal que permita que ciertos módulos pasen sin restricciones. Esto es necesario porque varias bibliotecas (en particular, httpx, que es una subclase de urllib.request.Request) usan patrones que la zona de pruebas bloquearía.

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    import pydantic_core  # noqa: F401
    import annotated_types  # noqa: F401

    import httpx
    from pydantic import BaseModel, Field
    from google import genai
    from google.genai import types

Instrucciones del sistema

A continuación, define la personalidad del agente. Las instrucciones del sistema le indican al modelo cómo comportarse. Se le indica a este agente que responda en haikus cuando no se necesiten herramientas.

SYSTEM_INSTRUCTIONS = """
You are a helpful agent that can use tools to help the user.
You will be given an input from the user and a list of tools to use.
You may or may not need to use the tools to satisfy the user ask.
If no tools are needed, respond in haikus.
"""

Definiciones de herramientas

Ahora, define las herramientas que puede usar el agente. Cada herramienta es una función asíncrona con una cadena de documentación descriptiva. Las herramientas que toman parámetros usan un modelo de Pydantic como su único argumento. Esta es una práctica recomendada de Temporal que mantiene estables las firmas de actividad a medida que agregas campos opcionales con el tiempo.

import json

NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"


class GetWeatherAlertsRequest(BaseModel):
    """Request model for getting weather alerts."""

    state: str = Field(description="Two-letter US state code (e.g. CA, NY)")


async def get_weather_alerts(request: GetWeatherAlertsRequest) -> str:
    """Get weather alerts for a US state.

    Args:
        request: The request object containing:
            - state: Two-letter US state code (e.g. CA, NY)
    """
    headers = {"User-Agent": USER_AGENT, "Accept": "application/geo+json"}
    url = f"{NWS_API_BASE}/alerts/active/area/{request.state}"

    async with httpx.AsyncClient() as client:
        response = await client.get(url, headers=headers, timeout=5.0)
        response.raise_for_status()
        return json.dumps(response.json())

A continuación, define herramientas para la ubicación geográfica de direcciones IP:

class GetLocationRequest(BaseModel):
    """Request model for getting location info from an IP address."""

    ipaddress: str = Field(description="An IP address")


async def get_ip_address() -> str:
    """Get the public IP address of the current machine."""
    async with httpx.AsyncClient() as client:
        response = await client.get("https://icanhazip.com")
        response.raise_for_status()
        return response.text.strip()


async def get_location_info(request: GetLocationRequest) -> str:
    """Get the location information for an IP address including city, state, and country.

    Args:
        request: The request object containing:
            - ipaddress: An IP address to look up
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(f"http://ip-api.com/json/{request.ipaddress}")
        response.raise_for_status()
        result = response.json()
        return f"{result['city']}, {result['regionName']}, {result['country']}"

Registro de herramientas

A continuación, crea un registro que asigne nombres de herramientas a funciones de controlador. La función get_tools() genera objetos FunctionDeclaration compatibles con Gemini a partir de los elementos invocables con FunctionDeclaration.from_callable_with_api_option().

from typing import Any, Awaitable, Callable

ToolHandler = Callable[..., Awaitable[Any]]


def get_handler(tool_name: str) -> ToolHandler:
    """Get the handler function for a given tool name."""
    if tool_name == "get_location_info":
        return get_location_info
    if tool_name == "get_ip_address":
        return get_ip_address
    if tool_name == "get_weather_alerts":
        return get_weather_alerts
    raise ValueError(f"Unknown tool name: {tool_name}")


def get_tools() -> types.Tool:
    """Get the Tool object containing all available function declarations.

    Uses FunctionDeclaration.from_callable_with_api_option() from the Google GenAI SDK
    to generate tool definitions from the handler functions.
    """
    return types.Tool(
        function_declarations=[
            types.FunctionDeclaration.from_callable_with_api_option(
                callable=get_weather_alerts, api_option="GEMINI_API"
            ),
            types.FunctionDeclaration.from_callable_with_api_option(
                callable=get_location_info, api_option="GEMINI_API"
            ),
            types.FunctionDeclaration.from_callable_with_api_option(
                callable=get_ip_address, api_option="GEMINI_API"
            ),
        ]
    )

Actividad de LLM

Ahora define la actividad que llama a la API de Gemini. Las clases de datos GeminiChatRequest y GeminiChatResponse definen el contrato.

Inhabilitarás la llamada a funciones automática para que la invocación del LLM y la invocación de la herramienta se controlen como tareas separadas, lo que le brindará más durabilidad a tu agente. También inhabilitarás los reintentos integrados del SDK (attempts=1), ya que Temporal controla los reintentos de forma duradera.

import os
from dataclasses import dataclass

from temporalio import activity

@dataclass
class GeminiChatRequest:
    """Request parameters for a Gemini chat completion."""

    model: str
    system_instruction: str
    contents: list[types.Content]
    tools: list[types.Tool]


@dataclass
class GeminiChatResponse:
    """Response from a Gemini chat completion."""

    text: str | None
    function_calls: list[dict[str, Any]]
    raw_parts: list[types.Part]


@activity.defn
async def generate_content(request: GeminiChatRequest) -> GeminiChatResponse:
    """Execute a Gemini chat completion with tool support."""
    api_key = os.environ.get("GOOGLE_API_KEY")
    if not api_key:
        raise ValueError("GOOGLE_API_KEY environment variable is not set")
    client = genai.Client(
        api_key=api_key,
        http_options=types.HttpOptions(
            retry_options=types.HttpRetryOptions(attempts=1),
        ),
    )

    config = types.GenerateContentConfig(
        system_instruction=request.system_instruction,
        tools=request.tools,
        automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True),
    )

    response = await client.aio.models.generate_content(
        model=request.model,
        contents=request.contents,
        config=config,
    )

    function_calls = []
    raw_parts = []
    text_parts = []

    if response.candidates and response.candidates[0].content:
        for part in response.candidates[0].content.parts:
            raw_parts.append(part)
            if part.function_call:
                function_calls.append(
                    {
                        "name": part.function_call.name,
                        "args": dict(part.function_call.args) if part.function_call.args else {},
                    }
                )
            elif part.text:
                text_parts.append(part.text)

    text = "".join(text_parts) if text_parts and not function_calls else None

    return GeminiChatResponse(
        text=text,
        function_calls=function_calls,
        raw_parts=raw_parts,
    )

Actividad de la herramienta dinámica

A continuación, define la actividad que ejecuta herramientas. Esto usa la función de actividad dinámica de Temporal: el controlador de herramientas (un elemento llamable) se obtiene del registro de herramientas a través de la función get_handler. Esto permite definir diferentes agentes simplemente proporcionando un conjunto diferente de herramientas e instrucciones del sistema. El flujo de trabajo que implementa el bucle agentic no requiere cambios.

La actividad inspecciona la firma del controlador para determinar cómo pasar argumentos. Si el controlador espera un modelo de Pydantic, controla el formato de salida anidado que produce Gemini (por ejemplo, {"request": {"state": "CA"}} en lugar de un {"state": "CA"} plano).

import inspect
from collections.abc import Sequence

from temporalio.common import RawValue

@activity.defn(dynamic=True)
async def dynamic_tool_activity(args: Sequence[RawValue]) -> dict:
    """Execute a tool dynamically based on the activity name."""
    tool_name = activity.info().activity_type
    tool_args = activity.payload_converter().from_payload(args[0].payload, dict)
    activity.logger.info(f"Running dynamic tool '{tool_name}' with args: {tool_args}")

    handler = get_handler(tool_name)

    if not inspect.iscoroutinefunction(handler):
        raise TypeError("Tool handler must be async (awaitable).")

    sig = inspect.signature(handler)
    params = list(sig.parameters.values())

    if len(params) == 0:
        result = await handler()
    else:
        param = params[0]
        param_name = param.name
        ann = param.annotation

        if isinstance(ann, type) and issubclass(ann, BaseModel):
            nested_args = tool_args.get(param_name, tool_args)
            result = await handler(ann(**nested_args))
        else:
            result = await handler(**tool_args)

    activity.logger.info(f"Tool '{tool_name}' result: {result}")
    return result

El flujo de trabajo de bucle de agentes

Ahora tienes todas las piezas para terminar de compilar el agente. La clase AgentWorkflow implementa un flujo de trabajo que contiene el bucle del agente. Dentro de ese bucle, se invoca el LLM a través de la actividad (lo que lo hace duradero), se inspecciona el resultado y, si el LLM eligió una herramienta, se invoca a través de dynamic_tool_activity.

En este agente simple de estilo ReAct, una vez que el LLM decide no usar una herramienta, se considera que el bucle está completo y se devuelve el resultado final del LLM.

from datetime import timedelta

@workflow.defn
class AgentWorkflow:
    """Agentic loop workflow that uses Gemini for LLM calls and executes tools."""

    @workflow.run
    async def run(self, input: str) -> str:
        contents: list[types.Content] = [
            types.Content(role="user", parts=[types.Part.from_text(text=input)])
        ]

        tools = [get_tools()]

        while True:
            result = await workflow.execute_activity(
                generate_content,
                GeminiChatRequest(
                    model="gemini-3-flash-preview",
                    system_instruction=SYSTEM_INSTRUCTIONS,
                    contents=contents,
                    tools=tools,
                ),
                start_to_close_timeout=timedelta(seconds=60),
            )

            if result.function_calls:
                # Sending the complete raw_parts here ensures Gemini 3 thought
                # signatures are propagated correctly.
                contents.append(types.Content(role="model", parts=result.raw_parts))

                for function_call in result.function_calls:
                    tool_result = await self._handle_function_call(function_call)

                    contents.append(
                        types.Content(
                            role="user",
                            parts=[
                                types.Part.from_function_response(
                                    name=function_call["name"],
                                    response={"result": tool_result},
                                )
                            ],
                        )
                    )
            else:
                return result.text
            # Leave this in place. You will un-comment it during a durability
            # test later on.
            # await asyncio.sleep(10)

    async def _handle_function_call(self, function_call: dict) -> str:
        """Execute a tool via dynamic activity and return the result."""
        tool_name = function_call["name"]
        tool_args = function_call.get("args", {})

        result = await workflow.execute_activity(
            tool_name,
            tool_args,
            start_to_close_timeout=timedelta(seconds=30),
        )

        return result

El bucle de agente es completamente duradero. Si el trabajador del agente falla después de varias iteraciones en el bucle, Temporal retomará exactamente donde lo dejó sin necesidad de volver a invocar las invocaciones de LLM o las llamadas a herramientas ya ejecutadas.

Inicio del trabajador

Por último, conecta todos los cables. Si bien el código implementa la lógica de negocios necesaria de una manera que parece ejecutarse en un solo proceso, el uso de Temporal lo convierte en un sistema impulsado por eventos (específicamente, basado en eventos) en el que la comunicación entre el flujo de trabajo y las actividades se realiza a través de la mensajería que proporciona Temporal.

El trabajador de Temporal se conecta al servicio de Temporal y actúa como un programador para las tareas de flujo de trabajo y actividad. El trabajador registra el flujo de trabajo y ambas actividades, y, luego, comienza a escuchar las tareas.

import asyncio
from concurrent.futures import ThreadPoolExecutor

from dotenv import load_dotenv
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker

async def main():
    config = ClientConfig.load_client_connect_config()
    config.setdefault("target_host", "localhost:7233")
    client = await Client.connect(
        **config,
        data_converter=pydantic_data_converter,
    )

    worker = Worker(
        client,
        task_queue="gemini-agent-python-task-queue",
        workflows=[
            AgentWorkflow,
        ],
        activities=[
            generate_content,
            dynamic_tool_activity,
        ],
        activity_executor=ThreadPoolExecutor(max_workers=10),
    )
    await worker.run()


if __name__ == "__main__":
    load_dotenv()
    asyncio.run(main())

La secuencia de comandos del cliente

Crea la secuencia de comandos del cliente (start_workflow.py). Envía una consulta y espera el resultado. Observa que se conecta a la misma cola de tareas a la que se hace referencia en el trabajador del agente: la secuencia de comandos start_workflow envía una tarea de flujo de trabajo con la instrucción del usuario a esa cola de tareas, lo que inicia la ejecución del agente.

import asyncio
import sys
import uuid

from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter


async def main():
    client = await Client.connect(
        "localhost:7233",
        data_converter=pydantic_data_converter,
    )

    query = sys.argv[1] if len(sys.argv) > 1 else "Tell me about recursion"

    result = await client.execute_workflow(
        "AgentWorkflow",
        query,
        id=f"gemini-agent-id-{uuid.uuid4()}",
        task_queue="gemini-agent-python-task-queue",
    )
    print(f"\nResult:\n{result}")


if __name__ == "__main__":
    asyncio.run(main())

Ejecuta el agente

Si aún no lo hiciste, inicia el servidor de desarrollo de Temporal:

temporal server start-dev

En una nueva ventana de terminal, inicia el trabajador del agente:

python -m durable_agent_worker

En una tercera ventana de la terminal, envía una consulta a tu agente:

python -m start_workflow "are there any weather alerts for where I am?"

Observa el resultado en la terminal de durable_agent_worker que muestra las acciones que ocurren en cada iteración del bucle de agentes. El LLM puede satisfacer la solicitud del usuario invocando una serie de herramientas a su disposición. Puedes ver los pasos que se ejecutaron a través de la IU de Temporal en http://localhost:8233/namespaces/default/workflows.

Prueba algunas instrucciones diferentes para ver el motivo del agente y las herramientas de llamadas:

python -m start_workflow "are there any weather alerts for New York?"
python -m start_workflow "where am I?"
python -m start_workflow "what is my ip address?"
python -m start_workflow "tell me a joke"

La última instrucción no requiere ninguna herramienta, por lo que el agente responde con un haiku basado en SYSTEM_INSTRUCTIONS.

Prueba de durabilidad (opcional)

Compilar sobre Temporal garantiza que tu agente sobreviva a las fallas sin problemas. Puedes probar esto con dos experimentos distintos.

Cómo simular una interrupción de red

En esta prueba, inhabilitarás temporalmente la conexión a Internet de tu computadora, enviarás un flujo de trabajo, observarás cómo Temporal vuelve a intentarlo automáticamente y, luego, restablecerás la red para ver cómo se recupera.

  1. Desconecta tu máquina de Internet (por ejemplo, desactiva la conexión Wi-Fi).
  2. Envía un flujo de trabajo:

    python -m start_workflow "tell me a joke"
  3. Verifica la IU de Temporal (http://localhost:8233). Verás que la actividad del LLM falla y que Temporal administra automáticamente los reintentos en segundo plano.

  4. Vuelve a conectarte a Internet.

  5. El siguiente reintento automatizado llegará correctamente a la API de Gemini, y tu terminal imprimirá el resultado final.

Cómo sobrevivir a una falla del trabajador

En esta prueba, detendrás el trabajador a mitad de la ejecución y lo reiniciarás. Las repeticiones temporales reproducen el historial del flujo de trabajo (registro de eventos) y se reanudan desde la última actividad completada. No se repiten las invocaciones de LLM ni las llamadas a herramientas que ya se completaron.

  1. Para tener tiempo de detener el trabajador, abre durable_agent_worker.py y quita temporalmente el comentario de await asyncio.sleep(10) dentro del bucle AgentWorkflow run.
  2. Reinicia el trabajador:

    python -m durable_agent_worker
  3. Envía una búsqueda que active varias herramientas:

    python -m start_workflow "are there any weather alerts where I am?"
  4. Cierra el proceso del trabajador en cualquier momento antes de que se complete (Ctrl-C en la terminal del trabajador o con kill %1 si se ejecuta en segundo plano).

  5. Reinicia el trabajador:

    python -m durable_agent_worker

Temporal reproduce el historial del flujo de trabajo. Las llamadas al LLM y las invocaciones de herramientas que ya se completaron no se vuelven a ejecutar, sino que sus resultados se reproducen instantáneamente desde el historial (el registro de eventos). El flujo de trabajo finaliza correctamente.

Más recursos