Agente AI durevole con Gemini e Temporal

Questo tutorial ti guida nella creazione di un ciclo agentico in stile ReAct che utilizza l'API Gemini per il ragionamento e Temporal per la durabilità. Il codice sorgente completo di questo tutorial è disponibile su GitHub.

L'agente può chiamare strumenti, ad esempio cercare avvisi meteo o geolocalizzare un indirizzo IP, e continuerà a farlo finché non avrà informazioni sufficienti per rispondere.

Ciò che lo distingue da una tipica demo dell'agente è la durabilità. Ogni chiamata LLM, ogni invocazione di strumenti e ogni passaggio del ciclo dell'agente viene reso persistente da Temporal. Se il processo si arresta in modo anomalo, la rete si interrompe o un'API va in timeout, Temporal riprova automaticamente e riprende dall'ultimo passaggio completato. Nessuna cronologia delle conversazioni viene persa e nessuna chiamata allo strumento viene ripetuta in modo errato.

Architettura

L'architettura è composta da tre parti:

  • Workflow:il ciclo agentico che orchestra la logica di esecuzione.
  • Attività:singole unità di lavoro (chiamate LLM, chiamate di strumenti) che Temporal rende durature.
  • Worker:il processo che esegue i flussi di lavoro e le attività.

In questo esempio, inserirai tutti e tre gli elementi in un unico file (durable_agent_worker.py). In un'implementazione reale, li separeresti per consentire vari vantaggi di deployment e scalabilità. Inserirai il codice che fornisce un prompt all'agente in un secondo file (start_workflow.py).

Prerequisiti

Per completare questa guida, avrai bisogno di:

  • Una chiave API Gemini. Puoi crearne uno senza costi in Google AI Studio.
  • Python versione 3.10 o successive.
  • La CLI Temporal per l'esecuzione di un server di sviluppo locale.

Configurazione

Prima di iniziare, assicurati di avere un server di sviluppo temporaneo in esecuzione localmente:

temporal server start-dev

Poi, installa le dipendenze richieste:

pip install temporalio google-genai httpx pydantic python-dotenv

Crea un file .env nella directory del progetto con la chiave API Gemini. Puoi ottenere una chiave API da Google AI Studio.

echo "GOOGLE_API_KEY=your-api-key-here" > .env

Implementazione

Il resto di questo tutorial illustra durable_agent_worker.py dall'alto verso il basso, costruendo l'agente pezzo per pezzo. Crea il file e segui le istruzioni.

Importazioni e configurazione sandbox

Inizia con le importazioni che devono essere definite in anticipo. Il blocco workflow.unsafe.imports_passed_through() indica al sandbox del flusso di lavoro di Temporal di consentire il passaggio di determinati moduli senza restrizioni. Ciò è necessario perché diverse librerie (in particolare httpx, che è una sottoclasse di urllib.request.Request) utilizzano pattern che altrimenti la sandbox bloccherebbe.

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    import pydantic_core  # noqa: F401
    import annotated_types  # noqa: F401

    import httpx
    from pydantic import BaseModel, Field
    from google import genai
    from google.genai import types

Istruzioni di sistema

Poi, definisci la personalità dell'agente. Le istruzioni di sistema indicano al modello come comportarsi. Questo agente ha l'istruzione di rispondere in haiku quando non sono necessari strumenti.

SYSTEM_INSTRUCTIONS = """
You are a helpful agent that can use tools to help the user.
You will be given an input from the user and a list of tools to use.
You may or may not need to use the tools to satisfy the user ask.
If no tools are needed, respond in haikus.
"""

Definizioni degli strumenti

Ora definisci gli strumenti che l'agente può utilizzare. Ogni strumento è una funzione asincrona con una docstring descrittiva. Gli strumenti che accettano parametri utilizzano un modello Pydantic come unico argomento. Si tratta di una best practice di Temporal che mantiene stabili le firme delle attività man mano che aggiungi campi facoltativi nel tempo.

import json

NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"


class GetWeatherAlertsRequest(BaseModel):
    """Request model for getting weather alerts."""

    state: str = Field(description="Two-letter US state code (e.g. CA, NY)")


async def get_weather_alerts(request: GetWeatherAlertsRequest) -> str:
    """Get weather alerts for a US state.

    Args:
        request: The request object containing:
            - state: Two-letter US state code (e.g. CA, NY)
    """
    headers = {"User-Agent": USER_AGENT, "Accept": "application/geo+json"}
    url = f"{NWS_API_BASE}/alerts/active/area/{request.state}"

    async with httpx.AsyncClient() as client:
        response = await client.get(url, headers=headers, timeout=5.0)
        response.raise_for_status()
        return json.dumps(response.json())

Successivamente, definisci gli strumenti per la geolocalizzazione degli indirizzi IP:

class GetLocationRequest(BaseModel):
    """Request model for getting location info from an IP address."""

    ipaddress: str = Field(description="An IP address")


async def get_ip_address() -> str:
    """Get the public IP address of the current machine."""
    async with httpx.AsyncClient() as client:
        response = await client.get("https://icanhazip.com")
        response.raise_for_status()
        return response.text.strip()


async def get_location_info(request: GetLocationRequest) -> str:
    """Get the location information for an IP address including city, state, and country.

    Args:
        request: The request object containing:
            - ipaddress: An IP address to look up
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(f"http://ip-api.com/json/{request.ipaddress}")
        response.raise_for_status()
        result = response.json()
        return f"{result['city']}, {result['regionName']}, {result['country']}"

Registro degli strumenti

Successivamente, crea un registro che mappi i nomi degli strumenti alle funzioni gestore. La funzione get_tools() genera oggetti FunctionDeclaration compatibili con Gemini dai callable utilizzando FunctionDeclaration.from_callable_with_api_option().

from typing import Any, Awaitable, Callable

ToolHandler = Callable[..., Awaitable[Any]]


def get_handler(tool_name: str) -> ToolHandler:
    """Get the handler function for a given tool name."""
    if tool_name == "get_location_info":
        return get_location_info
    if tool_name == "get_ip_address":
        return get_ip_address
    if tool_name == "get_weather_alerts":
        return get_weather_alerts
    raise ValueError(f"Unknown tool name: {tool_name}")


def get_tools() -> types.Tool:
    """Get the Tool object containing all available function declarations.

    Uses FunctionDeclaration.from_callable_with_api_option() from the Google GenAI SDK
    to generate tool definitions from the handler functions.
    """
    return types.Tool(
        function_declarations=[
            types.FunctionDeclaration.from_callable_with_api_option(
                callable=get_weather_alerts, api_option="GEMINI_API"
            ),
            types.FunctionDeclaration.from_callable_with_api_option(
                callable=get_location_info, api_option="GEMINI_API"
            ),
            types.FunctionDeclaration.from_callable_with_api_option(
                callable=get_ip_address, api_option="GEMINI_API"
            ),
        ]
    )

Attività LLM

Ora definisci l'attività che chiama l'API Gemini. Le classi di dati GeminiChatRequest e GeminiChatResponse definiscono il contratto.

Disattiverai la chiamata di funzione automatica in modo che l'invocazione dell'LLM e l'invocazione dello strumento vengano gestite come attività separate, aumentando la durata dell'agente. Disabiliterai anche i tentativi integrati dell'SDK (attempts=1) poiché Temporal gestisce i tentativi in modo duraturo.

import os
from dataclasses import dataclass

from temporalio import activity

@dataclass
class GeminiChatRequest:
    """Request parameters for a Gemini chat completion."""

    model: str
    system_instruction: str
    contents: list[types.Content]
    tools: list[types.Tool]


@dataclass
class GeminiChatResponse:
    """Response from a Gemini chat completion."""

    text: str | None
    function_calls: list[dict[str, Any]]
    raw_parts: list[types.Part]


@activity.defn
async def generate_content(request: GeminiChatRequest) -> GeminiChatResponse:
    """Execute a Gemini chat completion with tool support."""
    api_key = os.environ.get("GOOGLE_API_KEY")
    if not api_key:
        raise ValueError("GOOGLE_API_KEY environment variable is not set")
    client = genai.Client(
        api_key=api_key,
        http_options=types.HttpOptions(
            retry_options=types.HttpRetryOptions(attempts=1),
        ),
    )

    config = types.GenerateContentConfig(
        system_instruction=request.system_instruction,
        tools=request.tools,
        automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True),
    )

    response = await client.aio.models.generate_content(
        model=request.model,
        contents=request.contents,
        config=config,
    )

    function_calls = []
    raw_parts = []
    text_parts = []

    if response.candidates and response.candidates[0].content:
        for part in response.candidates[0].content.parts:
            raw_parts.append(part)
            if part.function_call:
                function_calls.append(
                    {
                        "name": part.function_call.name,
                        "args": dict(part.function_call.args) if part.function_call.args else {},
                    }
                )
            elif part.text:
                text_parts.append(part.text)

    text = "".join(text_parts) if text_parts and not function_calls else None

    return GeminiChatResponse(
        text=text,
        function_calls=function_calls,
        raw_parts=raw_parts,
    )

Attività dello strumento dinamico

Successivamente, definisci l'attività che esegue gli strumenti. Utilizza la funzionalità di attività dinamica di Temporal: il gestore degli strumenti (un callable) viene ottenuto dal registro degli strumenti tramite la funzione get_handler. Ciò consente di definire agenti diversi semplicemente fornendo un insieme diverso di strumenti e istruzioni di sistema; il flusso di lavoro che implementa il ciclo dell'agente non richiede modifiche.

L'attività esamina la firma del gestore per determinare come passare gli argomenti. Se il gestore prevede un modello Pydantic, gestisce il formato di output nidificato prodotto da Gemini (ad esempio, {"request": {"state": "CA"}} anziché un {"state": "CA"} semplice).

import inspect
from collections.abc import Sequence

from temporalio.common import RawValue

@activity.defn(dynamic=True)
async def dynamic_tool_activity(args: Sequence[RawValue]) -> dict:
    """Execute a tool dynamically based on the activity name."""
    tool_name = activity.info().activity_type
    tool_args = activity.payload_converter().from_payload(args[0].payload, dict)
    activity.logger.info(f"Running dynamic tool '{tool_name}' with args: {tool_args}")

    handler = get_handler(tool_name)

    if not inspect.iscoroutinefunction(handler):
        raise TypeError("Tool handler must be async (awaitable).")

    sig = inspect.signature(handler)
    params = list(sig.parameters.values())

    if len(params) == 0:
        result = await handler()
    else:
        param = params[0]
        param_name = param.name
        ann = param.annotation

        if isinstance(ann, type) and issubclass(ann, BaseModel):
            nested_args = tool_args.get(param_name, tool_args)
            result = await handler(ann(**nested_args))
        else:
            result = await handler(**tool_args)

    activity.logger.info(f"Tool '{tool_name}' result: {result}")
    return result

Il workflow del ciclo agentico

Ora hai tutti i pezzi per completare la creazione dell'agente. La classe AgentWorkflow implementa un workflow contenente il ciclo dell'agente. All'interno di questo ciclo, l'LLM viene richiamato tramite l'attività (rendendola durevole), l'output viene ispezionato e, se uno strumento è stato scelto dall'LLM, viene richiamato tramite dynamic_tool_activity.

In questo semplice agente in stile ReAct, una volta che l'LLM sceglie di non utilizzare uno strumento, il ciclo viene considerato completato e viene restituito il risultato finale dell'LLM.

from datetime import timedelta

@workflow.defn
class AgentWorkflow:
    """Agentic loop workflow that uses Gemini for LLM calls and executes tools."""

    @workflow.run
    async def run(self, input: str) -> str:
        contents: list[types.Content] = [
            types.Content(role="user", parts=[types.Part.from_text(text=input)])
        ]

        tools = [get_tools()]

        while True:
            result = await workflow.execute_activity(
                generate_content,
                GeminiChatRequest(
                    model="gemini-3-flash-preview",
                    system_instruction=SYSTEM_INSTRUCTIONS,
                    contents=contents,
                    tools=tools,
                ),
                start_to_close_timeout=timedelta(seconds=60),
            )

            if result.function_calls:
                # Sending the complete raw_parts here ensures Gemini 3 thought
                # signatures are propagated correctly.
                contents.append(types.Content(role="model", parts=result.raw_parts))

                for function_call in result.function_calls:
                    tool_result = await self._handle_function_call(function_call)

                    contents.append(
                        types.Content(
                            role="user",
                            parts=[
                                types.Part.from_function_response(
                                    name=function_call["name"],
                                    response={"result": tool_result},
                                )
                            ],
                        )
                    )
            else:
                return result.text
            # Leave this in place. You will un-comment it during a durability
            # test later on.
            # await asyncio.sleep(10)

    async def _handle_function_call(self, function_call: dict) -> str:
        """Execute a tool via dynamic activity and return the result."""
        tool_name = function_call["name"]
        tool_args = function_call.get("args", {})

        result = await workflow.execute_activity(
            tool_name,
            tool_args,
            start_to_close_timeout=timedelta(seconds=30),
        )

        return result

Il ciclo di autonomia è completamente duraturo. Se il worker dell'agente si arresta in modo anomalo dopo diverse iterazioni nel ciclo, Temporal riprenderà esattamente da dove si era interrotto senza la necessità di richiamare le invocazioni LLM o le chiamate di strumenti già eseguite.

Avvio del worker

Infine, collega tutto. Sebbene il codice implementi la logica di business necessaria in modo da sembrare in esecuzione in un unico processo, l'utilizzo di Temporal lo rende un sistema basato sugli eventi (in particolare, basato sull'origine degli eventi) in cui la comunicazione tra il flusso di lavoro e le attività avviene tramite la messaggistica fornita da Temporal.

Il worker Temporal si connette al servizio Temporal e funge da scheduler per le attività di workflow. Il lavoratore registra il flusso di lavoro e entrambe le attività, quindi inizia ad ascoltare le attività.

import asyncio
from concurrent.futures import ThreadPoolExecutor

from dotenv import load_dotenv
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker

async def main():
    config = ClientConfig.load_client_connect_config()
    config.setdefault("target_host", "localhost:7233")
    client = await Client.connect(
        **config,
        data_converter=pydantic_data_converter,
    )

    worker = Worker(
        client,
        task_queue="gemini-agent-python-task-queue",
        workflows=[
            AgentWorkflow,
        ],
        activities=[
            generate_content,
            dynamic_tool_activity,
        ],
        activity_executor=ThreadPoolExecutor(max_workers=10),
    )
    await worker.run()


if __name__ == "__main__":
    load_dotenv()
    asyncio.run(main())

Lo script client

Crea lo script client (start_workflow.py). Invia una query e attende il risultato. Nota che si connette alla stessa coda di attività a cui fa riferimento il worker dell'agente: lo script start_workflow invia un'attività del flusso di lavoro con il prompt dell'utente a quella coda di attività, avviando l'esecuzione dell'agente.

import asyncio
import sys
import uuid

from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter


async def main():
    client = await Client.connect(
        "localhost:7233",
        data_converter=pydantic_data_converter,
    )

    query = sys.argv[1] if len(sys.argv) > 1 else "Tell me about recursion"

    result = await client.execute_workflow(
        "AgentWorkflow",
        query,
        id=f"gemini-agent-id-{uuid.uuid4()}",
        task_queue="gemini-agent-python-task-queue",
    )
    print(f"\nResult:\n{result}")


if __name__ == "__main__":
    asyncio.run(main())

Esegui l'agente

Se non l'hai ancora fatto, avvia il server di sviluppo Temporal:

temporal server start-dev

In una nuova finestra del terminale, avvia il worker dell'agente:

python -m durable_agent_worker

In una terza finestra del terminale, invia una query all'agente:

python -m start_workflow "are there any weather alerts for where I am?"

Nota l'output nel terminale di durable_agent_worker che mostra le azioni che si verificano in ogni iterazione del ciclo dell'agente. L'LLM è in grado di soddisfare la richiesta dell'utente richiamando una serie di strumenti a sua disposizione. Puoi visualizzare i passaggi eseguiti tramite la UI di Temporal all'indirizzo http://localhost:8233/namespaces/default/workflows.

Prova alcuni prompt diversi per visualizzare il motivo dell'agente e gli strumenti di chiamata:

python -m start_workflow "are there any weather alerts for New York?"
python -m start_workflow "where am I?"
python -m start_workflow "what is my ip address?"
python -m start_workflow "tell me a joke"

L'ultimo prompt non richiede strumenti, quindi l'agente risponde con un haiku basato su SYSTEM_INSTRUCTIONS.

Test di durata (facoltativo)

La creazione basata su Temporal garantisce che l'agente sopravviva senza problemi ai guasti. Puoi testare questa soluzione utilizzando due esperimenti distinti.

Simulazione di un'interruzione di rete

In questo test, disattiverai temporaneamente la connessione a internet del computer, invierai un flusso di lavoro, osserverai Temporal riprovare automaticamente e poi ripristinerai la rete per vedere il recupero.

  1. Scollega la macchina da internet (ad esempio, disattiva il Wi-Fi).
  2. Invia un workflow:

    python -m start_workflow "tell me a joke"
  3. Controlla la UI di Temporal (http://localhost:8233). Vedrai l'attività LLM non riuscita e Temporal che gestisce automaticamente i tentativi nel background.

  4. Riconnettiti a internet.

  5. Il successivo nuovo tentativo automatico raggiungerà correttamente l'API Gemini e il terminale stamperà il risultato finale.

Superare l'arresto anomalo di un worker

In questo test, interrompi l'esecuzione del worker e lo riavvii. Le repliche temporali mostrano la cronologia del flusso di lavoro (event sourcing) e riprendono dall'ultima attività completata. Le chiamate LLM e le chiamate di strumenti già completate non vengono ripetute.

  1. Per darti il tempo di interrompere il worker, apri durable_agent_worker.py e rimuovi temporaneamente il commento da await asyncio.sleep(10) all'interno del ciclo AgentWorkflow run.
  2. Riavvia il worker:

    python -m durable_agent_worker
  3. Invia una query che attiva diversi strumenti:

    python -m start_workflow "are there any weather alerts where I am?"
  4. Termina il processo worker in qualsiasi momento prima del completamento (Ctrl-C nel terminale worker o utilizzando kill %1 se viene eseguito in background).

  5. Riavvia il worker:

    python -m durable_agent_worker

Temporal riproduce la cronologia del workflow. Le chiamate LLM e le invocazioni di strumenti già completate non vengono eseguite di nuovo: i risultati vengono riprodotti istantaneamente dalla cronologia (il log eventi). Il workflow viene completato correttamente.

Ulteriori risorse