Ce tutoriel vous explique comment créer une boucle agentique de style ReAct qui utilise l'API Gemini pour le raisonnement et Temporal pour la durabilité. Le code source complet de ce tutoriel est disponible sur GitHub.
L'agent peut appeler des outils, comme rechercher des alertes météo ou géolocaliser une adresse IP, et il bouclera jusqu'à ce qu'il dispose de suffisamment d'informations pour répondre.
Ce qui différencie cette démo d'agent d'une démo classique, c'est la durabilité. Chaque appel de LLM, chaque invocation d'outil et chaque étape de la boucle agentique sont conservés par Temporal. Si le processus plante, que le réseau tombe en panne ou qu'une API expire, Temporal effectue automatiquement de nouvelles tentatives et reprend le processus à la dernière étape terminée. Aucun historique de conversation n'est perdu et aucun appel d'outil n'est répété de manière incorrecte.
Architecture
L'architecture se compose de trois parties :
- Workflow : boucle agentive qui orchestre la logique d'exécution.
- Activités : unités de travail individuelles (appels LLM, appels d'outils) que Temporal rend durables.
- Nœud de calcul : processus qui exécute les workflows et les activités.
Dans cet exemple, vous allez placer ces trois éléments dans un seul fichier (durable_agent_worker.py). Dans une implémentation réelle, vous les sépareriez pour profiter de divers avantages en termes de déploiement et d'évolutivité. Vous placerez le code qui fournit un prompt à l'agent dans un deuxième fichier (start_workflow.py).
Prérequis
Pour suivre ce guide, vous aurez besoin des éléments suivants :
- Une clé API Gemini Vous pouvez en créer une sans frais dans Google AI Studio.
- Python version 3.10 ou ultérieure.
- La CLI Temporal pour exécuter un serveur de développement local.
Configuration
Avant de commencer, assurez-vous qu'un serveur de développement Temporal est en cours d'exécution en local :
temporal server start-devEnsuite, installez les dépendances requises :
pip install temporalio google-genai httpx pydantic python-dotenvCréez un fichier .env dans le répertoire de votre projet avec votre clé API Gemini. Vous pouvez obtenir une clé API depuis Google AI Studio.
echo "GOOGLE_API_KEY=your-api-key-here" > .envImplémentation
Le reste de ce tutoriel vous guide à travers durable_agent_worker.py de haut en bas, en créant l'agent pièce par pièce. Créez le fichier et suivez les instructions.
Importations et configuration du bac à sable
Commencez par les importations qui doivent être définies au préalable. Le bloc workflow.unsafe.imports_passed_through() indique au bac à sable de workflow de Temporal de laisser passer certains modules sans restriction. Cela est nécessaire, car plusieurs bibliothèques (notamment httpx, qui est une sous-classe de urllib.request.Request) utilisent des modèles que le bac à sable bloquerait autrement.
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
import pydantic_core # noqa: F401
import annotated_types # noqa: F401
import httpx
from pydantic import BaseModel, Field
from google import genai
from google.genai import types
Instructions système
Ensuite, définissez la personnalité de l'agent. Les instructions système indiquent au modèle comment se comporter. Cet agent est chargé de répondre sous forme de haïkus lorsqu'aucun outil n'est nécessaire.
SYSTEM_INSTRUCTIONS = """
You are a helpful agent that can use tools to help the user.
You will be given an input from the user and a list of tools to use.
You may or may not need to use the tools to satisfy the user ask.
If no tools are needed, respond in haikus.
"""
Définitions d'outils
Définissez maintenant les outils que l'agent peut utiliser. Chaque outil est une fonction asynchrone avec une chaîne de documentation descriptive. Les outils qui acceptent des paramètres utilisent un modèle Pydantic comme argument unique. Il s'agit d'une bonne pratique de Temporal qui permet de conserver des signatures d'activité stables lorsque vous ajoutez des champs facultatifs au fil du temps.
import json
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"
class GetWeatherAlertsRequest(BaseModel):
"""Request model for getting weather alerts."""
state: str = Field(description="Two-letter US state code (e.g. CA, NY)")
async def get_weather_alerts(request: GetWeatherAlertsRequest) -> str:
"""Get weather alerts for a US state.
Args:
request: The request object containing:
- state: Two-letter US state code (e.g. CA, NY)
"""
headers = {"User-Agent": USER_AGENT, "Accept": "application/geo+json"}
url = f"{NWS_API_BASE}/alerts/active/area/{request.state}"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers, timeout=5.0)
response.raise_for_status()
return json.dumps(response.json())
Définissez ensuite des outils pour la géolocalisation des adresses IP :
class GetLocationRequest(BaseModel):
"""Request model for getting location info from an IP address."""
ipaddress: str = Field(description="An IP address")
async def get_ip_address() -> str:
"""Get the public IP address of the current machine."""
async with httpx.AsyncClient() as client:
response = await client.get("https://icanhazip.com")
response.raise_for_status()
return response.text.strip()
async def get_location_info(request: GetLocationRequest) -> str:
"""Get the location information for an IP address including city, state, and country.
Args:
request: The request object containing:
- ipaddress: An IP address to look up
"""
async with httpx.AsyncClient() as client:
response = await client.get(f"http://ip-api.com/json/{request.ipaddress}")
response.raise_for_status()
result = response.json()
return f"{result['city']}, {result['regionName']}, {result['country']}"
Registre d'outils
Ensuite, créez un registre qui mappe les noms d'outils aux fonctions de gestionnaire. La fonction get_tools() génère des objets FunctionDeclaration compatibles avec Gemini à partir des éléments appelables à l'aide de FunctionDeclaration.from_callable_with_api_option().
from typing import Any, Awaitable, Callable
ToolHandler = Callable[..., Awaitable[Any]]
def get_handler(tool_name: str) -> ToolHandler:
"""Get the handler function for a given tool name."""
if tool_name == "get_location_info":
return get_location_info
if tool_name == "get_ip_address":
return get_ip_address
if tool_name == "get_weather_alerts":
return get_weather_alerts
raise ValueError(f"Unknown tool name: {tool_name}")
def get_tools() -> types.Tool:
"""Get the Tool object containing all available function declarations.
Uses FunctionDeclaration.from_callable_with_api_option() from the Google GenAI SDK
to generate tool definitions from the handler functions.
"""
return types.Tool(
function_declarations=[
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_weather_alerts, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_location_info, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_ip_address, api_option="GEMINI_API"
),
]
)
Activité LLM
Définissez maintenant l'activité qui appelle l'API Gemini. Les classes de données GeminiChatRequest et GeminiChatResponse définissent le contrat.
Vous allez désactiver l'appel de fonction automatique afin que l'appel LLM et l'appel d'outil soient traités comme des tâches distinctes, ce qui rendra votre agent plus durable. Vous allez également désactiver les tentatives intégrées du SDK (attempts=1), car Temporal gère les tentatives de manière durable.
import os
from dataclasses import dataclass
from temporalio import activity
@dataclass
class GeminiChatRequest:
"""Request parameters for a Gemini chat completion."""
model: str
system_instruction: str
contents: list[types.Content]
tools: list[types.Tool]
@dataclass
class GeminiChatResponse:
"""Response from a Gemini chat completion."""
text: str | None
function_calls: list[dict[str, Any]]
raw_parts: list[types.Part]
@activity.defn
async def generate_content(request: GeminiChatRequest) -> GeminiChatResponse:
"""Execute a Gemini chat completion with tool support."""
api_key = os.environ.get("GOOGLE_API_KEY")
if not api_key:
raise ValueError("GOOGLE_API_KEY environment variable is not set")
client = genai.Client(
api_key=api_key,
http_options=types.HttpOptions(
retry_options=types.HttpRetryOptions(attempts=1),
),
)
config = types.GenerateContentConfig(
system_instruction=request.system_instruction,
tools=request.tools,
automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True),
)
response = await client.aio.models.generate_content(
model=request.model,
contents=request.contents,
config=config,
)
function_calls = []
raw_parts = []
text_parts = []
if response.candidates and response.candidates[0].content:
for part in response.candidates[0].content.parts:
raw_parts.append(part)
if part.function_call:
function_calls.append(
{
"name": part.function_call.name,
"args": dict(part.function_call.args) if part.function_call.args else {},
}
)
elif part.text:
text_parts.append(part.text)
text = "".join(text_parts) if text_parts and not function_calls else None
return GeminiChatResponse(
text=text,
function_calls=function_calls,
raw_parts=raw_parts,
)
Activité de l'outil dynamique
Ensuite, définissez l'activité qui exécute les outils. Pour ce faire, nous utilisons la fonctionnalité d'activité dynamique de Temporal : le gestionnaire d'outils (un appelable) est obtenu à partir du registre d'outils via la fonction get_handler. Cela permet de définir différents agents simplement en fournissant un ensemble différent d'outils et d'instructions système. Le workflow implémentant la boucle agentique ne nécessite aucune modification.
L'activité inspecte la signature du gestionnaire pour déterminer comment transmettre les arguments. Si le gestionnaire attend un modèle Pydantic, il gère le format de sortie imbriqué produit par Gemini (par exemple, {"request": {"state": "CA"}} au lieu d'un {"state": "CA"} plat).
import inspect
from collections.abc import Sequence
from temporalio.common import RawValue
@activity.defn(dynamic=True)
async def dynamic_tool_activity(args: Sequence[RawValue]) -> dict:
"""Execute a tool dynamically based on the activity name."""
tool_name = activity.info().activity_type
tool_args = activity.payload_converter().from_payload(args[0].payload, dict)
activity.logger.info(f"Running dynamic tool '{tool_name}' with args: {tool_args}")
handler = get_handler(tool_name)
if not inspect.iscoroutinefunction(handler):
raise TypeError("Tool handler must be async (awaitable).")
sig = inspect.signature(handler)
params = list(sig.parameters.values())
if len(params) == 0:
result = await handler()
else:
param = params[0]
param_name = param.name
ann = param.annotation
if isinstance(ann, type) and issubclass(ann, BaseModel):
nested_args = tool_args.get(param_name, tool_args)
result = await handler(ann(**nested_args))
else:
result = await handler(**tool_args)
activity.logger.info(f"Tool '{tool_name}' result: {result}")
return result
Workflow de boucle agentive
Vous disposez désormais de tous les éléments nécessaires pour terminer de créer l'agent. La classe AgentWorkflow implémente un workflow contenant la boucle de l'agent. Dans cette boucle, le LLM est appelé via une activité (ce qui le rend durable), la sortie est inspectée et, si un outil a été choisi par le LLM, il est appelé via dynamic_tool_activity.
Dans cet agent de style ReAct simple, une fois que le LLM choisit de ne pas utiliser d'outil, la boucle est considérée comme terminée et le résultat final du LLM est renvoyé.
from datetime import timedelta
@workflow.defn
class AgentWorkflow:
"""Agentic loop workflow that uses Gemini for LLM calls and executes tools."""
@workflow.run
async def run(self, input: str) -> str:
contents: list[types.Content] = [
types.Content(role="user", parts=[types.Part.from_text(text=input)])
]
tools = [get_tools()]
while True:
result = await workflow.execute_activity(
generate_content,
GeminiChatRequest(
model="gemini-3-flash-preview",
system_instruction=SYSTEM_INSTRUCTIONS,
contents=contents,
tools=tools,
),
start_to_close_timeout=timedelta(seconds=60),
)
if result.function_calls:
# Sending the complete raw_parts here ensures Gemini 3 thought
# signatures are propagated correctly.
contents.append(types.Content(role="model", parts=result.raw_parts))
for function_call in result.function_calls:
tool_result = await self._handle_function_call(function_call)
contents.append(
types.Content(
role="user",
parts=[
types.Part.from_function_response(
name=function_call["name"],
response={"result": tool_result},
)
],
)
)
else:
return result.text
# Leave this in place. You will un-comment it during a durability
# test later on.
# await asyncio.sleep(10)
async def _handle_function_call(self, function_call: dict) -> str:
"""Execute a tool via dynamic activity and return the result."""
tool_name = function_call["name"]
tool_args = function_call.get("args", {})
result = await workflow.execute_activity(
tool_name,
tool_args,
start_to_close_timeout=timedelta(seconds=30),
)
return result
La boucle agentique est entièrement durable. Si le nœud de calcul de l'agent plante après plusieurs itérations dans la boucle, Temporal reprendra exactement là où il s'était arrêté, sans avoir besoin de réinvoquer les appels LLM ou les appels d'outils déjà exécutés.
Démarrage du nœud de calcul
Enfin, connectez tous les éléments. Bien que le code implémente la logique métier nécessaire de manière à ce qu'il semble s'exécuter dans un seul processus, l'utilisation de Temporal en fait un système piloté par les événements (plus précisément, basé sur les événements) où la communication entre le workflow et les activités se fait via la messagerie fournie par Temporal.
Le nœud de calcul Temporal se connecte au service Temporal et sert de planificateur pour les tâches de workflow et d'activité. Le nœud de calcul enregistre le workflow et les deux activités, puis commence à écouter les tâches.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dotenv import load_dotenv
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker
async def main():
config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
client = await Client.connect(
**config,
data_converter=pydantic_data_converter,
)
worker = Worker(
client,
task_queue="gemini-agent-python-task-queue",
workflows=[
AgentWorkflow,
],
activities=[
generate_content,
dynamic_tool_activity,
],
activity_executor=ThreadPoolExecutor(max_workers=10),
)
await worker.run()
if __name__ == "__main__":
load_dotenv()
asyncio.run(main())
Script client
Créez le script client (start_workflow.py). Il envoie une requête et attend le résultat. Notez qu'il se connecte à la même file d'attente de tâches que celle référencée dans le worker de l'agent. Le script start_workflow distribue une tâche de workflow avec la requête utilisateur à cette file d'attente de tâches, ce qui lance l'exécution de l'agent.
import asyncio
import sys
import uuid
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
async def main():
client = await Client.connect(
"localhost:7233",
data_converter=pydantic_data_converter,
)
query = sys.argv[1] if len(sys.argv) > 1 else "Tell me about recursion"
result = await client.execute_workflow(
"AgentWorkflow",
query,
id=f"gemini-agent-id-{uuid.uuid4()}",
task_queue="gemini-agent-python-task-queue",
)
print(f"\nResult:\n{result}")
if __name__ == "__main__":
asyncio.run(main())
Exécuter l'agent
Si vous ne l'avez pas déjà fait, démarrez le serveur de développement Temporal :
temporal server start-devDans une nouvelle fenêtre de terminal, démarrez le nœud de calcul de l'agent :
python -m durable_agent_workerDans une troisième fenêtre de terminal, envoyez une requête à votre agent :
python -m start_workflow "are there any weather alerts for where I am?"Notez la sortie dans le terminal du durable_agent_worker qui montre les actions qui se produisent à chaque itération de la boucle agentique. Le LLM est capable de répondre à la demande de l'utilisateur en invoquant une série d'outils à sa disposition. Vous pouvez consulter les étapes exécutées dans l'interface utilisateur Temporal à l'adresse http://localhost:8233/namespaces/default/workflows.
Essayez quelques prompts différents pour voir le motif de l'agent et les outils d'appel :
python -m start_workflow "are there any weather alerts for New York?"python -m start_workflow "where am I?"python -m start_workflow "what is my ip address?"python -m start_workflow "tell me a joke"
La dernière invite ne nécessite aucun outil. L'agent répond donc par un haïku basé sur SYSTEM_INSTRUCTIONS.
Tester la durabilité (facultatif)
S'appuyer sur Temporal garantit que votre agent survit aux échecs de manière transparente. Vous pouvez tester cela à l'aide de deux tests distincts.
Simuler une panne réseau
Dans ce test, vous allez désactiver temporairement la connexion Internet de votre ordinateur, envoyer un workflow, regarder Temporal réessayer automatiquement, puis rétablir le réseau pour voir la récupération.
- Déconnectez votre ordinateur d'Internet (par exemple, désactivez le Wi-Fi).
Envoyez un workflow :
python -m start_workflow "tell me a joke"Consultez l'interface utilisateur Temporal (
http://localhost:8233). Vous verrez que l'activité LLM échoue et que Temporal gère automatiquement les nouvelles tentatives en arrière-plan.Reconnectez-vous à Internet.
La prochaine tentative automatique réussira à atteindre l'API Gemini, et votre terminal affichera le résultat final.
Survivre à un plantage de nœud de calcul
Dans ce test, vous allez arrêter le nœud de calcul en cours d'exécution et le redémarrer. Temporal relit l'historique du workflow (sourcing d'événements) et reprend la dernière activité terminée. Les appels LLM et les appels d'outils déjà effectués ne sont pas répétés.
- Pour vous donner le temps d'arrêter le worker, ouvrez
durable_agent_worker.pyet décommentez temporairementawait asyncio.sleep(10)dans la boucleAgentWorkflowrun. Redémarrez le nœud de calcul :
python -m durable_agent_workerEnvoyez une requête qui déclenche plusieurs outils :
python -m start_workflow "are there any weather alerts where I am?"Arrêtez le processus de nœud de calcul à tout moment avant la fin (
Ctrl-Cdans le terminal du nœud de calcul ou à l'aide dekill %1si vous l'exécutez en arrière-plan).Redémarrez le nœud de calcul :
python -m durable_agent_worker
Temporal relit l'historique du workflow. Les appels LLM et les invocations d'outils déjà effectués ne sont pas réexécutés. Leurs résultats sont immédiatement rejoués à partir de l'historique (le journal des événements). Le workflow se termine correctement.