این آموزش شما را در ساخت یک حلقه عامل به سبک ReAct راهنمایی میکند که از Gemini API برای استدلال و از Temporal برای دوام استفاده میکند. کد منبع کامل این آموزش در GitHub موجود است.
عامل میتواند ابزارهایی مانند جستجوی هشدارهای آب و هوا یا تعیین موقعیت مکانی یک آدرس IP را فراخوانی کند و تا زمانی که اطلاعات کافی برای پاسخ دادن نداشته باشد، به صورت حلقهای عمل کند.
چیزی که این را از یک دموی معمولی عامل متمایز میکند، پایداری آن است. هر فراخوانی LLM، هر فراخوانی ابزار و هر مرحله از حلقه عامل توسط Temporal حفظ میشود. اگر فرآیند از کار بیفتد، شبکه از کار بیفتد یا یک API دچار وقفه شود، Temporal به طور خودکار دوباره تلاش میکند و از آخرین مرحله تکمیل شده ادامه میدهد. هیچ سابقه مکالمهای از بین نمیرود و هیچ فراخوانی ابزاری به اشتباه تکرار نمیشود.
معماری
معماری از سه بخش تشکیل شده است:
- گردش کار: حلقه عاملی که منطق اجرا را هماهنگ میکند.
- فعالیتها: واحدهای کاری مجزا (فراخوانهای LLM، فراخوانهای ابزار) که Temporal آنها را پایدار میکند.
- کارگر: فرآیندی که گردشهای کاری و فعالیتها را اجرا میکند.
در این مثال، هر سه این قطعات را در یک فایل واحد ( durable_agent_worker.py ) قرار خواهید داد. در پیادهسازی واقعی، آنها را از هم جدا میکنید تا مزایای مختلف استقرار و مقیاسپذیری را فراهم کنید. کدی را که یک اعلان به عامل ارائه میدهد، در فایل دوم ( start_workflow.py ) قرار خواهید داد.
پیشنیازها
برای تکمیل این راهنما، به موارد زیر نیاز دارید:
- یک کلید API جمینی. میتوانید آن را به صورت رایگان در Google AI Studio ایجاد کنید.
- پایتون نسخه ۳.۱۰ یا بالاتر.
- رابط خط فرمان موقت (Temporal CLI) برای اجرای یک سرور توسعه محلی.
راهاندازی
قبل از شروع، مطمئن شوید که یک سرور توسعه Temporal به صورت محلی در حال اجرا دارید:
temporal server start-devدر مرحله بعد، وابستگیهای مورد نیاز را نصب کنید:
pip install temporalio google-genai httpx pydantic python-dotenv یک فایل .env در دایرکتوری پروژه خود با کلید API Gemini خود ایجاد کنید. میتوانید کلید API را از Google AI Studio دریافت کنید.
echo "GOOGLE_API_KEY=your-api-key-here" > .envپیادهسازی
بقیه این آموزش، فایل durable_agent_worker.py را از بالا به پایین بررسی میکند و عامل را قطعه قطعه میسازد. فایل را ایجاد کنید و مراحل را دنبال کنید.
واردات و راهاندازی جعبه شنی
با واردات (imports) که باید از قبل تعریف شوند شروع کنید. بلوک workflow.unsafe.imports_passed_through() به محیط کاری Temporal میگوید که به ماژولهای خاصی اجازه عبور بدون محدودیت را بدهد. این امر ضروری است زیرا چندین کتابخانه (به ویژه httpx که زیرکلاس urllib.request.Request است) از الگوهایی استفاده میکنند که در غیر این صورت محیط کاری آنها را مسدود میکرد.
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
import pydantic_core # noqa: F401
import annotated_types # noqa: F401
import httpx
from pydantic import BaseModel, Field
from google import genai
from google.genai import types
دستورالعملهای سیستم
در مرحله بعد، شخصیت عامل را تعریف کنید. دستورالعملهای سیستم به مدل میگویند که چگونه رفتار کند. به این عامل دستور داده شده است که در صورت عدم نیاز به ابزار، با هایکو (شعرهای کوتاه ژاپنی) پاسخ دهد.
SYSTEM_INSTRUCTIONS = """
You are a helpful agent that can use tools to help the user.
You will be given an input from the user and a list of tools to use.
You may or may not need to use the tools to satisfy the user ask.
If no tools are needed, respond in haikus.
"""
تعاریف ابزار
حالا ابزارهایی را که عامل میتواند استفاده کند تعریف کنید. هر ابزار یک تابع ناهمگام با یک رشته سند توصیفی است. ابزارهایی که پارامترها را دریافت میکنند، از یک مدل Pydantic به عنوان آرگومان واحد خود استفاده میکنند. این یک روش بهینه زمانی است که امضاهای فعالیت را با اضافه کردن فیلدهای اختیاری در طول زمان، پایدار نگه میدارد.
import json
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"
class GetWeatherAlertsRequest(BaseModel):
"""Request model for getting weather alerts."""
state: str = Field(description="Two-letter US state code (e.g. CA, NY)")
async def get_weather_alerts(request: GetWeatherAlertsRequest) -> str:
"""Get weather alerts for a US state.
Args:
request: The request object containing:
- state: Two-letter US state code (e.g. CA, NY)
"""
headers = {"User-Agent": USER_AGENT, "Accept": "application/geo+json"}
url = f"{NWS_API_BASE}/alerts/active/area/{request.state}"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers, timeout=5.0)
response.raise_for_status()
return json.dumps(response.json())
در مرحله بعد، ابزارهایی برای موقعیت جغرافیایی آدرس IP تعریف کنید:
class GetLocationRequest(BaseModel):
"""Request model for getting location info from an IP address."""
ipaddress: str = Field(description="An IP address")
async def get_ip_address() -> str:
"""Get the public IP address of the current machine."""
async with httpx.AsyncClient() as client:
response = await client.get("https://icanhazip.com")
response.raise_for_status()
return response.text.strip()
async def get_location_info(request: GetLocationRequest) -> str:
"""Get the location information for an IP address including city, state, and country.
Args:
request: The request object containing:
- ipaddress: An IP address to look up
"""
async with httpx.AsyncClient() as client:
response = await client.get(f"http://ip-api.com/json/{request.ipaddress}")
response.raise_for_status()
result = response.json()
return f"{result['city']}, {result['regionName']}, {result['country']}"
رجیستری ابزار
در مرحله بعد، یک رجیستری ایجاد کنید که نام ابزارها را به توابع هندلر نگاشت کند. تابع get_tools() با استفاده FunctionDeclaration.from_callable_with_api_option() اشیاء FunctionDeclaration سازگار با Gemini را از فراخوانیها تولید میکند.
from typing import Any, Awaitable, Callable
ToolHandler = Callable[..., Awaitable[Any]]
def get_handler(tool_name: str) -> ToolHandler:
"""Get the handler function for a given tool name."""
if tool_name == "get_location_info":
return get_location_info
if tool_name == "get_ip_address":
return get_ip_address
if tool_name == "get_weather_alerts":
return get_weather_alerts
raise ValueError(f"Unknown tool name: {tool_name}")
def get_tools() -> types.Tool:
"""Get the Tool object containing all available function declarations.
Uses FunctionDeclaration.from_callable_with_api_option() from the Google GenAI SDK
to generate tool definitions from the handler functions.
"""
return types.Tool(
function_declarations=[
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_weather_alerts, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_location_info, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_ip_address, api_option="GEMINI_API"
),
]
)
فعالیت LLM
حالا activity ای که رابط برنامهنویسی Gemini را فراخوانی میکند، تعریف کنید. کلاسهای داده GeminiChatRequest و GeminiChatResponse قرارداد را تعریف میکنند.
شما فراخوانی خودکار تابع را غیرفعال خواهید کرد تا فراخوانی LLM و فراخوانی ابزار به عنوان وظایف جداگانه مدیریت شوند و دوام بیشتری برای عامل شما به ارمغان بیاورند. همچنین تلاشهای مجدد داخلی SDK ( attempts=1 ) را غیرفعال خواهید کرد زیرا Temporal تلاشهای مجدد را به طور مداوم مدیریت میکند.
import os
from dataclasses import dataclass
from temporalio import activity
@dataclass
class GeminiChatRequest:
"""Request parameters for a Gemini chat completion."""
model: str
system_instruction: str
contents: list[types.Content]
tools: list[types.Tool]
@dataclass
class GeminiChatResponse:
"""Response from a Gemini chat completion."""
text: str | None
function_calls: list[dict[str, Any]]
raw_parts: list[types.Part]
@activity.defn
async def generate_content(request: GeminiChatRequest) -> GeminiChatResponse:
"""Execute a Gemini chat completion with tool support."""
api_key = os.environ.get("GOOGLE_API_KEY")
if not api_key:
raise ValueError("GOOGLE_API_KEY environment variable is not set")
client = genai.Client(
api_key=api_key,
http_options=types.HttpOptions(
retry_options=types.HttpRetryOptions(attempts=1),
),
)
config = types.GenerateContentConfig(
system_instruction=request.system_instruction,
tools=request.tools,
automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True),
)
response = await client.aio.models.generate_content(
model=request.model,
contents=request.contents,
config=config,
)
function_calls = []
raw_parts = []
text_parts = []
if response.candidates and response.candidates[0].content:
for part in response.candidates[0].content.parts:
raw_parts.append(part)
if part.function_call:
function_calls.append(
{
"name": part.function_call.name,
"args": dict(part.function_call.args) if part.function_call.args else {},
}
)
elif part.text:
text_parts.append(part.text)
text = "".join(text_parts) if text_parts and not function_calls else None
return GeminiChatResponse(
text=text,
function_calls=function_calls,
raw_parts=raw_parts,
)
فعالیت پویای ابزار
در مرحله بعد، فعالیتی را که ابزارها را اجرا میکند تعریف کنید. این از ویژگی فعالیت پویای Temporal استفاده میکند: کنترلکننده ابزار (یک تابع قابل فراخوانی) از رجیستری ابزار از طریق تابع get_handler به دست میآید. این امر امکان تعریف عاملهای مختلف را صرفاً با ارائه مجموعهای متفاوت از ابزارها و دستورالعملهای سیستم فراهم میکند؛ گردش کاری که حلقه عامل را پیادهسازی میکند نیازی به تغییر ندارد.
این فعالیت امضای کنترلکننده را بررسی میکند تا نحوهی ارسال آرگومانها را تعیین کند. اگر کنترلکننده انتظار یک مدل Pydantic را داشته باشد، فرمت خروجی تو در تو که Gemini تولید میکند را مدیریت میکند (برای مثال، {"request": {"state": "CA"}} به جای {"state": "CA"} ).
import inspect
from collections.abc import Sequence
from temporalio.common import RawValue
@activity.defn(dynamic=True)
async def dynamic_tool_activity(args: Sequence[RawValue]) -> dict:
"""Execute a tool dynamically based on the activity name."""
tool_name = activity.info().activity_type
tool_args = activity.payload_converter().from_payload(args[0].payload, dict)
activity.logger.info(f"Running dynamic tool '{tool_name}' with args: {tool_args}")
handler = get_handler(tool_name)
if not inspect.iscoroutinefunction(handler):
raise TypeError("Tool handler must be async (awaitable).")
sig = inspect.signature(handler)
params = list(sig.parameters.values())
if len(params) == 0:
result = await handler()
else:
param = params[0]
param_name = param.name
ann = param.annotation
if isinstance(ann, type) and issubclass(ann, BaseModel):
nested_args = tool_args.get(param_name, tool_args)
result = await handler(ann(**nested_args))
else:
result = await handler(**tool_args)
activity.logger.info(f"Tool '{tool_name}' result: {result}")
return result
گردش کار حلقه عامل
حالا شما تمام قطعات لازم برای ساخت عامل را دارید. کلاس AgentWorkflow یک گردش کار شامل حلقه عامل را پیادهسازی میکند. در داخل آن حلقه، LLM از طریق activity فراخوانی میشود (که آن را بادوام میکند)، خروجی بررسی میشود و اگر ابزاری توسط LLM انتخاب شده باشد، از طریق dynamic_tool_activity فراخوانی میشود.
در این عامل ساده به سبک ReAct، به محض اینکه LLM تصمیم به عدم استفاده از ابزاری بگیرد، حلقه کامل در نظر گرفته میشود و نتیجه نهایی LLM بازگردانده میشود.
from datetime import timedelta
@workflow.defn
class AgentWorkflow:
"""Agentic loop workflow that uses Gemini for LLM calls and executes tools."""
@workflow.run
async def run(self, input: str) -> str:
contents: list[types.Content] = [
types.Content(role="user", parts=[types.Part.from_text(text=input)])
]
tools = [get_tools()]
while True:
result = await workflow.execute_activity(
generate_content,
GeminiChatRequest(
model="gemini-3-flash-preview",
system_instruction=SYSTEM_INSTRUCTIONS,
contents=contents,
tools=tools,
),
start_to_close_timeout=timedelta(seconds=60),
)
if result.function_calls:
# Sending the complete raw_parts here ensures Gemini 3 thought
# signatures are propagated correctly.
contents.append(types.Content(role="model", parts=result.raw_parts))
for function_call in result.function_calls:
tool_result = await self._handle_function_call(function_call)
contents.append(
types.Content(
role="user",
parts=[
types.Part.from_function_response(
name=function_call["name"],
response={"result": tool_result},
)
],
)
)
else:
return result.text
# Leave this in place. You will un-comment it during a durability
# test later on.
# await asyncio.sleep(10)
async def _handle_function_call(self, function_call: dict) -> str:
"""Execute a tool via dynamic activity and return the result."""
tool_name = function_call["name"]
tool_args = function_call.get("args", {})
result = await workflow.execute_activity(
tool_name,
tool_args,
start_to_close_timeout=timedelta(seconds=30),
)
return result
حلقه عامل کاملاً بادوام است. اگر عامل کارگر پس از چندین تکرار در طول حلقه از کار بیفتد، Temporal دقیقاً از همان جایی که متوقف شده بود، بدون نیاز به فراخوانی مجدد فراخوانیهای LLM یا فراخوانیهای ابزار که قبلاً اجرا شدهاند، ادامه میدهد.
استارتاپ کارگری
در نهایت، همه چیز را به هم متصل کنید. در حالی که کد، منطق تجاری لازم را به گونهای پیادهسازی میکند که به نظر میرسد در یک فرآیند واحد اجرا میشود، استفاده از Temporal آن را به یک سیستم رویداد محور (به طور خاص، منبع رویداد) تبدیل میکند که در آن ارتباط بین گردش کار و فعالیتها از طریق پیامرسانی ارائه شده توسط Temporal انجام میشود.
کارگر Temporal به سرویس Temporal متصل میشود و به عنوان زمانبندیکننده برای وظایف گردش کار و فعالیت عمل میکند. کارگر، گردش کار و هر دو فعالیت را ثبت میکند، سپس شروع به گوش دادن به وظایف میکند.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dotenv import load_dotenv
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker
async def main():
config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
client = await Client.connect(
**config,
data_converter=pydantic_data_converter,
)
worker = Worker(
client,
task_queue="gemini-agent-python-task-queue",
workflows=[
AgentWorkflow,
],
activities=[
generate_content,
dynamic_tool_activity,
],
activity_executor=ThreadPoolExecutor(max_workers=10),
)
await worker.run()
if __name__ == "__main__":
load_dotenv()
asyncio.run(main())
اسکریپت کلاینت
اسکریپت کلاینت ( start_workflow.py ) را ایجاد کنید. این اسکریپت یک پرسوجو ارسال میکند و منتظر نتیجه میماند. توجه داشته باشید که به همان صف وظایف ارجاع شده در agent worker متصل میشود - اسکریپت start_workflow یک وظیفه گردش کار را با اعلان کاربر به آن صف وظایف ارسال میکند و اجرای agent را آغاز میکند.
import asyncio
import sys
import uuid
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
async def main():
client = await Client.connect(
"localhost:7233",
data_converter=pydantic_data_converter,
)
query = sys.argv[1] if len(sys.argv) > 1 else "Tell me about recursion"
result = await client.execute_workflow(
"AgentWorkflow",
query,
id=f"gemini-agent-id-{uuid.uuid4()}",
task_queue="gemini-agent-python-task-queue",
)
print(f"\nResult:\n{result}")
if __name__ == "__main__":
asyncio.run(main())
عامل را اجرا کنید
اگر هنوز سرور توسعه Temporal را راهاندازی نکردهاید، آن را اجرا کنید:
temporal server start-devدر یک پنجره ترمینال جدید، کارگر عامل را شروع کنید:
python -m durable_agent_workerدر پنجره ترمینال سوم، یک پرس و جو به نماینده خود ارسال کنید:
python -m start_workflow "are there any weather alerts for where I am?"به خروجی ترمینال durable_agent_worker توجه کنید که اقداماتی را که در هر تکرار حلقه agentic اتفاق میافتد نشان میدهد. LLM قادر است با فراخوانی مجموعهای از ابزارهای موجود، درخواست کاربر را برآورده کند. میتوانید مراحلی را که از طریق رابط کاربری Temporal اجرا شدهاند، در http://localhost:8233/namespaces/default/workflows مشاهده کنید.
برای دیدن دلیل اپراتور و ابزارهای تماس، چند دستور مختلف را امتحان کنید:
python -m start_workflow "are there any weather alerts for New York?"python -m start_workflow "where am I?"python -m start_workflow "what is my ip address?"python -m start_workflow "tell me a joke"
آخرین درخواست به هیچ ابزاری نیاز ندارد، بنابراین عامل بر اساس SYSTEM_INSTRUCTIONS با یک هایکو پاسخ میدهد.
دوام آزمایش (اختیاری)
تکیه بر Temporal تضمین میکند که عامل شما به طور یکپارچه از شکستها جان سالم به در میبرد. میتوانید این را با استفاده از دو آزمایش مجزا آزمایش کنید.
شبیهسازی قطعی شبکه
در این آزمایش، شما اتصال اینترنت رایانه خود را به طور موقت غیرفعال میکنید، یک گردش کار ارسال میکنید، تلاش مجدد خودکار Temporal را تماشا میکنید و سپس شبکه را بازیابی میکنید تا شاهد بازیابی آن باشید.
- اتصال دستگاه خود را به اینترنت قطع کنید (برای مثال، وایفای خود را خاموش کنید).
ارسال گردش کار:
python -m start_workflow "tell me a joke"رابط کاربری Temporal (
http://localhost:8233) را بررسی کنید. خواهید دید که فعالیت LLM با شکست مواجه میشود و Temporal به طور خودکار تلاشهای مجدد را در پسزمینه مدیریت میکند.دوباره به اینترنت وصل شوید.
تلاش مجدد خودکار بعدی با موفقیت به API Gemini خواهد رسید و ترمینال شما نتیجه نهایی را چاپ خواهد کرد.
زنده ماندن در تصادف کارگری
در این تست، شما worker را در اواسط اجرا از بین میبرید و آن را مجدداً راهاندازی میکنید. Temporal تاریخچه گردش کار (منبعیابی رویداد) را دوباره اجرا میکند و از آخرین فعالیت تکمیلشده ادامه میدهد - فراخوانیهای LLM و فراخوانیهای ابزار که قبلاً تکمیل شدهاند، تکرار نمیشوند.
- برای اینکه به خودتان زمان بدهید تا worker را از بین ببرید،
durable_agent_worker.pyرا باز کنید و موقتاًawait asyncio.sleep(10)درون حلقهrunAgentWorkflowاز حالت کامنت خارج کنید. کارگر را مجدداً راه اندازی کنید:
python -m durable_agent_workerارسال یک پرس و جو که چندین ابزار را فعال میکند:
python -m start_workflow "are there any weather alerts where I am?"هر زمان که فرآیند کارگر قبل از تکمیل شدن متوقف شود (
Ctrl-Cدر ترمینال کارگر، یا با استفاده ازkill %1اگر در پسزمینه در حال اجرا است).کارگر را مجدداً راه اندازی کنید:
python -m durable_agent_worker
Temporal تاریخچه گردش کار را دوباره اجرا میکند. فراخوانیهای LLM و فراخوانیهای ابزار که قبلاً تکمیل شدهاند، دوباره اجرا نمیشوند - نتایج آنها فوراً از تاریخچه (گزارش رویداد) دوباره اجرا میشود. گردش کار با موفقیت به پایان میرسد.