Questo tutorial ti guida nella creazione di un ciclo agentico in stile ReAct che utilizza l'API Gemini per il ragionamento e Temporal per la durabilità. Il codice sorgente completo di questo tutorial è disponibile su GitHub.
L'agente può chiamare strumenti, ad esempio cercare avvisi meteo o geolocalizzare un indirizzo IP, e continuerà a farlo finché non avrà informazioni sufficienti per rispondere.
Ciò che lo distingue da una tipica demo dell'agente è la durabilità. Ogni chiamata LLM, ogni invocazione di strumenti e ogni passaggio del ciclo dell'agente viene reso persistente da Temporal. Se il processo si arresta in modo anomalo, la rete si interrompe o un'API va in timeout, Temporal riprova automaticamente e riprende dall'ultimo passaggio completato. Nessuna cronologia delle conversazioni viene persa e nessuna chiamata allo strumento viene ripetuta in modo errato.
Architettura
L'architettura è composta da tre parti:
- Workflow:il ciclo agentico che orchestra la logica di esecuzione.
- Attività:singole unità di lavoro (chiamate LLM, chiamate di strumenti) che Temporal rende durature.
- Worker:il processo che esegue i flussi di lavoro e le attività.
In questo esempio, inserirai tutti e tre gli elementi in un unico file
(durable_agent_worker.py). In un'implementazione reale, li separeresti
per consentire vari vantaggi di deployment e scalabilità. Inserirai
il codice che fornisce un prompt all'agente in un secondo file
(start_workflow.py).
Prerequisiti
Per completare questa guida, avrai bisogno di:
- Una chiave API Gemini. Puoi crearne uno senza costi in Google AI Studio.
- Python versione 3.10 o successive.
- La CLI Temporal per l'esecuzione di un server di sviluppo locale.
Configurazione
Prima di iniziare, assicurati di avere un server di sviluppo temporaneo in esecuzione localmente:
temporal server start-devPoi, installa le dipendenze richieste:
pip install temporalio google-genai httpx pydantic python-dotenvCrea un file .env nella directory del progetto con la chiave API Gemini. Puoi
ottenere una chiave API da
Google AI Studio.
echo "GOOGLE_API_KEY=your-api-key-here" > .envImplementazione
Il resto di questo tutorial illustra durable_agent_worker.py dall'alto verso il basso, costruendo l'agente pezzo per pezzo. Crea il file e segui le istruzioni.
Importazioni e configurazione sandbox
Inizia con le importazioni che devono essere definite in anticipo. Il blocco
workflow.unsafe.imports_passed_through() indica al sandbox del flusso di lavoro di Temporal
di consentire il passaggio di determinati moduli senza restrizioni. Ciò è necessario perché diverse librerie (in particolare httpx, che è una sottoclasse di urllib.request.Request) utilizzano pattern che altrimenti la sandbox bloccherebbe.
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
import pydantic_core # noqa: F401
import annotated_types # noqa: F401
import httpx
from pydantic import BaseModel, Field
from google import genai
from google.genai import types
Istruzioni di sistema
Poi, definisci la personalità dell'agente. Le istruzioni di sistema indicano al modello come comportarsi. Questo agente ha l'istruzione di rispondere in haiku quando non sono necessari strumenti.
SYSTEM_INSTRUCTIONS = """
You are a helpful agent that can use tools to help the user.
You will be given an input from the user and a list of tools to use.
You may or may not need to use the tools to satisfy the user ask.
If no tools are needed, respond in haikus.
"""
Definizioni degli strumenti
Ora definisci gli strumenti che l'agente può utilizzare. Ogni strumento è una funzione asincrona con una docstring descrittiva. Gli strumenti che accettano parametri utilizzano un modello Pydantic come unico argomento. Si tratta di una best practice di Temporal che mantiene stabili le firme delle attività man mano che aggiungi campi facoltativi nel tempo.
import json
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"
class GetWeatherAlertsRequest(BaseModel):
"""Request model for getting weather alerts."""
state: str = Field(description="Two-letter US state code (e.g. CA, NY)")
async def get_weather_alerts(request: GetWeatherAlertsRequest) -> str:
"""Get weather alerts for a US state.
Args:
request: The request object containing:
- state: Two-letter US state code (e.g. CA, NY)
"""
headers = {"User-Agent": USER_AGENT, "Accept": "application/geo+json"}
url = f"{NWS_API_BASE}/alerts/active/area/{request.state}"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers, timeout=5.0)
response.raise_for_status()
return json.dumps(response.json())
Successivamente, definisci gli strumenti per la geolocalizzazione degli indirizzi IP:
class GetLocationRequest(BaseModel):
"""Request model for getting location info from an IP address."""
ipaddress: str = Field(description="An IP address")
async def get_ip_address() -> str:
"""Get the public IP address of the current machine."""
async with httpx.AsyncClient() as client:
response = await client.get("https://icanhazip.com")
response.raise_for_status()
return response.text.strip()
async def get_location_info(request: GetLocationRequest) -> str:
"""Get the location information for an IP address including city, state, and country.
Args:
request: The request object containing:
- ipaddress: An IP address to look up
"""
async with httpx.AsyncClient() as client:
response = await client.get(f"http://ip-api.com/json/{request.ipaddress}")
response.raise_for_status()
result = response.json()
return f"{result['city']}, {result['regionName']}, {result['country']}"
Registro degli strumenti
Successivamente, crea un registro che mappi i nomi degli strumenti alle funzioni gestore. La funzione
get_tools() genera oggetti FunctionDeclaration compatibili con Gemini
dai callable utilizzando FunctionDeclaration.from_callable_with_api_option().
from typing import Any, Awaitable, Callable
ToolHandler = Callable[..., Awaitable[Any]]
def get_handler(tool_name: str) -> ToolHandler:
"""Get the handler function for a given tool name."""
if tool_name == "get_location_info":
return get_location_info
if tool_name == "get_ip_address":
return get_ip_address
if tool_name == "get_weather_alerts":
return get_weather_alerts
raise ValueError(f"Unknown tool name: {tool_name}")
def get_tools() -> types.Tool:
"""Get the Tool object containing all available function declarations.
Uses FunctionDeclaration.from_callable_with_api_option() from the Google GenAI SDK
to generate tool definitions from the handler functions.
"""
return types.Tool(
function_declarations=[
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_weather_alerts, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_location_info, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_ip_address, api_option="GEMINI_API"
),
]
)
Attività LLM
Ora definisci l'attività che chiama l'API Gemini. Le classi di dati GeminiChatRequest e
GeminiChatResponse definiscono il contratto.
Disattiverai la chiamata di funzione automatica in modo che l'invocazione dell'LLM e
l'invocazione dello strumento vengano gestite come attività separate, aumentando la durata
dell'agente. Disabiliterai anche i tentativi integrati dell'SDK (attempts=1) poiché
Temporal gestisce i tentativi in modo duraturo.
import os
from dataclasses import dataclass
from temporalio import activity
@dataclass
class GeminiChatRequest:
"""Request parameters for a Gemini chat completion."""
model: str
system_instruction: str
contents: list[types.Content]
tools: list[types.Tool]
@dataclass
class GeminiChatResponse:
"""Response from a Gemini chat completion."""
text: str | None
function_calls: list[dict[str, Any]]
raw_parts: list[types.Part]
@activity.defn
async def generate_content(request: GeminiChatRequest) -> GeminiChatResponse:
"""Execute a Gemini chat completion with tool support."""
api_key = os.environ.get("GOOGLE_API_KEY")
if not api_key:
raise ValueError("GOOGLE_API_KEY environment variable is not set")
client = genai.Client(
api_key=api_key,
http_options=types.HttpOptions(
retry_options=types.HttpRetryOptions(attempts=1),
),
)
config = types.GenerateContentConfig(
system_instruction=request.system_instruction,
tools=request.tools,
automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True),
)
response = await client.aio.models.generate_content(
model=request.model,
contents=request.contents,
config=config,
)
function_calls = []
raw_parts = []
text_parts = []
if response.candidates and response.candidates[0].content:
for part in response.candidates[0].content.parts:
raw_parts.append(part)
if part.function_call:
function_calls.append(
{
"name": part.function_call.name,
"args": dict(part.function_call.args) if part.function_call.args else {},
}
)
elif part.text:
text_parts.append(part.text)
text = "".join(text_parts) if text_parts and not function_calls else None
return GeminiChatResponse(
text=text,
function_calls=function_calls,
raw_parts=raw_parts,
)
Attività dello strumento dinamico
Successivamente, definisci l'attività che esegue gli strumenti. Utilizza la funzionalità di attività dinamica di Temporal: il gestore degli strumenti (un callable) viene ottenuto dal registro degli strumenti tramite la funzione get_handler. Ciò consente di definire agenti diversi semplicemente fornendo un insieme diverso di strumenti e istruzioni di sistema; il flusso di lavoro che implementa il ciclo dell'agente non richiede modifiche.
L'attività esamina la firma del gestore per determinare come passare
gli argomenti. Se il gestore prevede un modello Pydantic, gestisce il formato di output nidificato
prodotto da Gemini (ad esempio, {"request": {"state": "CA"}} anziché
un {"state": "CA"} semplice).
import inspect
from collections.abc import Sequence
from temporalio.common import RawValue
@activity.defn(dynamic=True)
async def dynamic_tool_activity(args: Sequence[RawValue]) -> dict:
"""Execute a tool dynamically based on the activity name."""
tool_name = activity.info().activity_type
tool_args = activity.payload_converter().from_payload(args[0].payload, dict)
activity.logger.info(f"Running dynamic tool '{tool_name}' with args: {tool_args}")
handler = get_handler(tool_name)
if not inspect.iscoroutinefunction(handler):
raise TypeError("Tool handler must be async (awaitable).")
sig = inspect.signature(handler)
params = list(sig.parameters.values())
if len(params) == 0:
result = await handler()
else:
param = params[0]
param_name = param.name
ann = param.annotation
if isinstance(ann, type) and issubclass(ann, BaseModel):
nested_args = tool_args.get(param_name, tool_args)
result = await handler(ann(**nested_args))
else:
result = await handler(**tool_args)
activity.logger.info(f"Tool '{tool_name}' result: {result}")
return result
Il workflow del ciclo agentico
Ora hai tutti i pezzi per completare la creazione dell'agente. La classe AgentWorkflow
implementa un workflow contenente il ciclo dell'agente. All'interno di questo ciclo, l'LLM
viene richiamato tramite l'attività (rendendola durevole), l'output viene ispezionato e, se uno
strumento è stato scelto dall'LLM, viene richiamato tramite dynamic_tool_activity.
In questo semplice agente in stile ReAct, una volta che l'LLM sceglie di non utilizzare uno strumento, il ciclo viene considerato completato e viene restituito il risultato finale dell'LLM.
from datetime import timedelta
@workflow.defn
class AgentWorkflow:
"""Agentic loop workflow that uses Gemini for LLM calls and executes tools."""
@workflow.run
async def run(self, input: str) -> str:
contents: list[types.Content] = [
types.Content(role="user", parts=[types.Part.from_text(text=input)])
]
tools = [get_tools()]
while True:
result = await workflow.execute_activity(
generate_content,
GeminiChatRequest(
model="gemini-3-flash-preview",
system_instruction=SYSTEM_INSTRUCTIONS,
contents=contents,
tools=tools,
),
start_to_close_timeout=timedelta(seconds=60),
)
if result.function_calls:
# Sending the complete raw_parts here ensures Gemini 3 thought
# signatures are propagated correctly.
contents.append(types.Content(role="model", parts=result.raw_parts))
for function_call in result.function_calls:
tool_result = await self._handle_function_call(function_call)
contents.append(
types.Content(
role="user",
parts=[
types.Part.from_function_response(
name=function_call["name"],
response={"result": tool_result},
)
],
)
)
else:
return result.text
# Leave this in place. You will un-comment it during a durability
# test later on.
# await asyncio.sleep(10)
async def _handle_function_call(self, function_call: dict) -> str:
"""Execute a tool via dynamic activity and return the result."""
tool_name = function_call["name"]
tool_args = function_call.get("args", {})
result = await workflow.execute_activity(
tool_name,
tool_args,
start_to_close_timeout=timedelta(seconds=30),
)
return result
Il ciclo di autonomia è completamente duraturo. Se il worker dell'agente si arresta in modo anomalo dopo diverse iterazioni nel ciclo, Temporal riprenderà esattamente da dove si era interrotto senza la necessità di richiamare le invocazioni LLM o le chiamate di strumenti già eseguite.
Avvio del worker
Infine, collega tutto. Sebbene il codice implementi la logica di business necessaria in modo da sembrare in esecuzione in un unico processo, l'utilizzo di Temporal lo rende un sistema basato sugli eventi (in particolare, basato sull'origine degli eventi) in cui la comunicazione tra il flusso di lavoro e le attività avviene tramite la messaggistica fornita da Temporal.
Il worker Temporal si connette al servizio Temporal e funge da scheduler per le attività di workflow. Il lavoratore registra il flusso di lavoro e entrambe le attività, quindi inizia ad ascoltare le attività.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dotenv import load_dotenv
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker
async def main():
config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
client = await Client.connect(
**config,
data_converter=pydantic_data_converter,
)
worker = Worker(
client,
task_queue="gemini-agent-python-task-queue",
workflows=[
AgentWorkflow,
],
activities=[
generate_content,
dynamic_tool_activity,
],
activity_executor=ThreadPoolExecutor(max_workers=10),
)
await worker.run()
if __name__ == "__main__":
load_dotenv()
asyncio.run(main())
Lo script client
Crea lo script client (start_workflow.py). Invia una query e attende
il risultato. Nota che si connette alla stessa coda di attività a cui fa riferimento il worker dell'agente: lo script start_workflow invia un'attività del flusso di lavoro con il prompt dell'utente a quella coda di attività, avviando l'esecuzione dell'agente.
import asyncio
import sys
import uuid
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
async def main():
client = await Client.connect(
"localhost:7233",
data_converter=pydantic_data_converter,
)
query = sys.argv[1] if len(sys.argv) > 1 else "Tell me about recursion"
result = await client.execute_workflow(
"AgentWorkflow",
query,
id=f"gemini-agent-id-{uuid.uuid4()}",
task_queue="gemini-agent-python-task-queue",
)
print(f"\nResult:\n{result}")
if __name__ == "__main__":
asyncio.run(main())
Esegui l'agente
Se non l'hai ancora fatto, avvia il server di sviluppo Temporal:
temporal server start-devIn una nuova finestra del terminale, avvia il worker dell'agente:
python -m durable_agent_workerIn una terza finestra del terminale, invia una query all'agente:
python -m start_workflow "are there any weather alerts for where I am?"Nota l'output nel terminale di durable_agent_worker che mostra le
azioni che si verificano in ogni iterazione del ciclo dell'agente. L'LLM è in grado di soddisfare la richiesta dell'utente richiamando una serie di strumenti a sua disposizione. Puoi
visualizzare i passaggi eseguiti tramite la UI di Temporal all'indirizzo
http://localhost:8233/namespaces/default/workflows.
Prova alcuni prompt diversi per visualizzare il motivo dell'agente e gli strumenti di chiamata:
python -m start_workflow "are there any weather alerts for New York?"python -m start_workflow "where am I?"python -m start_workflow "what is my ip address?"python -m start_workflow "tell me a joke"
L'ultimo prompt non richiede strumenti, quindi l'agente risponde con un haiku
basato su SYSTEM_INSTRUCTIONS.
Test di durata (facoltativo)
La creazione basata su Temporal garantisce che l'agente sopravviva senza problemi ai guasti. Puoi testare questa soluzione utilizzando due esperimenti distinti.
Simulazione di un'interruzione di rete
In questo test, disattiverai temporaneamente la connessione a internet del computer, invierai un flusso di lavoro, osserverai Temporal riprovare automaticamente e poi ripristinerai la rete per vedere il recupero.
- Scollega la macchina da internet (ad esempio, disattiva il Wi-Fi).
Invia un workflow:
python -m start_workflow "tell me a joke"Controlla la UI di Temporal (
http://localhost:8233). Vedrai l'attività LLM non riuscita e Temporal che gestisce automaticamente i tentativi nel background.Riconnettiti a internet.
Il successivo nuovo tentativo automatico raggiungerà correttamente l'API Gemini e il terminale stamperà il risultato finale.
Superare l'arresto anomalo di un worker
In questo test, interrompi l'esecuzione del worker e lo riavvii. Le repliche temporali mostrano la cronologia del flusso di lavoro (event sourcing) e riprendono dall'ultima attività completata. Le chiamate LLM e le chiamate di strumenti già completate non vengono ripetute.
- Per darti il tempo di interrompere il worker, apri
durable_agent_worker.pye rimuovi temporaneamente il commento daawait asyncio.sleep(10)all'interno del cicloAgentWorkflowrun. Riavvia il worker:
python -m durable_agent_workerInvia una query che attiva diversi strumenti:
python -m start_workflow "are there any weather alerts where I am?"Termina il processo worker in qualsiasi momento prima del completamento (
Ctrl-Cnel terminale worker o utilizzandokill %1se viene eseguito in background).Riavvia il worker:
python -m durable_agent_worker
Temporal riproduce la cronologia del workflow. Le chiamate LLM e le invocazioni di strumenti già completate non vengono eseguite di nuovo: i risultati vengono riprodotti istantaneamente dalla cronologia (il log eventi). Il workflow viene completato correttamente.