Langlebiger KI-Agent mit Gemini und Temporal

In dieser Anleitung wird beschrieben, wie Sie einen ReAct-ähnlichen Agent-Loop erstellen, der die Gemini API für das Reasoning und Temporal für die Dauerhaftigkeit verwendet. Der vollständige Quellcode für diese Anleitung ist auf GitHub verfügbar.

Der Agent kann Tools aufrufen, z. B. um Wetterwarnungen zu suchen oder eine IP-Adresse zu geolokalisieren, und wird so lange wiederholt, bis er genügend Informationen für eine Antwort hat.

Der Unterschied zu einer typischen Agent-Demo besteht in der Langlebigkeit. Jeder LLM-Aufruf, jeder Tool-Aufruf und jeder Schritt des Agent-Loops wird von Temporal gespeichert. Wenn der Prozess abstürzt, das Netzwerk ausfällt oder ein API-Aufruf das Zeitlimit überschreitet, wird der Vorgang von Temporal automatisch wiederholt und ab dem letzten abgeschlossenen Schritt fortgesetzt. Es geht kein Unterhaltungsverlauf verloren und Toolaufrufe werden nicht fälschlicherweise wiederholt.

Architektur

Die Architektur besteht aus drei Teilen:

  • Workflow:Der Agent-Loop, der die Ausführungslogik orchestriert.
  • Aktivitäten:Einzelne Arbeitseinheiten (LLM-Aufrufe, Tool-Aufrufe), die von Temporal dauerhaft gemacht werden.
  • Worker:Der Prozess, der die Workflows und Aktivitäten ausführt.

In diesem Beispiel werden alle drei Teile in einer einzigen Datei (durable_agent_worker.py) platziert. In einer realen Implementierung würden Sie sie trennen, um verschiedene Vorteile bei der Bereitstellung und Skalierbarkeit zu erzielen. Sie platzieren den Code, der dem Agent einen Prompt liefert, in einer zweiten Datei (start_workflow.py).

Vorbereitung

Für diese Anleitung benötigen Sie Folgendes:

  • Ein Gemini API-Schlüssel Sie können kostenlos einen in Google AI Studio erstellen.
  • Python-Version 3.10 oder höher.
  • Die Temporal-CLI zum Ausführen eines lokalen Entwicklungsservers.

Einrichtung

Prüfen Sie zuerst, ob ein Temporal-Entwicklungsserver lokal ausgeführt wird:

temporal server start-dev

Installieren Sie als Nächstes die erforderlichen Abhängigkeiten:

pip install temporalio google-genai httpx pydantic python-dotenv

Erstellen Sie in Ihrem Projektverzeichnis eine Datei mit dem Namen .env mit Ihrem Gemini API-Schlüssel. Sie können einen API-Schlüssel von Google AI Studio abrufen.

echo "GOOGLE_API_KEY=your-api-key-here" > .env

Implementierung

Im weiteren Verlauf dieser Anleitung wird durable_agent_worker.py von oben nach unten durchgegangen und der Agent wird Schritt für Schritt erstellt. Erstellen Sie die Datei und folgen Sie der Anleitung.

Importe und Sandbox-Einrichtung

Beginnen Sie mit den Importen, die im Voraus definiert werden müssen. Der workflow.unsafe.imports_passed_through()-Block weist die Workflow-Sandbox von Temporal an, bestimmte Module ohne Einschränkung durchzulassen. Das ist erforderlich, da in mehreren Bibliotheken (insbesondere httpx, die eine Unterklasse von urllib.request.Request ist) Muster verwendet werden, die von der Sandbox andernfalls blockiert würden.

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    import pydantic_core  # noqa: F401
    import annotated_types  # noqa: F401

    import httpx
    from pydantic import BaseModel, Field
    from google import genai
    from google.genai import types

Systemanweisungen

Als Nächstes definieren Sie die Persönlichkeit des Agenten. Die Systemanweisungen geben an, wie sich das Modell verhalten soll. Dieser Agent ist so programmiert, dass er in Haikus antwortet, wenn keine Tools erforderlich sind.

SYSTEM_INSTRUCTIONS = """
You are a helpful agent that can use tools to help the user.
You will be given an input from the user and a list of tools to use.
You may or may not need to use the tools to satisfy the user ask.
If no tools are needed, respond in haikus.
"""

Tooldefinitionen

Definieren Sie nun die Tools, die der Agent verwenden kann. Jedes Tool ist eine asynchrone Funktion mit einem beschreibenden Docstring. Tools, die Parameter verwenden, haben ein Pydantic-Modell als einziges Argument. Dies ist eine Temporal-Best Practice, die dafür sorgt, dass Aktivitätssignaturen stabil bleiben, wenn Sie im Laufe der Zeit optionale Felder hinzufügen.

import json

NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"


class GetWeatherAlertsRequest(BaseModel):
    """Request model for getting weather alerts."""

    state: str = Field(description="Two-letter US state code (e.g. CA, NY)")


async def get_weather_alerts(request: GetWeatherAlertsRequest) -> str:
    """Get weather alerts for a US state.

    Args:
        request: The request object containing:
            - state: Two-letter US state code (e.g. CA, NY)
    """
    headers = {"User-Agent": USER_AGENT, "Accept": "application/geo+json"}
    url = f"{NWS_API_BASE}/alerts/active/area/{request.state}"

    async with httpx.AsyncClient() as client:
        response = await client.get(url, headers=headers, timeout=5.0)
        response.raise_for_status()
        return json.dumps(response.json())

Definieren Sie als Nächstes Tools für die Standortbestimmung von IP-Adressen:

class GetLocationRequest(BaseModel):
    """Request model for getting location info from an IP address."""

    ipaddress: str = Field(description="An IP address")


async def get_ip_address() -> str:
    """Get the public IP address of the current machine."""
    async with httpx.AsyncClient() as client:
        response = await client.get("https://icanhazip.com")
        response.raise_for_status()
        return response.text.strip()


async def get_location_info(request: GetLocationRequest) -> str:
    """Get the location information for an IP address including city, state, and country.

    Args:
        request: The request object containing:
            - ipaddress: An IP address to look up
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(f"http://ip-api.com/json/{request.ipaddress}")
        response.raise_for_status()
        result = response.json()
        return f"{result['city']}, {result['regionName']}, {result['country']}"

Tool-Registry

Erstellen Sie als Nächstes eine Registry, in der Toolnamen Handler-Funktionen zugeordnet werden. Die Funktion get_tools() generiert Gemini-kompatible FunctionDeclaration-Objekte aus den aufrufbaren Elementen mit FunctionDeclaration.from_callable_with_api_option().

from typing import Any, Awaitable, Callable

ToolHandler = Callable[..., Awaitable[Any]]


def get_handler(tool_name: str) -> ToolHandler:
    """Get the handler function for a given tool name."""
    if tool_name == "get_location_info":
        return get_location_info
    if tool_name == "get_ip_address":
        return get_ip_address
    if tool_name == "get_weather_alerts":
        return get_weather_alerts
    raise ValueError(f"Unknown tool name: {tool_name}")


def get_tools() -> types.Tool:
    """Get the Tool object containing all available function declarations.

    Uses FunctionDeclaration.from_callable_with_api_option() from the Google GenAI SDK
    to generate tool definitions from the handler functions.
    """
    return types.Tool(
        function_declarations=[
            types.FunctionDeclaration.from_callable_with_api_option(
                callable=get_weather_alerts, api_option="GEMINI_API"
            ),
            types.FunctionDeclaration.from_callable_with_api_option(
                callable=get_location_info, api_option="GEMINI_API"
            ),
            types.FunctionDeclaration.from_callable_with_api_option(
                callable=get_ip_address, api_option="GEMINI_API"
            ),
        ]
    )

LLM-Aktivität

Definieren Sie nun die Aktivität, die die Gemini API aufruft. Die Dataclasses GeminiChatRequest und GeminiChatResponse definieren den Vertrag.

Sie deaktivieren den automatischen Funktionsaufruf, damit der LLM-Aufruf und der Tool-Aufruf als separate Aufgaben behandelt werden. So wird Ihr Agent robuster. Außerdem deaktivieren Sie die integrierten Wiederholungsversuche des SDKs (attempts=1), da Temporal Wiederholungsversuche dauerhaft verarbeitet.

import os
from dataclasses import dataclass

from temporalio import activity

@dataclass
class GeminiChatRequest:
    """Request parameters for a Gemini chat completion."""

    model: str
    system_instruction: str
    contents: list[types.Content]
    tools: list[types.Tool]


@dataclass
class GeminiChatResponse:
    """Response from a Gemini chat completion."""

    text: str | None
    function_calls: list[dict[str, Any]]
    raw_parts: list[types.Part]


@activity.defn
async def generate_content(request: GeminiChatRequest) -> GeminiChatResponse:
    """Execute a Gemini chat completion with tool support."""
    api_key = os.environ.get("GOOGLE_API_KEY")
    if not api_key:
        raise ValueError("GOOGLE_API_KEY environment variable is not set")
    client = genai.Client(
        api_key=api_key,
        http_options=types.HttpOptions(
            retry_options=types.HttpRetryOptions(attempts=1),
        ),
    )

    config = types.GenerateContentConfig(
        system_instruction=request.system_instruction,
        tools=request.tools,
        automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True),
    )

    response = await client.aio.models.generate_content(
        model=request.model,
        contents=request.contents,
        config=config,
    )

    function_calls = []
    raw_parts = []
    text_parts = []

    if response.candidates and response.candidates[0].content:
        for part in response.candidates[0].content.parts:
            raw_parts.append(part)
            if part.function_call:
                function_calls.append(
                    {
                        "name": part.function_call.name,
                        "args": dict(part.function_call.args) if part.function_call.args else {},
                    }
                )
            elif part.text:
                text_parts.append(part.text)

    text = "".join(text_parts) if text_parts and not function_calls else None

    return GeminiChatResponse(
        text=text,
        function_calls=function_calls,
        raw_parts=raw_parts,
    )

Dynamische Tool-Aktivität

Als Nächstes definieren Sie die Aktivität, mit der Tools ausgeführt werden. Dazu wird die dynamische Aktivitätsfunktion von Temporal verwendet: Der Tool-Handler (ein Callable) wird über die Funktion get_handler aus der Tool-Registrierung abgerufen. So können verschiedene Agents definiert werden, indem einfach ein anderes Set von Tools und Systemanweisungen bereitgestellt wird. Der Workflow, der die Agent-Schleife implementiert, muss nicht geändert werden.

Bei der Aktivität wird die Signatur des Handlers geprüft, um festzustellen, wie Argumente übergeben werden. Wenn der Handler ein Pydantic-Modell erwartet, verarbeitet er das von Gemini erstellte verschachtelte Ausgabeformat (z. B. {"request": {"state": "CA"}} anstelle eines flachen {"state": "CA"}).

import inspect
from collections.abc import Sequence

from temporalio.common import RawValue

@activity.defn(dynamic=True)
async def dynamic_tool_activity(args: Sequence[RawValue]) -> dict:
    """Execute a tool dynamically based on the activity name."""
    tool_name = activity.info().activity_type
    tool_args = activity.payload_converter().from_payload(args[0].payload, dict)
    activity.logger.info(f"Running dynamic tool '{tool_name}' with args: {tool_args}")

    handler = get_handler(tool_name)

    if not inspect.iscoroutinefunction(handler):
        raise TypeError("Tool handler must be async (awaitable).")

    sig = inspect.signature(handler)
    params = list(sig.parameters.values())

    if len(params) == 0:
        result = await handler()
    else:
        param = params[0]
        param_name = param.name
        ann = param.annotation

        if isinstance(ann, type) and issubclass(ann, BaseModel):
            nested_args = tool_args.get(param_name, tool_args)
            result = await handler(ann(**nested_args))
        else:
            result = await handler(**tool_args)

    activity.logger.info(f"Tool '{tool_name}' result: {result}")
    return result

Der Workflow des agentischen Loops

Jetzt haben Sie alle Teile, um den Agent fertigzustellen. Die Klasse AgentWorkflow implementiert einen Workflow, der den Agent Loop enthält. In dieser Schleife wird das LLM über eine Aktivität aufgerufen (wodurch es dauerhaft wird), die Ausgabe wird geprüft und wenn ein Tool vom LLM ausgewählt wurde, wird es über dynamic_tool_activity aufgerufen.

In diesem einfachen ReAct-Agenten gilt die Schleife als abgeschlossen und das endgültige LLM-Ergebnis wird zurückgegeben, sobald das LLM sich entscheidet, kein Tool zu verwenden.

from datetime import timedelta

@workflow.defn
class AgentWorkflow:
    """Agentic loop workflow that uses Gemini for LLM calls and executes tools."""

    @workflow.run
    async def run(self, input: str) -> str:
        contents: list[types.Content] = [
            types.Content(role="user", parts=[types.Part.from_text(text=input)])
        ]

        tools = [get_tools()]

        while True:
            result = await workflow.execute_activity(
                generate_content,
                GeminiChatRequest(
                    model="gemini-3-flash-preview",
                    system_instruction=SYSTEM_INSTRUCTIONS,
                    contents=contents,
                    tools=tools,
                ),
                start_to_close_timeout=timedelta(seconds=60),
            )

            if result.function_calls:
                # Sending the complete raw_parts here ensures Gemini 3 thought
                # signatures are propagated correctly.
                contents.append(types.Content(role="model", parts=result.raw_parts))

                for function_call in result.function_calls:
                    tool_result = await self._handle_function_call(function_call)

                    contents.append(
                        types.Content(
                            role="user",
                            parts=[
                                types.Part.from_function_response(
                                    name=function_call["name"],
                                    response={"result": tool_result},
                                )
                            ],
                        )
                    )
            else:
                return result.text
            # Leave this in place. You will un-comment it during a durability
            # test later on.
            # await asyncio.sleep(10)

    async def _handle_function_call(self, function_call: dict) -> str:
        """Execute a tool via dynamic activity and return the result."""
        tool_name = function_call["name"]
        tool_args = function_call.get("args", {})

        result = await workflow.execute_activity(
            tool_name,
            tool_args,
            start_to_close_timeout=timedelta(seconds=30),
        )

        return result

Der Agentenkreislauf ist vollständig robust. Wenn der Agent-Worker nach mehreren Schleifendurchläufen abstürzt, setzt Temporal genau dort fort, wo er aufgehört hat. Bereits ausgeführte LLM-Aufrufe oder Tool-Aufrufe müssen nicht noch einmal aufgerufen werden.

Worker-Start

Verkabeln Sie schließlich alles. Der Code implementiert die erforderliche Geschäftslogik so, dass es aussieht, als würde er in einem einzigen Prozess ausgeführt. Durch die Verwendung von Temporal wird er jedoch zu einem ereignisgesteuerten System (insbesondere einem Event-Sourcing-System), in dem die Kommunikation zwischen dem Workflow und den Aktivitäten über Messaging erfolgt, das von Temporal bereitgestellt wird.

Der Temporal-Worker stellt eine Verbindung zum Temporal-Dienst her und fungiert als Scheduler für die Workflow- und Aktivitätsaufgaben. Der Worker registriert den Workflow und beide Aktivitäten und beginnt dann, auf Aufgaben zu warten.

import asyncio
from concurrent.futures import ThreadPoolExecutor

from dotenv import load_dotenv
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker

async def main():
    config = ClientConfig.load_client_connect_config()
    config.setdefault("target_host", "localhost:7233")
    client = await Client.connect(
        **config,
        data_converter=pydantic_data_converter,
    )

    worker = Worker(
        client,
        task_queue="gemini-agent-python-task-queue",
        workflows=[
            AgentWorkflow,
        ],
        activities=[
            generate_content,
            dynamic_tool_activity,
        ],
        activity_executor=ThreadPoolExecutor(max_workers=10),
    )
    await worker.run()


if __name__ == "__main__":
    load_dotenv()
    asyncio.run(main())

Das Client-Script

Erstellen Sie das Client-Skript (start_workflow.py). Es sendet eine Anfrage und wartet auf das Ergebnis. Sie sehen, dass sie mit derselben Aufgabenwarteschlange verbunden ist, auf die im Agent-Worker verwiesen wird. Das start_workflow-Skript sendet eine Workflow-Aufgabe mit dem Nutzer-Prompt an diese Aufgabenwarteschlange und startet so die Ausführung des Agents.

import asyncio
import sys
import uuid

from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter


async def main():
    client = await Client.connect(
        "localhost:7233",
        data_converter=pydantic_data_converter,
    )

    query = sys.argv[1] if len(sys.argv) > 1 else "Tell me about recursion"

    result = await client.execute_workflow(
        "AgentWorkflow",
        query,
        id=f"gemini-agent-id-{uuid.uuid4()}",
        task_queue="gemini-agent-python-task-queue",
    )
    print(f"\nResult:\n{result}")


if __name__ == "__main__":
    asyncio.run(main())

Agent ausführen

Wenn Sie es noch nicht getan haben, starten Sie den Temporal-Entwicklungsserver:

temporal server start-dev

Starten Sie in einem neuen Terminalfenster den Agent-Worker:

python -m durable_agent_worker

Senden Sie in einem dritten Terminalfenster eine Anfrage an Ihren Agenten:

python -m start_workflow "are there any weather alerts for where I am?"

Sehen Sie sich die Ausgabe im Terminal von durable_agent_worker an. Dort werden die Aktionen angezeigt, die in jeder Iteration des Agent-Loops ausgeführt werden. Das LLM kann die Nutzeranfrage erfüllen, indem es eine Reihe von Tools aufruft, die ihm zur Verfügung stehen. Die ausgeführten Schritte können Sie in der Temporal-UI unter http://localhost:8233/namespaces/default/workflows einsehen.

Probieren Sie einige verschiedene Prompts aus, um den Grund des Kundenservicemitarbeiters und die Anruftools zu sehen:

python -m start_workflow "are there any weather alerts for New York?"
python -m start_workflow "where am I?"
python -m start_workflow "what is my ip address?"
python -m start_workflow "tell me a joke"

Für den letzten Prompt sind keine Tools erforderlich. Der Agent antwortet daher mit einem Haiku, das auf SYSTEM_INSTRUCTIONS basiert.

Haltbarkeit testen (optional)

Wenn Sie auf Temporal aufbauen, übersteht Ihr Agent Fehler nahtlos. Sie können dies mit zwei separaten Tests testen.

Netzwerkausfall simulieren

Bei diesem Test deaktivieren Sie vorübergehend die Internetverbindung Ihres Computers, senden einen Workflow, beobachten, wie Temporal es automatisch noch einmal versucht, und stellen dann das Netzwerk wieder her, um zu sehen, wie es sich erholt.

  1. Trennen Sie Ihren Computer vom Internet (z. B. indem Sie das WLAN deaktivieren).
  2. Workflow einreichen:

    python -m start_workflow "tell me a joke"
  3. Sehen Sie sich die Temporal-Benutzeroberfläche (http://localhost:8233) an. Sie sehen, dass die LLM-Aktivität fehlschlägt und Temporal die Wiederholungsversuche automatisch im Hintergrund verwaltet.

  4. Stellen Sie eine Verbindung zum Internet her.

  5. Beim nächsten automatischen Wiederholungsversuch wird die Gemini API erfolgreich erreicht und das Endergebnis wird in Ihrem Terminal ausgegeben.

Worker-Absturz überleben

Bei diesem Test beenden Sie den Worker während der Ausführung und starten ihn neu. Bei temporären Wiederholungen wird der Workflowverlauf (Event Sourcing) wiedergegeben und die Ausführung wird bei der letzten abgeschlossenen Aktivität fortgesetzt. Bereits abgeschlossene LLM-Aufrufe und Tool-Aufrufe werden nicht wiederholt.

  1. Wenn Sie sich Zeit geben möchten, den Worker zu beenden, öffnen Sie durable_agent_worker.py und heben Sie vorübergehend die Auskommentierung von await asyncio.sleep(10) in der AgentWorkflow-run-Schleife auf.
  2. Worker neu starten:

    python -m durable_agent_worker
  3. Eine Anfrage senden, die mehrere Tools auslöst:

    python -m start_workflow "are there any weather alerts where I am?"
  4. Beenden Sie den Worker-Prozess jederzeit vor Abschluss (Ctrl-C im Worker-Terminal oder mit kill %1, wenn er im Hintergrund ausgeführt wird).

  5. Worker neu starten:

    python -m durable_agent_worker

Temporal spielt den Workflow-Verlauf noch einmal ab. LLM-Aufrufe und Tool-Aufrufe, die bereits abgeschlossen sind, werden nicht noch einmal ausgeführt. Ihre Ergebnisse werden sofort aus dem Verlauf (dem Ereignisprotokoll) wiedergegeben. Der Workflow wird erfolgreich abgeschlossen.

Weitere Ressourcen