In dieser Anleitung wird beschrieben, wie Sie einen ReAct-ähnlichen Agent-Loop erstellen, der die Gemini API für das Reasoning und Temporal für die Dauerhaftigkeit verwendet. Der vollständige Quellcode für diese Anleitung ist auf GitHub verfügbar.
Der Agent kann Tools aufrufen, z. B. um Wetterwarnungen zu suchen oder eine IP-Adresse zu geolokalisieren, und wird so lange wiederholt, bis er genügend Informationen für eine Antwort hat.
Der Unterschied zu einer typischen Agent-Demo besteht in der Langlebigkeit. Jeder LLM-Aufruf, jeder Tool-Aufruf und jeder Schritt des Agent-Loops wird von Temporal gespeichert. Wenn der Prozess abstürzt, das Netzwerk ausfällt oder ein API-Aufruf das Zeitlimit überschreitet, wird der Vorgang von Temporal automatisch wiederholt und ab dem letzten abgeschlossenen Schritt fortgesetzt. Es geht kein Unterhaltungsverlauf verloren und Toolaufrufe werden nicht fälschlicherweise wiederholt.
Architektur
Die Architektur besteht aus drei Teilen:
- Workflow:Der Agent-Loop, der die Ausführungslogik orchestriert.
- Aktivitäten:Einzelne Arbeitseinheiten (LLM-Aufrufe, Tool-Aufrufe), die von Temporal dauerhaft gemacht werden.
- Worker:Der Prozess, der die Workflows und Aktivitäten ausführt.
In diesem Beispiel werden alle drei Teile in einer einzigen Datei (durable_agent_worker.py) platziert. In einer realen Implementierung würden Sie sie trennen, um verschiedene Vorteile bei der Bereitstellung und Skalierbarkeit zu erzielen. Sie platzieren den Code, der dem Agent einen Prompt liefert, in einer zweiten Datei (start_workflow.py).
Vorbereitung
Für diese Anleitung benötigen Sie Folgendes:
- Ein Gemini API-Schlüssel Sie können kostenlos einen in Google AI Studio erstellen.
- Python-Version 3.10 oder höher.
- Die Temporal-CLI zum Ausführen eines lokalen Entwicklungsservers.
Einrichtung
Prüfen Sie zuerst, ob ein Temporal-Entwicklungsserver lokal ausgeführt wird:
temporal server start-devInstallieren Sie als Nächstes die erforderlichen Abhängigkeiten:
pip install temporalio google-genai httpx pydantic python-dotenvErstellen Sie in Ihrem Projektverzeichnis eine Datei mit dem Namen .env mit Ihrem Gemini API-Schlüssel. Sie können einen API-Schlüssel von Google AI Studio abrufen.
echo "GOOGLE_API_KEY=your-api-key-here" > .envImplementierung
Im weiteren Verlauf dieser Anleitung wird durable_agent_worker.py von oben nach unten durchgegangen und der Agent wird Schritt für Schritt erstellt. Erstellen Sie die Datei und folgen Sie der Anleitung.
Importe und Sandbox-Einrichtung
Beginnen Sie mit den Importen, die im Voraus definiert werden müssen. Der workflow.unsafe.imports_passed_through()-Block weist die Workflow-Sandbox von Temporal an, bestimmte Module ohne Einschränkung durchzulassen. Das ist erforderlich, da in mehreren Bibliotheken (insbesondere httpx, die eine Unterklasse von urllib.request.Request ist) Muster verwendet werden, die von der Sandbox andernfalls blockiert würden.
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
import pydantic_core # noqa: F401
import annotated_types # noqa: F401
import httpx
from pydantic import BaseModel, Field
from google import genai
from google.genai import types
Systemanweisungen
Als Nächstes definieren Sie die Persönlichkeit des Agenten. Die Systemanweisungen geben an, wie sich das Modell verhalten soll. Dieser Agent ist so programmiert, dass er in Haikus antwortet, wenn keine Tools erforderlich sind.
SYSTEM_INSTRUCTIONS = """
You are a helpful agent that can use tools to help the user.
You will be given an input from the user and a list of tools to use.
You may or may not need to use the tools to satisfy the user ask.
If no tools are needed, respond in haikus.
"""
Tooldefinitionen
Definieren Sie nun die Tools, die der Agent verwenden kann. Jedes Tool ist eine asynchrone Funktion mit einem beschreibenden Docstring. Tools, die Parameter verwenden, haben ein Pydantic-Modell als einziges Argument. Dies ist eine Temporal-Best Practice, die dafür sorgt, dass Aktivitätssignaturen stabil bleiben, wenn Sie im Laufe der Zeit optionale Felder hinzufügen.
import json
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"
class GetWeatherAlertsRequest(BaseModel):
"""Request model for getting weather alerts."""
state: str = Field(description="Two-letter US state code (e.g. CA, NY)")
async def get_weather_alerts(request: GetWeatherAlertsRequest) -> str:
"""Get weather alerts for a US state.
Args:
request: The request object containing:
- state: Two-letter US state code (e.g. CA, NY)
"""
headers = {"User-Agent": USER_AGENT, "Accept": "application/geo+json"}
url = f"{NWS_API_BASE}/alerts/active/area/{request.state}"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers, timeout=5.0)
response.raise_for_status()
return json.dumps(response.json())
Definieren Sie als Nächstes Tools für die Standortbestimmung von IP-Adressen:
class GetLocationRequest(BaseModel):
"""Request model for getting location info from an IP address."""
ipaddress: str = Field(description="An IP address")
async def get_ip_address() -> str:
"""Get the public IP address of the current machine."""
async with httpx.AsyncClient() as client:
response = await client.get("https://icanhazip.com")
response.raise_for_status()
return response.text.strip()
async def get_location_info(request: GetLocationRequest) -> str:
"""Get the location information for an IP address including city, state, and country.
Args:
request: The request object containing:
- ipaddress: An IP address to look up
"""
async with httpx.AsyncClient() as client:
response = await client.get(f"http://ip-api.com/json/{request.ipaddress}")
response.raise_for_status()
result = response.json()
return f"{result['city']}, {result['regionName']}, {result['country']}"
Tool-Registry
Erstellen Sie als Nächstes eine Registry, in der Toolnamen Handler-Funktionen zugeordnet werden. Die Funktion get_tools() generiert Gemini-kompatible FunctionDeclaration-Objekte aus den aufrufbaren Elementen mit FunctionDeclaration.from_callable_with_api_option().
from typing import Any, Awaitable, Callable
ToolHandler = Callable[..., Awaitable[Any]]
def get_handler(tool_name: str) -> ToolHandler:
"""Get the handler function for a given tool name."""
if tool_name == "get_location_info":
return get_location_info
if tool_name == "get_ip_address":
return get_ip_address
if tool_name == "get_weather_alerts":
return get_weather_alerts
raise ValueError(f"Unknown tool name: {tool_name}")
def get_tools() -> types.Tool:
"""Get the Tool object containing all available function declarations.
Uses FunctionDeclaration.from_callable_with_api_option() from the Google GenAI SDK
to generate tool definitions from the handler functions.
"""
return types.Tool(
function_declarations=[
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_weather_alerts, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_location_info, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_ip_address, api_option="GEMINI_API"
),
]
)
LLM-Aktivität
Definieren Sie nun die Aktivität, die die Gemini API aufruft. Die Dataclasses GeminiChatRequest und GeminiChatResponse definieren den Vertrag.
Sie deaktivieren den automatischen Funktionsaufruf, damit der LLM-Aufruf und der Tool-Aufruf als separate Aufgaben behandelt werden. So wird Ihr Agent robuster. Außerdem deaktivieren Sie die integrierten Wiederholungsversuche des SDKs (attempts=1), da Temporal Wiederholungsversuche dauerhaft verarbeitet.
import os
from dataclasses import dataclass
from temporalio import activity
@dataclass
class GeminiChatRequest:
"""Request parameters for a Gemini chat completion."""
model: str
system_instruction: str
contents: list[types.Content]
tools: list[types.Tool]
@dataclass
class GeminiChatResponse:
"""Response from a Gemini chat completion."""
text: str | None
function_calls: list[dict[str, Any]]
raw_parts: list[types.Part]
@activity.defn
async def generate_content(request: GeminiChatRequest) -> GeminiChatResponse:
"""Execute a Gemini chat completion with tool support."""
api_key = os.environ.get("GOOGLE_API_KEY")
if not api_key:
raise ValueError("GOOGLE_API_KEY environment variable is not set")
client = genai.Client(
api_key=api_key,
http_options=types.HttpOptions(
retry_options=types.HttpRetryOptions(attempts=1),
),
)
config = types.GenerateContentConfig(
system_instruction=request.system_instruction,
tools=request.tools,
automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True),
)
response = await client.aio.models.generate_content(
model=request.model,
contents=request.contents,
config=config,
)
function_calls = []
raw_parts = []
text_parts = []
if response.candidates and response.candidates[0].content:
for part in response.candidates[0].content.parts:
raw_parts.append(part)
if part.function_call:
function_calls.append(
{
"name": part.function_call.name,
"args": dict(part.function_call.args) if part.function_call.args else {},
}
)
elif part.text:
text_parts.append(part.text)
text = "".join(text_parts) if text_parts and not function_calls else None
return GeminiChatResponse(
text=text,
function_calls=function_calls,
raw_parts=raw_parts,
)
Dynamische Tool-Aktivität
Als Nächstes definieren Sie die Aktivität, mit der Tools ausgeführt werden. Dazu wird die dynamische Aktivitätsfunktion von Temporal verwendet: Der Tool-Handler (ein Callable) wird über die Funktion get_handler aus der Tool-Registrierung abgerufen. So können verschiedene Agents definiert werden, indem einfach ein anderes Set von Tools und Systemanweisungen bereitgestellt wird. Der Workflow, der die Agent-Schleife implementiert, muss nicht geändert werden.
Bei der Aktivität wird die Signatur des Handlers geprüft, um festzustellen, wie Argumente übergeben werden. Wenn der Handler ein Pydantic-Modell erwartet, verarbeitet er das von Gemini erstellte verschachtelte Ausgabeformat (z. B. {"request": {"state": "CA"}} anstelle eines flachen {"state": "CA"}).
import inspect
from collections.abc import Sequence
from temporalio.common import RawValue
@activity.defn(dynamic=True)
async def dynamic_tool_activity(args: Sequence[RawValue]) -> dict:
"""Execute a tool dynamically based on the activity name."""
tool_name = activity.info().activity_type
tool_args = activity.payload_converter().from_payload(args[0].payload, dict)
activity.logger.info(f"Running dynamic tool '{tool_name}' with args: {tool_args}")
handler = get_handler(tool_name)
if not inspect.iscoroutinefunction(handler):
raise TypeError("Tool handler must be async (awaitable).")
sig = inspect.signature(handler)
params = list(sig.parameters.values())
if len(params) == 0:
result = await handler()
else:
param = params[0]
param_name = param.name
ann = param.annotation
if isinstance(ann, type) and issubclass(ann, BaseModel):
nested_args = tool_args.get(param_name, tool_args)
result = await handler(ann(**nested_args))
else:
result = await handler(**tool_args)
activity.logger.info(f"Tool '{tool_name}' result: {result}")
return result
Der Workflow des agentischen Loops
Jetzt haben Sie alle Teile, um den Agent fertigzustellen. Die Klasse AgentWorkflow implementiert einen Workflow, der den Agent Loop enthält. In dieser Schleife wird das LLM über eine Aktivität aufgerufen (wodurch es dauerhaft wird), die Ausgabe wird geprüft und wenn ein Tool vom LLM ausgewählt wurde, wird es über dynamic_tool_activity aufgerufen.
In diesem einfachen ReAct-Agenten gilt die Schleife als abgeschlossen und das endgültige LLM-Ergebnis wird zurückgegeben, sobald das LLM sich entscheidet, kein Tool zu verwenden.
from datetime import timedelta
@workflow.defn
class AgentWorkflow:
"""Agentic loop workflow that uses Gemini for LLM calls and executes tools."""
@workflow.run
async def run(self, input: str) -> str:
contents: list[types.Content] = [
types.Content(role="user", parts=[types.Part.from_text(text=input)])
]
tools = [get_tools()]
while True:
result = await workflow.execute_activity(
generate_content,
GeminiChatRequest(
model="gemini-3-flash-preview",
system_instruction=SYSTEM_INSTRUCTIONS,
contents=contents,
tools=tools,
),
start_to_close_timeout=timedelta(seconds=60),
)
if result.function_calls:
# Sending the complete raw_parts here ensures Gemini 3 thought
# signatures are propagated correctly.
contents.append(types.Content(role="model", parts=result.raw_parts))
for function_call in result.function_calls:
tool_result = await self._handle_function_call(function_call)
contents.append(
types.Content(
role="user",
parts=[
types.Part.from_function_response(
name=function_call["name"],
response={"result": tool_result},
)
],
)
)
else:
return result.text
# Leave this in place. You will un-comment it during a durability
# test later on.
# await asyncio.sleep(10)
async def _handle_function_call(self, function_call: dict) -> str:
"""Execute a tool via dynamic activity and return the result."""
tool_name = function_call["name"]
tool_args = function_call.get("args", {})
result = await workflow.execute_activity(
tool_name,
tool_args,
start_to_close_timeout=timedelta(seconds=30),
)
return result
Der Agentenkreislauf ist vollständig robust. Wenn der Agent-Worker nach mehreren Schleifendurchläufen abstürzt, setzt Temporal genau dort fort, wo er aufgehört hat. Bereits ausgeführte LLM-Aufrufe oder Tool-Aufrufe müssen nicht noch einmal aufgerufen werden.
Worker-Start
Verkabeln Sie schließlich alles. Der Code implementiert die erforderliche Geschäftslogik so, dass es aussieht, als würde er in einem einzigen Prozess ausgeführt. Durch die Verwendung von Temporal wird er jedoch zu einem ereignisgesteuerten System (insbesondere einem Event-Sourcing-System), in dem die Kommunikation zwischen dem Workflow und den Aktivitäten über Messaging erfolgt, das von Temporal bereitgestellt wird.
Der Temporal-Worker stellt eine Verbindung zum Temporal-Dienst her und fungiert als Scheduler für die Workflow- und Aktivitätsaufgaben. Der Worker registriert den Workflow und beide Aktivitäten und beginnt dann, auf Aufgaben zu warten.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dotenv import load_dotenv
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker
async def main():
config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
client = await Client.connect(
**config,
data_converter=pydantic_data_converter,
)
worker = Worker(
client,
task_queue="gemini-agent-python-task-queue",
workflows=[
AgentWorkflow,
],
activities=[
generate_content,
dynamic_tool_activity,
],
activity_executor=ThreadPoolExecutor(max_workers=10),
)
await worker.run()
if __name__ == "__main__":
load_dotenv()
asyncio.run(main())
Das Client-Script
Erstellen Sie das Client-Skript (start_workflow.py). Es sendet eine Anfrage und wartet auf das Ergebnis. Sie sehen, dass sie mit derselben Aufgabenwarteschlange verbunden ist, auf die im Agent-Worker verwiesen wird. Das start_workflow-Skript sendet eine Workflow-Aufgabe mit dem Nutzer-Prompt an diese Aufgabenwarteschlange und startet so die Ausführung des Agents.
import asyncio
import sys
import uuid
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
async def main():
client = await Client.connect(
"localhost:7233",
data_converter=pydantic_data_converter,
)
query = sys.argv[1] if len(sys.argv) > 1 else "Tell me about recursion"
result = await client.execute_workflow(
"AgentWorkflow",
query,
id=f"gemini-agent-id-{uuid.uuid4()}",
task_queue="gemini-agent-python-task-queue",
)
print(f"\nResult:\n{result}")
if __name__ == "__main__":
asyncio.run(main())
Agent ausführen
Wenn Sie es noch nicht getan haben, starten Sie den Temporal-Entwicklungsserver:
temporal server start-devStarten Sie in einem neuen Terminalfenster den Agent-Worker:
python -m durable_agent_workerSenden Sie in einem dritten Terminalfenster eine Anfrage an Ihren Agenten:
python -m start_workflow "are there any weather alerts for where I am?"Sehen Sie sich die Ausgabe im Terminal von durable_agent_worker an. Dort werden die Aktionen angezeigt, die in jeder Iteration des Agent-Loops ausgeführt werden. Das LLM kann die Nutzeranfrage erfüllen, indem es eine Reihe von Tools aufruft, die ihm zur Verfügung stehen. Die ausgeführten Schritte können Sie in der Temporal-UI unter http://localhost:8233/namespaces/default/workflows einsehen.
Probieren Sie einige verschiedene Prompts aus, um den Grund des Kundenservicemitarbeiters und die Anruftools zu sehen:
python -m start_workflow "are there any weather alerts for New York?"python -m start_workflow "where am I?"python -m start_workflow "what is my ip address?"python -m start_workflow "tell me a joke"
Für den letzten Prompt sind keine Tools erforderlich. Der Agent antwortet daher mit einem Haiku, das auf SYSTEM_INSTRUCTIONS basiert.
Haltbarkeit testen (optional)
Wenn Sie auf Temporal aufbauen, übersteht Ihr Agent Fehler nahtlos. Sie können dies mit zwei separaten Tests testen.
Netzwerkausfall simulieren
Bei diesem Test deaktivieren Sie vorübergehend die Internetverbindung Ihres Computers, senden einen Workflow, beobachten, wie Temporal es automatisch noch einmal versucht, und stellen dann das Netzwerk wieder her, um zu sehen, wie es sich erholt.
- Trennen Sie Ihren Computer vom Internet (z. B. indem Sie das WLAN deaktivieren).
Workflow einreichen:
python -m start_workflow "tell me a joke"Sehen Sie sich die Temporal-Benutzeroberfläche (
http://localhost:8233) an. Sie sehen, dass die LLM-Aktivität fehlschlägt und Temporal die Wiederholungsversuche automatisch im Hintergrund verwaltet.Stellen Sie eine Verbindung zum Internet her.
Beim nächsten automatischen Wiederholungsversuch wird die Gemini API erfolgreich erreicht und das Endergebnis wird in Ihrem Terminal ausgegeben.
Worker-Absturz überleben
Bei diesem Test beenden Sie den Worker während der Ausführung und starten ihn neu. Bei temporären Wiederholungen wird der Workflowverlauf (Event Sourcing) wiedergegeben und die Ausführung wird bei der letzten abgeschlossenen Aktivität fortgesetzt. Bereits abgeschlossene LLM-Aufrufe und Tool-Aufrufe werden nicht wiederholt.
- Wenn Sie sich Zeit geben möchten, den Worker zu beenden, öffnen Sie
durable_agent_worker.pyund heben Sie vorübergehend die Auskommentierung vonawait asyncio.sleep(10)in derAgentWorkflow-run-Schleife auf. Worker neu starten:
python -m durable_agent_workerEine Anfrage senden, die mehrere Tools auslöst:
python -m start_workflow "are there any weather alerts where I am?"Beenden Sie den Worker-Prozess jederzeit vor Abschluss (
Ctrl-Cim Worker-Terminal oder mitkill %1, wenn er im Hintergrund ausgeführt wird).Worker neu starten:
python -m durable_agent_worker
Temporal spielt den Workflow-Verlauf noch einmal ab. LLM-Aufrufe und Tool-Aufrufe, die bereits abgeschlossen sind, werden nicht noch einmal ausgeführt. Ihre Ergebnisse werden sofort aus dem Verlauf (dem Ereignisprotokoll) wiedergegeben. Der Workflow wird erfolgreich abgeschlossen.