บทแนะนำนี้จะแนะนำวิธีสร้างลูปเอเจนต์สไตล์ ReAct ที่ใช้ Gemini API สำหรับการให้เหตุผลและ Temporal สำหรับความทนทาน ซอร์สโค้ดทั้งหมดของบทแนะนำนี้พร้อมให้บริการบน GitHub
เอเจนต์สามารถเรียกใช้เครื่องมือต่างๆ เช่น การค้นหาการแจ้งเตือนสภาพอากาศหรือการระบุตำแหน่งทางภูมิศาสตร์ของที่อยู่ IP และจะวนซ้ำจนกว่าจะมีข้อมูลเพียงพอที่จะตอบ
สิ่งที่ทำให้การสาธิตนี้แตกต่างจากการสาธิตเอเจนต์ทั่วไปคือความทนทาน Temporal จะบันทึกการเรียก LLM ทุกครั้ง การเรียกใช้เครื่องมือทุกครั้ง และทุกขั้นตอนของลูปเอเจนต์ หากกระบวนการขัดข้อง เครือข่ายขาดการเชื่อมต่อ หรือ API หมดเวลา Temporal จะลองอีกครั้งโดยอัตโนมัติและดำเนินการต่อจากขั้นตอนสุดท้ายที่เสร็จสมบูรณ์ ไม่มี ประวัติการสนทนาสูญหาย และไม่มีการเรียกใช้เครื่องมือซ้ำอย่างไม่ถูกต้อง
สถาปัตยกรรม
สถาปัตยกรรมประกอบด้วย 3 ส่วน ได้แก่
- เวิร์กโฟลว์: ลูปของเอเจนต์ที่จัดระเบียบตรรกะการดำเนินการ
- กิจกรรม: หน่วยงานแต่ละหน่วย (การเรียก LLM, การเรียกเครื่องมือ) ที่ Temporal ทำให้คงทน
- Worker: กระบวนการที่เรียกใช้เวิร์กโฟลว์และกิจกรรม
ในตัวอย่างนี้ คุณจะวางทั้ง 3 ส่วนนี้ไว้ในไฟล์เดียว
(durable_agent_worker.py) ในการใช้งานจริง คุณจะต้องแยก
ส่วนเหล่านี้เพื่อให้ได้ประโยชน์ด้านการติดตั้งใช้งานและความสามารถในการปรับขนาดต่างๆ คุณจะวางโค้ดที่ให้พรอมต์แก่เอเจนต์ในไฟล์ที่ 2
(start_workflow.py)
ข้อกำหนดเบื้องต้น
คุณต้องมีสิ่งต่อไปนี้จึงจะทำตามคู่มือนี้ได้
- คีย์ Gemini API คุณสร้างได้ฟรีใน Google AI Studio
- Python เวอร์ชัน 3.10 ขึ้นไป
- Temporal CLI สำหรับการเรียกใช้เซิร์ฟเวอร์ การพัฒนาภายใน
ตั้งค่า
ก่อนเริ่มต้น โปรดตรวจสอบว่าคุณมี เซิร์ฟเวอร์การพัฒนา Temporal ที่ทำงานในเครื่อง
temporal server start-devจากนั้นติดตั้งทรัพยากร Dependency ที่จำเป็น
pip install temporalio google-genai httpx pydantic python-dotenvสร้างไฟล์ .env ในไดเรกทอรีโปรเจ็กต์ด้วยคีย์ Gemini API คุณรับคีย์ API ได้จาก Google AI Studio
echo "GOOGLE_API_KEY=your-api-key-here" > .envการใช้งาน
ส่วนที่เหลือของบทแนะนำนี้จะอธิบายdurable_agent_worker.pyตั้งแต่ต้นจนจบ โดยสร้างเอเจนต์ทีละส่วน สร้างไฟล์แล้วทำตาม
การนำเข้าและการตั้งค่าแซนด์บ็อกซ์
เริ่มต้นด้วยการนำเข้าที่ต้องกำหนดล่วงหน้า บล็อก
workflow.unsafe.imports_passed_through()จะบอกแซนด์บ็อกซ์เวิร์กโฟลว์ของ Temporal
ให้ยอมรับโมดูลบางอย่างโดยไม่มีข้อจำกัด ซึ่งเป็นสิ่งจำเป็นเนื่องจากไลบรารีหลายรายการ (โดยเฉพาะ httpx ซึ่งเป็นคลาสย่อยของ urllib.request.Request) ใช้รูปแบบที่แซนด์บ็อกซ์จะบล็อก
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
import pydantic_core # noqa: F401
import annotated_types # noqa: F401
import httpx
from pydantic import BaseModel, Field
from google import genai
from google.genai import types
คำสั่งของระบบ
จากนั้นกำหนดบุคลิกของ Agent คำสั่งของระบบจะบอกโมเดลว่าควร มีพฤติกรรมอย่างไร ตัวแทนนี้ได้รับคำสั่งให้ตอบกลับเป็นไฮกุเมื่อไม่จำเป็นต้องใช้เครื่องมือ
SYSTEM_INSTRUCTIONS = """
You are a helpful agent that can use tools to help the user.
You will be given an input from the user and a list of tools to use.
You may or may not need to use the tools to satisfy the user ask.
If no tools are needed, respond in haikus.
"""
คำจำกัดความของเครื่องมือ
ตอนนี้ให้กำหนดเครื่องมือที่เอเจนต์ใช้ได้ เครื่องมือแต่ละอย่างเป็นฟังก์ชันแบบอะซิงโครนัสที่มีสตริงเอกสารอธิบาย เครื่องมือที่ใช้พารามิเตอร์จะใช้โมเดล Pydantic เป็นอาร์กิวเมนต์เดียว นี่คือแนวทางปฏิบัติแนะนำของ Temporal ที่ช่วยให้ลายเซ็นกิจกรรม มีความเสถียรเมื่อคุณเพิ่มฟิลด์ที่ไม่บังคับเมื่อเวลาผ่านไป
import json
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"
class GetWeatherAlertsRequest(BaseModel):
"""Request model for getting weather alerts."""
state: str = Field(description="Two-letter US state code (e.g. CA, NY)")
async def get_weather_alerts(request: GetWeatherAlertsRequest) -> str:
"""Get weather alerts for a US state.
Args:
request: The request object containing:
- state: Two-letter US state code (e.g. CA, NY)
"""
headers = {"User-Agent": USER_AGENT, "Accept": "application/geo+json"}
url = f"{NWS_API_BASE}/alerts/active/area/{request.state}"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers, timeout=5.0)
response.raise_for_status()
return json.dumps(response.json())
จากนั้นกำหนดเครื่องมือสำหรับตำแหน่งทางภูมิศาสตร์ของที่อยู่ IP ดังนี้
class GetLocationRequest(BaseModel):
"""Request model for getting location info from an IP address."""
ipaddress: str = Field(description="An IP address")
async def get_ip_address() -> str:
"""Get the public IP address of the current machine."""
async with httpx.AsyncClient() as client:
response = await client.get("https://icanhazip.com")
response.raise_for_status()
return response.text.strip()
async def get_location_info(request: GetLocationRequest) -> str:
"""Get the location information for an IP address including city, state, and country.
Args:
request: The request object containing:
- ipaddress: An IP address to look up
"""
async with httpx.AsyncClient() as client:
response = await client.get(f"http://ip-api.com/json/{request.ipaddress}")
response.raise_for_status()
result = response.json()
return f"{result['city']}, {result['regionName']}, {result['country']}"
รีจิสทรีเครื่องมือ
จากนั้นสร้างรีจิสทรีที่แมปชื่อเครื่องมือกับฟังก์ชันตัวแฮนเดิล ฟังก์ชัน
get_tools()จะสร้างออบเจ็กต์ FunctionDeclaration ที่เข้ากันได้กับ Gemini
จากฟังก์ชันที่เรียกใช้ได้โดยใช้ FunctionDeclaration.from_callable_with_api_option()
from typing import Any, Awaitable, Callable
ToolHandler = Callable[..., Awaitable[Any]]
def get_handler(tool_name: str) -> ToolHandler:
"""Get the handler function for a given tool name."""
if tool_name == "get_location_info":
return get_location_info
if tool_name == "get_ip_address":
return get_ip_address
if tool_name == "get_weather_alerts":
return get_weather_alerts
raise ValueError(f"Unknown tool name: {tool_name}")
def get_tools() -> types.Tool:
"""Get the Tool object containing all available function declarations.
Uses FunctionDeclaration.from_callable_with_api_option() from the Google GenAI SDK
to generate tool definitions from the handler functions.
"""
return types.Tool(
function_declarations=[
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_weather_alerts, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_location_info, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_ip_address, api_option="GEMINI_API"
),
]
)
กิจกรรม LLM
ตอนนี้ให้กำหนดกิจกรรมที่เรียกใช้ Gemini API คลาสข้อมูล GeminiChatRequest และ
GeminiChatResponse จะกำหนดสัญญา
คุณจะปิดใช้การเรียกใช้ฟังก์ชันอัตโนมัติเพื่อให้การเรียกใช้ LLM และการเรียกใช้เครื่องมือ
ได้รับการจัดการเป็นงานแยกกัน ซึ่งจะช่วยเพิ่มความทนทานให้กับเอเจนต์
นอกจากนี้ คุณยังจะปิดใช้การลองใหม่ในตัวของ SDK (attempts=1) เนื่องจาก
Temporal จัดการการลองใหม่ได้อย่างทนทาน
import os
from dataclasses import dataclass
from temporalio import activity
@dataclass
class GeminiChatRequest:
"""Request parameters for a Gemini chat completion."""
model: str
system_instruction: str
contents: list[types.Content]
tools: list[types.Tool]
@dataclass
class GeminiChatResponse:
"""Response from a Gemini chat completion."""
text: str | None
function_calls: list[dict[str, Any]]
raw_parts: list[types.Part]
@activity.defn
async def generate_content(request: GeminiChatRequest) -> GeminiChatResponse:
"""Execute a Gemini chat completion with tool support."""
api_key = os.environ.get("GOOGLE_API_KEY")
if not api_key:
raise ValueError("GOOGLE_API_KEY environment variable is not set")
client = genai.Client(
api_key=api_key,
http_options=types.HttpOptions(
retry_options=types.HttpRetryOptions(attempts=1),
),
)
config = types.GenerateContentConfig(
system_instruction=request.system_instruction,
tools=request.tools,
automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True),
)
response = await client.aio.models.generate_content(
model=request.model,
contents=request.contents,
config=config,
)
function_calls = []
raw_parts = []
text_parts = []
if response.candidates and response.candidates[0].content:
for part in response.candidates[0].content.parts:
raw_parts.append(part)
if part.function_call:
function_calls.append(
{
"name": part.function_call.name,
"args": dict(part.function_call.args) if part.function_call.args else {},
}
)
elif part.text:
text_parts.append(part.text)
text = "".join(text_parts) if text_parts and not function_calls else None
return GeminiChatResponse(
text=text,
function_calls=function_calls,
raw_parts=raw_parts,
)
กิจกรรมเครื่องมือแบบไดนามิก
จากนั้นกําหนดกิจกรรมที่เรียกใช้เครื่องมือ ซึ่งใช้ฟีเจอร์กิจกรรมแบบไดนามิกของ Temporal โดยจะรับแฮนเดิลเครื่องมือ (ฟังก์ชันที่เรียกใช้ได้) จากรีจิสทรีเครื่องมือผ่านฟังก์ชัน get_handler ซึ่งช่วยให้กำหนด Agent ต่างๆ ได้ง่ายๆ เพียงแค่ระบุชุดเครื่องมือและคำสั่งของระบบที่แตกต่างกัน โดยเวิร์กโฟลว์ที่ใช้ลูปของ Agent ไม่จำเป็นต้องมีการเปลี่ยนแปลง
กิจกรรมจะตรวจสอบลายเซ็นของแฮนเดิลเพื่อกำหนดวิธีส่งอาร์กิวเมนต์ หากแฮนเดิลเลอร์คาดหวังโมเดล Pydantic ก็จะจัดการรูปแบบเอาต์พุตที่ซ้อนกัน
ซึ่ง Gemini สร้างขึ้น (เช่น {"request": {"state": "CA"}} แทน
{"state": "CA"} แบบเรียบ)
import inspect
from collections.abc import Sequence
from temporalio.common import RawValue
@activity.defn(dynamic=True)
async def dynamic_tool_activity(args: Sequence[RawValue]) -> dict:
"""Execute a tool dynamically based on the activity name."""
tool_name = activity.info().activity_type
tool_args = activity.payload_converter().from_payload(args[0].payload, dict)
activity.logger.info(f"Running dynamic tool '{tool_name}' with args: {tool_args}")
handler = get_handler(tool_name)
if not inspect.iscoroutinefunction(handler):
raise TypeError("Tool handler must be async (awaitable).")
sig = inspect.signature(handler)
params = list(sig.parameters.values())
if len(params) == 0:
result = await handler()
else:
param = params[0]
param_name = param.name
ann = param.annotation
if isinstance(ann, type) and issubclass(ann, BaseModel):
nested_args = tool_args.get(param_name, tool_args)
result = await handler(ann(**nested_args))
else:
result = await handler(**tool_args)
activity.logger.info(f"Tool '{tool_name}' result: {result}")
return result
เวิร์กโฟลว์ลูปการทำงานเป็นตัวแทน
ตอนนี้คุณก็มีชิ้นส่วนทั้งหมดที่จะสร้างเอเจนต์ให้เสร็จแล้ว AgentWorkflow
คลาสจะใช้เวิร์กโฟลว์ที่มีลูปของ Agent ในลูปนั้น ระบบจะเรียกใช้ LLM
ผ่านกิจกรรม (ทําให้ทนทาน) ตรวจสอบเอาต์พุต และหาก LLM เลือกเครื่องมือ ระบบจะเรียกใช้ผ่าน dynamic_tool_activity
ในเอเจนต์สไตล์ ReAct อย่างง่ายนี้ เมื่อ LLM เลือกที่จะไม่ใช้เครื่องมือ ระบบจะถือว่าลูปเสร็จสมบูรณ์และแสดงผลลัพธ์ LLM สุดท้าย
from datetime import timedelta
@workflow.defn
class AgentWorkflow:
"""Agentic loop workflow that uses Gemini for LLM calls and executes tools."""
@workflow.run
async def run(self, input: str) -> str:
contents: list[types.Content] = [
types.Content(role="user", parts=[types.Part.from_text(text=input)])
]
tools = [get_tools()]
while True:
result = await workflow.execute_activity(
generate_content,
GeminiChatRequest(
model="gemini-3-flash-preview",
system_instruction=SYSTEM_INSTRUCTIONS,
contents=contents,
tools=tools,
),
start_to_close_timeout=timedelta(seconds=60),
)
if result.function_calls:
# Sending the complete raw_parts here ensures Gemini 3 thought
# signatures are propagated correctly.
contents.append(types.Content(role="model", parts=result.raw_parts))
for function_call in result.function_calls:
tool_result = await self._handle_function_call(function_call)
contents.append(
types.Content(
role="user",
parts=[
types.Part.from_function_response(
name=function_call["name"],
response={"result": tool_result},
)
],
)
)
else:
return result.text
# Leave this in place. You will un-comment it during a durability
# test later on.
# await asyncio.sleep(10)
async def _handle_function_call(self, function_call: dict) -> str:
"""Execute a tool via dynamic activity and return the result."""
tool_name = function_call["name"]
tool_args = function_call.get("args", {})
result = await workflow.execute_activity(
tool_name,
tool_args,
start_to_close_timeout=timedelta(seconds=30),
)
return result
ลูปเอเจนต์มีความคงทนอย่างเต็มที่ หาก Worker ของเอเจนต์ขัดข้องหลังจากวนซ้ำหลายครั้ง Temporal จะดำเนินการต่อจากจุดที่หยุดไว้โดยไม่ต้องเรียกใช้การเรียก LLM หรือการเรียกใช้เครื่องมือที่ดำเนินการไปแล้วอีกครั้ง
การเริ่มต้นใช้งาน Worker
สุดท้าย ให้ต่อสายทุกอย่างเข้าด้วยกัน แม้ว่าโค้ดจะใช้ตรรกะทางธุรกิจที่จำเป็นในลักษณะที่ทำให้ดูเหมือนว่าทำงานในกระบวนการเดียว แต่การใช้ Temporal ทำให้เป็นระบบที่ขับเคลื่อนด้วยเหตุการณ์ (โดยเฉพาะอย่างยิ่งที่มาของเหตุการณ์) ซึ่งการสื่อสารระหว่างเวิร์กโฟลว์และกิจกรรมจะเกิดขึ้นผ่านการรับส่งข้อความที่ Temporal จัดเตรียมให้
Worker ของ Temporal จะเชื่อมต่อกับบริการ Temporal และทำหน้าที่เป็นตัวกำหนดเวลาสำหรับ งานเวิร์กโฟลว์และกิจกรรม Worker จะลงทะเบียนเวิร์กโฟลว์และทั้ง 2 กิจกรรม จากนั้นจะเริ่มรอรับงาน
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dotenv import load_dotenv
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker
async def main():
config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
client = await Client.connect(
**config,
data_converter=pydantic_data_converter,
)
worker = Worker(
client,
task_queue="gemini-agent-python-task-queue",
workflows=[
AgentWorkflow,
],
activities=[
generate_content,
dynamic_tool_activity,
],
activity_executor=ThreadPoolExecutor(max_workers=10),
)
await worker.run()
if __name__ == "__main__":
load_dotenv()
asyncio.run(main())
สคริปต์ฝั่งไคลเอ็นต์
สร้างสคริปต์ฝั่งไคลเอ็นต์ (start_workflow.py) ซึ่งจะส่งคำค้นหาและรอผลลัพธ์
โปรดสังเกตว่าสคริปต์นี้เชื่อมต่อกับคิวงานเดียวกันกับที่อ้างอิงใน Agent
Worker ซึ่งสคริปต์ start_workflow จะส่งงานเวิร์กโฟลว์พร้อมพรอมต์ของผู้ใช้
ไปยังคิวงานนั้นเพื่อเริ่มการทำงานของเอเจนต์
import asyncio
import sys
import uuid
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
async def main():
client = await Client.connect(
"localhost:7233",
data_converter=pydantic_data_converter,
)
query = sys.argv[1] if len(sys.argv) > 1 else "Tell me about recursion"
result = await client.execute_workflow(
"AgentWorkflow",
query,
id=f"gemini-agent-id-{uuid.uuid4()}",
task_queue="gemini-agent-python-task-queue",
)
print(f"\nResult:\n{result}")
if __name__ == "__main__":
asyncio.run(main())
เรียกใช้ Agent
หากยังไม่ได้ดำเนินการ ให้เริ่มเซิร์ฟเวอร์การพัฒนา Temporal โดยทำดังนี้
temporal server start-devในหน้าต่างเทอร์มินัลใหม่ ให้เริ่ม Worker ของเอเจนต์โดยใช้คำสั่งต่อไปนี้
python -m durable_agent_workerในหน้าต่างเทอร์มินัลที่ 3 ให้ส่งคำค้นหาไปยังเอเจนต์ของคุณ
python -m start_workflow "are there any weather alerts for where I am?"สังเกตเอาต์พุตในเทอร์มินัลของ durable_agent_worker ซึ่งแสดงการดำเนินการที่เกิดขึ้นในแต่ละการวนซ้ำของลูปเอเจนต์ LLM สามารถ
ตอบสนองคำขอของผู้ใช้ได้โดยการเรียกใช้ชุดเครื่องมือที่พร้อมใช้งาน คุณดูขั้นตอนที่ดำเนินการผ่าน UI ของ Temporal ได้ที่
http://localhost:8233/namespaces/default/workflows
ลองใช้พรอมต์ต่างๆ เพื่อดูเหตุผลของตัวแทนและเครื่องมือการโทร
python -m start_workflow "are there any weather alerts for New York?"python -m start_workflow "where am I?"python -m start_workflow "what is my ip address?"python -m start_workflow "tell me a joke"
พรอมต์สุดท้ายไม่จำเป็นต้องใช้เครื่องมือใดๆ ดังนั้นเอเจนต์จะตอบกลับเป็นไฮกุ
โดยอิงตามSYSTEM_INSTRUCTIONS
ทดสอบความคงทน (ไม่บังคับ)
การสร้างบน Temporal ช่วยให้มั่นใจได้ว่าเอเจนต์จะทำงานต่อไปได้แม้จะเกิดข้อผิดพลาด คุณสามารถ ทดสอบได้โดยใช้การทดสอบที่แตกต่างกัน 2 รายการ
จำลองเครือข่ายขัดข้อง
ในการทดสอบนี้ คุณจะปิดใช้การเชื่อมต่ออินเทอร์เน็ตของคอมพิวเตอร์ชั่วคราว ส่งเวิร์กโฟลว์ ดูว่า Temporal ลองใหม่โดยอัตโนมัติ แล้วกู้คืน เครือข่ายเพื่อดูว่าระบบกู้คืนได้หรือไม่
- ยกเลิกการเชื่อมต่อเครื่องกับอินเทอร์เน็ต (เช่น ปิด Wi-Fi)
วิธีส่งเวิร์กโฟลว์
python -m start_workflow "tell me a joke"ตรวจสอบ UI ของ Temporal (
http://localhost:8233) คุณจะเห็นว่ากิจกรรม LLM ล้มเหลว และ Temporal จะจัดการการลองใหม่โดยอัตโนมัติใน เบื้องหลังเชื่อมต่ออินเทอร์เน็ตอีกครั้ง
การลองใหม่โดยอัตโนมัติครั้งถัดไปจะเข้าถึง Gemini API ได้สำเร็จ และเทอร์มินัลจะพิมพ์ผลลัพธ์สุดท้าย
การเอาชีวิตรอดจากข้อขัดข้องของ Worker
ในการทดสอบนี้ คุณจะหยุดการทำงานของ Worker กลางการดำเนินการและรีสตาร์ท การเล่นซ้ำชั่วคราว จะเล่นประวัติเวิร์กโฟลว์ (การจัดหาเหตุการณ์) ซ้ำและดำเนินการต่อจากกิจกรรมที่เสร็จสมบูรณ์ล่าสุด โดยจะไม่ทำซ้ำการเรียกใช้ LLM และการเรียกใช้เครื่องมือที่เสร็จสมบูรณ์แล้ว
- หากต้องการให้เวลาตัวเองในการหยุดการทำงานของ Worker ให้เปิด
durable_agent_worker.pyและ ยกเลิกการแสดงความคิดเห็นของawait asyncio.sleep(10)ภายในลูปAgentWorkflowrunชั่วคราว รีสตาร์ท Worker โดยทำดังนี้
python -m durable_agent_workerส่งคำค้นหาที่เรียกใช้เครื่องมือหลายอย่าง
python -m start_workflow "are there any weather alerts where I am?"หยุดกระบวนการของ Worker ได้ทุกเมื่อก่อนที่จะเสร็จสมบูรณ์ (
Ctrl-Cในเทอร์มินัลของ Worker หรือใช้kill %1หากทำงานในเบื้องหลัง)รีสตาร์ท Worker โดยทำดังนี้
python -m durable_agent_worker
Temporal จะเล่นประวัติเวิร์กโฟลว์ซ้ำ ระบบไม่เรียกใช้ LLM และการเรียกใช้เครื่องมือที่ดำเนินการเสร็จแล้วซ้ำ แต่จะเล่นผลลัพธ์ซ้ำทันทีจากประวัติ (บันทึกเหตุการณ์) เวิร์กโฟลว์เสร็จสมบูรณ์