このチュートリアルでは、推論に Gemini API を使用し、永続性に Temporal を使用する ReAct スタイルのエージェント ループを構築する手順について説明します。このチュートリアルの完全なソースコードは GitHub で入手できます。
エージェントは、天気アラートの検索や IP アドレスの地理位置情報の特定などのツールを呼び出すことができ、応答に必要な十分な情報が得られるまでループします。
一般的なエージェント デモと異なるのは、耐久性です。すべての LLM 呼び出し、すべてのツール呼び出し、エージェント ループのすべてのステップは、Temporal によって永続化されます。プロセスがクラッシュした場合、ネットワークが切断された場合、API がタイムアウトした場合、Temporal は自動的に再試行し、最後に完了したステップから再開します。会話履歴が失われたり、ツール呼び出しが誤って繰り返されたりすることはありません。
アーキテクチャ
このアーキテクチャは次の 3 つの部分で構成されています。
- ワークフロー: 実行ロジックをオーケストレートするエージェント ループ。
- アクティビティ: Temporal が永続化する個々の作業単位(LLM 呼び出し、ツール呼び出し)。
- ワーカー: ワークフローとアクティビティを実行するプロセス。
この例では、これら 3 つの要素をすべて 1 つのファイル(durable_agent_worker.py)に配置します。実際の環境では、さまざまなデプロイとスケーラビリティのメリットを活かすために、これらを分離します。エージェントにプロンプトを提供するコードは、2 つ目のファイル(start_workflow.py)に配置します。
前提条件
このガイドを完了するには、次のものが必要です。
- Gemini API キー。Google AI Studio で無料で作成できます。
- Python バージョン 3.10 以降。
- ローカル開発サーバーを実行するための Temporal CLI。
セットアップ
始める前に、Temporal 開発サーバーがローカルで実行されていることを確認してください。
temporal server start-dev次に、必要な依存関係をインストールします。
pip install temporalio google-genai httpx pydantic python-dotenvGemini API キーを使用して、プロジェクト ディレクトリに .env ファイルを作成します。API キーは Google AI Studio から取得できます。
echo "GOOGLE_API_KEY=your-api-key-here" > .env実装
このチュートリアルの残りの部分では、durable_agent_worker.py を上から下まで順に説明し、エージェントを少しずつ構築していきます。ファイルを作成して、手順に沿って操作します。
インポートとサンドボックスの設定
最初に、事前に定義する必要があるインポートから始めます。workflow.unsafe.imports_passed_through() ブロックは、特定のモジュールを制限なく通過させるよう Temporal のワークフロー サンドボックスに指示します。これは、いくつかのライブラリ(特に urllib.request.Request をサブクラス化する httpx)が、サンドボックスでブロックされるパターンを使用しているためです。
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
import pydantic_core # noqa: F401
import annotated_types # noqa: F401
import httpx
from pydantic import BaseModel, Field
from google import genai
from google.genai import types
システム指示
次に、エージェントの個性を定義します。システム指示は、モデルの動作方法を指定します。このエージェントは、ツールが必要ない場合は俳句で応答するように指示されています。
SYSTEM_INSTRUCTIONS = """
You are a helpful agent that can use tools to help the user.
You will be given an input from the user and a list of tools to use.
You may or may not need to use the tools to satisfy the user ask.
If no tools are needed, respond in haikus.
"""
ツール定義
次に、エージェントが使用できるツールを定義します。各ツールは、説明的な docstring を持つ非同期関数です。パラメータを受け取るツールは、単一の引数として Pydantic モデルを使用します。これは、時間の経過とともにオプション フィールドを追加してもアクティビティ シグネチャを安定させる Temporal のベスト プラクティスです。
import json
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"
class GetWeatherAlertsRequest(BaseModel):
"""Request model for getting weather alerts."""
state: str = Field(description="Two-letter US state code (e.g. CA, NY)")
async def get_weather_alerts(request: GetWeatherAlertsRequest) -> str:
"""Get weather alerts for a US state.
Args:
request: The request object containing:
- state: Two-letter US state code (e.g. CA, NY)
"""
headers = {"User-Agent": USER_AGENT, "Accept": "application/geo+json"}
url = f"{NWS_API_BASE}/alerts/active/area/{request.state}"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers, timeout=5.0)
response.raise_for_status()
return json.dumps(response.json())
次に、IP アドレスの地理位置情報ツールを定義します。
class GetLocationRequest(BaseModel):
"""Request model for getting location info from an IP address."""
ipaddress: str = Field(description="An IP address")
async def get_ip_address() -> str:
"""Get the public IP address of the current machine."""
async with httpx.AsyncClient() as client:
response = await client.get("https://icanhazip.com")
response.raise_for_status()
return response.text.strip()
async def get_location_info(request: GetLocationRequest) -> str:
"""Get the location information for an IP address including city, state, and country.
Args:
request: The request object containing:
- ipaddress: An IP address to look up
"""
async with httpx.AsyncClient() as client:
response = await client.get(f"http://ip-api.com/json/{request.ipaddress}")
response.raise_for_status()
result = response.json()
return f"{result['city']}, {result['regionName']}, {result['country']}"
ツール レジストリ
次に、ツール名をハンドラ関数にマッピングするレジストリを作成します。get_tools() 関数は、FunctionDeclaration.from_callable_with_api_option() を使用して、呼び出し可能オブジェクトから Gemini 互換の FunctionDeclaration オブジェクトを生成します。
from typing import Any, Awaitable, Callable
ToolHandler = Callable[..., Awaitable[Any]]
def get_handler(tool_name: str) -> ToolHandler:
"""Get the handler function for a given tool name."""
if tool_name == "get_location_info":
return get_location_info
if tool_name == "get_ip_address":
return get_ip_address
if tool_name == "get_weather_alerts":
return get_weather_alerts
raise ValueError(f"Unknown tool name: {tool_name}")
def get_tools() -> types.Tool:
"""Get the Tool object containing all available function declarations.
Uses FunctionDeclaration.from_callable_with_api_option() from the Google GenAI SDK
to generate tool definitions from the handler functions.
"""
return types.Tool(
function_declarations=[
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_weather_alerts, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_location_info, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_ip_address, api_option="GEMINI_API"
),
]
)
LLM アクティビティ
次に、Gemini API を呼び出すアクティビティを定義します。GeminiChatRequest データクラスと GeminiChatResponse データクラスはコントラクトを定義します。
自動関数呼び出しを無効にして、LLM 呼び出しとツール呼び出しを別々のタスクとして処理し、エージェントの耐久性を高めます。Temporal は再試行を永続的に処理するため、SDK の組み込み再試行(attempts=1)も無効にします。
import os
from dataclasses import dataclass
from temporalio import activity
@dataclass
class GeminiChatRequest:
"""Request parameters for a Gemini chat completion."""
model: str
system_instruction: str
contents: list[types.Content]
tools: list[types.Tool]
@dataclass
class GeminiChatResponse:
"""Response from a Gemini chat completion."""
text: str | None
function_calls: list[dict[str, Any]]
raw_parts: list[types.Part]
@activity.defn
async def generate_content(request: GeminiChatRequest) -> GeminiChatResponse:
"""Execute a Gemini chat completion with tool support."""
api_key = os.environ.get("GOOGLE_API_KEY")
if not api_key:
raise ValueError("GOOGLE_API_KEY environment variable is not set")
client = genai.Client(
api_key=api_key,
http_options=types.HttpOptions(
retry_options=types.HttpRetryOptions(attempts=1),
),
)
config = types.GenerateContentConfig(
system_instruction=request.system_instruction,
tools=request.tools,
automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True),
)
response = await client.aio.models.generate_content(
model=request.model,
contents=request.contents,
config=config,
)
function_calls = []
raw_parts = []
text_parts = []
if response.candidates and response.candidates[0].content:
for part in response.candidates[0].content.parts:
raw_parts.append(part)
if part.function_call:
function_calls.append(
{
"name": part.function_call.name,
"args": dict(part.function_call.args) if part.function_call.args else {},
}
)
elif part.text:
text_parts.append(part.text)
text = "".join(text_parts) if text_parts and not function_calls else None
return GeminiChatResponse(
text=text,
function_calls=function_calls,
raw_parts=raw_parts,
)
動的ツールの活動
次に、ツールを実行するアクティビティを定義します。これは、Temporal の動的アクティビティ機能を使用します。ツール ハンドラ(呼び出し可能)は、get_handler 関数を介してツール レジストリから取得されます。これにより、異なるツールとシステム手順を指定するだけで、さまざまなエージェントを定義できます。エージェント ループを実装するワークフローを変更する必要はありません。
アクティビティは、ハンドラのシグネチャを調べて、引数を渡す方法を決定します。ハンドラが Pydantic モデルを想定している場合、Gemini が生成するネストされた出力形式(フラットな {"state": "CA"} ではなく {"request": {"state": "CA"}} など)を処理します。
import inspect
from collections.abc import Sequence
from temporalio.common import RawValue
@activity.defn(dynamic=True)
async def dynamic_tool_activity(args: Sequence[RawValue]) -> dict:
"""Execute a tool dynamically based on the activity name."""
tool_name = activity.info().activity_type
tool_args = activity.payload_converter().from_payload(args[0].payload, dict)
activity.logger.info(f"Running dynamic tool '{tool_name}' with args: {tool_args}")
handler = get_handler(tool_name)
if not inspect.iscoroutinefunction(handler):
raise TypeError("Tool handler must be async (awaitable).")
sig = inspect.signature(handler)
params = list(sig.parameters.values())
if len(params) == 0:
result = await handler()
else:
param = params[0]
param_name = param.name
ann = param.annotation
if isinstance(ann, type) and issubclass(ann, BaseModel):
nested_args = tool_args.get(param_name, tool_args)
result = await handler(ann(**nested_args))
else:
result = await handler(**tool_args)
activity.logger.info(f"Tool '{tool_name}' result: {result}")
return result
エージェント ループのワークフロー
これで、エージェントの構築を完了するための準備が整いました。AgentWorkflow クラスは、エージェント ループを含むワークフローを実装します。このループ内で、LLM はアクティビティを介して呼び出され(耐久性を持たせるため)、出力が検査されます。LLM によってツールが選択された場合は、dynamic_tool_activity を介して呼び出されます。
このシンプルな ReAct スタイルのエージェントでは、LLM がツールを使用しないことを選択すると、ループが完了したとみなされ、最終的な LLM の結果が返されます。
from datetime import timedelta
@workflow.defn
class AgentWorkflow:
"""Agentic loop workflow that uses Gemini for LLM calls and executes tools."""
@workflow.run
async def run(self, input: str) -> str:
contents: list[types.Content] = [
types.Content(role="user", parts=[types.Part.from_text(text=input)])
]
tools = [get_tools()]
while True:
result = await workflow.execute_activity(
generate_content,
GeminiChatRequest(
model="gemini-3-flash-preview",
system_instruction=SYSTEM_INSTRUCTIONS,
contents=contents,
tools=tools,
),
start_to_close_timeout=timedelta(seconds=60),
)
if result.function_calls:
# Sending the complete raw_parts here ensures Gemini 3 thought
# signatures are propagated correctly.
contents.append(types.Content(role="model", parts=result.raw_parts))
for function_call in result.function_calls:
tool_result = await self._handle_function_call(function_call)
contents.append(
types.Content(
role="user",
parts=[
types.Part.from_function_response(
name=function_call["name"],
response={"result": tool_result},
)
],
)
)
else:
return result.text
# Leave this in place. You will un-comment it during a durability
# test later on.
# await asyncio.sleep(10)
async def _handle_function_call(self, function_call: dict) -> str:
"""Execute a tool via dynamic activity and return the result."""
tool_name = function_call["name"]
tool_args = function_call.get("args", {})
result = await workflow.execute_activity(
tool_name,
tool_args,
start_to_close_timeout=timedelta(seconds=30),
)
return result
エージェント ループは完全に耐久性があります。ループを数回繰り返した後にエージェント ワーカーがクラッシュした場合、Temporal は、すでに実行された LLM 呼び出しやツール呼び出しを再呼び出しする必要なく、中断した場所から正確に再開します。
ワーカーの起動
最後に、すべてを配線します。コードは、単一のプロセスで実行されているように見える方法で必要なビジネス ロジックを実装しますが、Temporal を使用すると、ワークフローとアクティビティ間の通信が Temporal によって提供されるメッセージングを介して行われるイベント ドリブン システム(具体的にはイベント ソース)になります。
Temporal ワーカーは Temporal サービスに接続し、ワークフローとアクティビティ タスクのスケジューラとして機能します。ワーカーはワークフローと両方のアクティビティを登録し、タスクのリスニングを開始します。
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dotenv import load_dotenv
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker
async def main():
config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
client = await Client.connect(
**config,
data_converter=pydantic_data_converter,
)
worker = Worker(
client,
task_queue="gemini-agent-python-task-queue",
workflows=[
AgentWorkflow,
],
activities=[
generate_content,
dynamic_tool_activity,
],
activity_executor=ThreadPoolExecutor(max_workers=10),
)
await worker.run()
if __name__ == "__main__":
load_dotenv()
asyncio.run(main())
クライアント スクリプト
クライアント スクリプト(start_workflow.py)を作成します。クエリを送信し、結果を待ちます。エージェント ワーカーで参照されているのと同じタスクキューに接続されていることに注意してください。start_workflow スクリプトは、ユーザー プロンプトを含むワークフロー タスクをそのタスクキューにディスパッチし、エージェントの実行を開始します。
import asyncio
import sys
import uuid
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
async def main():
client = await Client.connect(
"localhost:7233",
data_converter=pydantic_data_converter,
)
query = sys.argv[1] if len(sys.argv) > 1 else "Tell me about recursion"
result = await client.execute_workflow(
"AgentWorkflow",
query,
id=f"gemini-agent-id-{uuid.uuid4()}",
task_queue="gemini-agent-python-task-queue",
)
print(f"\nResult:\n{result}")
if __name__ == "__main__":
asyncio.run(main())
エージェントを実行する
まだ行っていない場合は、Temporal 開発用サーバーを起動します。
temporal server start-dev新しいターミナル ウィンドウで、エージェント ワーカーを起動します。
python -m durable_agent_worker3 つ目のターミナル ウィンドウで、エージェントにクエリを送信します。
python -m start_workflow "are there any weather alerts for where I am?"durable_agent_worker のターミナルに出力される、エージェント ループの各イテレーションで発生するアクションを確認します。LLM は、利用可能な一連のツールを呼び出すことで、ユーザーのリクエストを満たすことができます。実行された手順は、Temporal UI(http://localhost:8233/namespaces/default/workflows)で確認できます。
いくつかのプロンプトを試して、エージェントが推論してツールを呼び出す様子を確認します。
python -m start_workflow "are there any weather alerts for New York?"python -m start_workflow "where am I?"python -m start_workflow "what is my ip address?"python -m start_workflow "tell me a joke"
最後のプロンプトではツールは必要ないため、エージェントは SYSTEM_INSTRUCTIONS に基づいて俳句で応答します。
耐久性のテスト(省略可)
Temporal を基盤に構築することで、エージェントは障害からシームレスに復旧できます。このテストは、2 つの異なるテストを使用して行うことができます。
ネットワーク停止のシミュレーション
このテストでは、パソコンのインターネット接続を一時的に無効にして、ワークフローを送信し、Temporal が自動的に再試行するのを確認してから、ネットワークを復元して回復を確認します。
- マシンをインターネットから切断します(Wi-Fi をオフにするなど)。
ワークフローを送信します。
python -m start_workflow "tell me a joke"Temporal UI(
http://localhost:8233)を確認します。LLM アクティビティが失敗し、Temporal がバックグラウンドで再試行を自動的に管理していることがわかります。インターネットに再接続します。
次の自動再試行で Gemini API に到達し、ターミナルに最終結果が出力されます。
ワーカーのクラッシュから復旧する
このテストでは、実行中にワーカーを強制終了して再起動します。Temporal はワークフロー履歴(イベント ソーシング)を再生し、最後に完了したアクティビティから再開します。すでに完了した LLM 呼び出しとツール呼び出しは繰り返されません。
- ワーカーを強制終了する時間を確保するには、
durable_agent_worker.pyを開き、AgentWorkflowrunループ内のawait asyncio.sleep(10)を一時的にコメント解除します。 ワーカーを再起動します。
python -m durable_agent_worker複数のツールをトリガーするクエリを送信します。
python -m start_workflow "are there any weather alerts where I am?"完了前にいつでもワーカー プロセスを強制終了します(ワーカー ターミナルで
Ctrl-Cを使用するか、バックグラウンドで実行している場合はkill %1を使用します)。ワーカーを再起動します。
python -m durable_agent_worker
Temporal はワークフロー履歴を再生します。すでに完了した LLM の呼び出しとツールの呼び出しは再実行されません。結果は履歴(イベントログ)から即座に再生されます。ワークフローが正常に完了します。