عامل هوش مصنوعی بادوام با Gemini و Temporal

این آموزش شما را در ساخت یک حلقه عامل به سبک ReAct راهنمایی می‌کند که از Gemini API برای استدلال و از Temporal برای دوام استفاده می‌کند. کد منبع کامل این آموزش در GitHub موجود است.

عامل می‌تواند ابزارهایی مانند جستجوی هشدارهای آب و هوا یا تعیین موقعیت مکانی یک آدرس IP را فراخوانی کند و تا زمانی که اطلاعات کافی برای پاسخ دادن نداشته باشد، به صورت حلقه‌ای عمل کند.

چیزی که این را از یک دموی معمولی عامل متمایز می‌کند، پایداری آن است. هر فراخوانی LLM، هر فراخوانی ابزار و هر مرحله از حلقه عامل توسط Temporal حفظ می‌شود. اگر فرآیند از کار بیفتد، شبکه از کار بیفتد یا یک API دچار وقفه شود، Temporal به طور خودکار دوباره تلاش می‌کند و از آخرین مرحله تکمیل شده ادامه می‌دهد. هیچ سابقه مکالمه‌ای از بین نمی‌رود و هیچ فراخوانی ابزاری به اشتباه تکرار نمی‌شود.

معماری

معماری از سه بخش تشکیل شده است:

  • گردش کار: حلقه عاملی که منطق اجرا را هماهنگ می‌کند.
  • فعالیت‌ها: واحدهای کاری مجزا (فراخوان‌های LLM، فراخوان‌های ابزار) که Temporal آنها را پایدار می‌کند.
  • کارگر: فرآیندی که گردش‌های کاری و فعالیت‌ها را اجرا می‌کند.

در این مثال، هر سه این قطعات را در یک فایل واحد ( durable_agent_worker.py ) قرار خواهید داد. در پیاده‌سازی واقعی، آنها را از هم جدا می‌کنید تا مزایای مختلف استقرار و مقیاس‌پذیری را فراهم کنید. کدی را که یک اعلان به عامل ارائه می‌دهد، در فایل دوم ( start_workflow.py ) قرار خواهید داد.

پیش‌نیازها

برای تکمیل این راهنما، به موارد زیر نیاز دارید:

راه‌اندازی

قبل از شروع، مطمئن شوید که یک سرور توسعه Temporal به صورت محلی در حال اجرا دارید:

temporal server start-dev

در مرحله بعد، وابستگی‌های مورد نیاز را نصب کنید:

pip install temporalio google-genai httpx pydantic python-dotenv

یک فایل .env در دایرکتوری پروژه خود با کلید API Gemini خود ایجاد کنید. می‌توانید کلید API را از Google AI Studio دریافت کنید.

echo "GOOGLE_API_KEY=your-api-key-here" > .env

پیاده‌سازی

بقیه این آموزش، فایل durable_agent_worker.py را از بالا به پایین بررسی می‌کند و عامل را قطعه قطعه می‌سازد. فایل را ایجاد کنید و مراحل را دنبال کنید.

واردات و راه‌اندازی جعبه شنی

با واردات (imports) که باید از قبل تعریف شوند شروع کنید. بلوک workflow.unsafe.imports_passed_through() به محیط کاری Temporal می‌گوید که به ماژول‌های خاصی اجازه عبور بدون محدودیت را بدهد. این امر ضروری است زیرا چندین کتابخانه (به ویژه httpx که زیرکلاس urllib.request.Request است) از الگوهایی استفاده می‌کنند که در غیر این صورت محیط کاری آنها را مسدود می‌کرد.

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    import pydantic_core  # noqa: F401
    import annotated_types  # noqa: F401

    import httpx
    from pydantic import BaseModel, Field
    from google import genai
    from google.genai import types

دستورالعمل‌های سیستم

در مرحله بعد، شخصیت عامل را تعریف کنید. دستورالعمل‌های سیستم به مدل می‌گویند که چگونه رفتار کند. به این عامل دستور داده شده است که در صورت عدم نیاز به ابزار، با هایکو (شعرهای کوتاه ژاپنی) پاسخ دهد.

SYSTEM_INSTRUCTIONS = """
You are a helpful agent that can use tools to help the user.
You will be given an input from the user and a list of tools to use.
You may or may not need to use the tools to satisfy the user ask.
If no tools are needed, respond in haikus.
"""

تعاریف ابزار

حالا ابزارهایی را که عامل می‌تواند استفاده کند تعریف کنید. هر ابزار یک تابع ناهمگام با یک رشته سند توصیفی است. ابزارهایی که پارامترها را دریافت می‌کنند، از یک مدل Pydantic به عنوان آرگومان واحد خود استفاده می‌کنند. این یک روش بهینه زمانی است که امضاهای فعالیت را با اضافه کردن فیلدهای اختیاری در طول زمان، پایدار نگه می‌دارد.

import json

NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"


class GetWeatherAlertsRequest(BaseModel):
    """Request model for getting weather alerts."""

    state: str = Field(description="Two-letter US state code (e.g. CA, NY)")


async def get_weather_alerts(request: GetWeatherAlertsRequest) -> str:
    """Get weather alerts for a US state.

    Args:
        request: The request object containing:
            - state: Two-letter US state code (e.g. CA, NY)
    """
    headers = {"User-Agent": USER_AGENT, "Accept": "application/geo+json"}
    url = f"{NWS_API_BASE}/alerts/active/area/{request.state}"

    async with httpx.AsyncClient() as client:
        response = await client.get(url, headers=headers, timeout=5.0)
        response.raise_for_status()
        return json.dumps(response.json())

در مرحله بعد، ابزارهایی برای موقعیت جغرافیایی آدرس IP تعریف کنید:

class GetLocationRequest(BaseModel):
    """Request model for getting location info from an IP address."""

    ipaddress: str = Field(description="An IP address")


async def get_ip_address() -> str:
    """Get the public IP address of the current machine."""
    async with httpx.AsyncClient() as client:
        response = await client.get("https://icanhazip.com")
        response.raise_for_status()
        return response.text.strip()


async def get_location_info(request: GetLocationRequest) -> str:
    """Get the location information for an IP address including city, state, and country.

    Args:
        request: The request object containing:
            - ipaddress: An IP address to look up
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(f"http://ip-api.com/json/{request.ipaddress}")
        response.raise_for_status()
        result = response.json()
        return f"{result['city']}, {result['regionName']}, {result['country']}"

رجیستری ابزار

در مرحله بعد، یک رجیستری ایجاد کنید که نام ابزارها را به توابع هندلر نگاشت کند. تابع get_tools() با استفاده FunctionDeclaration.from_callable_with_api_option() اشیاء FunctionDeclaration سازگار با Gemini را از فراخوانی‌ها تولید می‌کند.

from typing import Any, Awaitable, Callable

ToolHandler = Callable[..., Awaitable[Any]]


def get_handler(tool_name: str) -> ToolHandler:
    """Get the handler function for a given tool name."""
    if tool_name == "get_location_info":
        return get_location_info
    if tool_name == "get_ip_address":
        return get_ip_address
    if tool_name == "get_weather_alerts":
        return get_weather_alerts
    raise ValueError(f"Unknown tool name: {tool_name}")


def get_tools() -> types.Tool:
    """Get the Tool object containing all available function declarations.

    Uses FunctionDeclaration.from_callable_with_api_option() from the Google GenAI SDK
    to generate tool definitions from the handler functions.
    """
    return types.Tool(
        function_declarations=[
            types.FunctionDeclaration.from_callable_with_api_option(
                callable=get_weather_alerts, api_option="GEMINI_API"
            ),
            types.FunctionDeclaration.from_callable_with_api_option(
                callable=get_location_info, api_option="GEMINI_API"
            ),
            types.FunctionDeclaration.from_callable_with_api_option(
                callable=get_ip_address, api_option="GEMINI_API"
            ),
        ]
    )

فعالیت LLM

حالا activity ای که رابط برنامه‌نویسی Gemini را فراخوانی می‌کند، تعریف کنید. کلاس‌های داده GeminiChatRequest و GeminiChatResponse قرارداد را تعریف می‌کنند.

شما فراخوانی خودکار تابع را غیرفعال خواهید کرد تا فراخوانی LLM و فراخوانی ابزار به عنوان وظایف جداگانه مدیریت شوند و دوام بیشتری برای عامل شما به ارمغان بیاورند. همچنین تلاش‌های مجدد داخلی SDK ( attempts=1 ) را غیرفعال خواهید کرد زیرا Temporal تلاش‌های مجدد را به طور مداوم مدیریت می‌کند.

import os
from dataclasses import dataclass

from temporalio import activity

@dataclass
class GeminiChatRequest:
    """Request parameters for a Gemini chat completion."""

    model: str
    system_instruction: str
    contents: list[types.Content]
    tools: list[types.Tool]


@dataclass
class GeminiChatResponse:
    """Response from a Gemini chat completion."""

    text: str | None
    function_calls: list[dict[str, Any]]
    raw_parts: list[types.Part]


@activity.defn
async def generate_content(request: GeminiChatRequest) -> GeminiChatResponse:
    """Execute a Gemini chat completion with tool support."""
    api_key = os.environ.get("GOOGLE_API_KEY")
    if not api_key:
        raise ValueError("GOOGLE_API_KEY environment variable is not set")
    client = genai.Client(
        api_key=api_key,
        http_options=types.HttpOptions(
            retry_options=types.HttpRetryOptions(attempts=1),
        ),
    )

    config = types.GenerateContentConfig(
        system_instruction=request.system_instruction,
        tools=request.tools,
        automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True),
    )

    response = await client.aio.models.generate_content(
        model=request.model,
        contents=request.contents,
        config=config,
    )

    function_calls = []
    raw_parts = []
    text_parts = []

    if response.candidates and response.candidates[0].content:
        for part in response.candidates[0].content.parts:
            raw_parts.append(part)
            if part.function_call:
                function_calls.append(
                    {
                        "name": part.function_call.name,
                        "args": dict(part.function_call.args) if part.function_call.args else {},
                    }
                )
            elif part.text:
                text_parts.append(part.text)

    text = "".join(text_parts) if text_parts and not function_calls else None

    return GeminiChatResponse(
        text=text,
        function_calls=function_calls,
        raw_parts=raw_parts,
    )

فعالیت پویای ابزار

در مرحله بعد، فعالیتی را که ابزارها را اجرا می‌کند تعریف کنید. این از ویژگی فعالیت پویای Temporal استفاده می‌کند: کنترل‌کننده ابزار (یک تابع قابل فراخوانی) از رجیستری ابزار از طریق تابع get_handler به دست می‌آید. این امر امکان تعریف عامل‌های مختلف را صرفاً با ارائه مجموعه‌ای متفاوت از ابزارها و دستورالعمل‌های سیستم فراهم می‌کند؛ گردش کاری که حلقه عامل را پیاده‌سازی می‌کند نیازی به تغییر ندارد.

این فعالیت امضای کنترل‌کننده را بررسی می‌کند تا نحوه‌ی ارسال آرگومان‌ها را تعیین کند. اگر کنترل‌کننده انتظار یک مدل Pydantic را داشته باشد، فرمت خروجی تو در تو که Gemini تولید می‌کند را مدیریت می‌کند (برای مثال، {"request": {"state": "CA"}} به جای {"state": "CA"} ).

import inspect
from collections.abc import Sequence

from temporalio.common import RawValue

@activity.defn(dynamic=True)
async def dynamic_tool_activity(args: Sequence[RawValue]) -> dict:
    """Execute a tool dynamically based on the activity name."""
    tool_name = activity.info().activity_type
    tool_args = activity.payload_converter().from_payload(args[0].payload, dict)
    activity.logger.info(f"Running dynamic tool '{tool_name}' with args: {tool_args}")

    handler = get_handler(tool_name)

    if not inspect.iscoroutinefunction(handler):
        raise TypeError("Tool handler must be async (awaitable).")

    sig = inspect.signature(handler)
    params = list(sig.parameters.values())

    if len(params) == 0:
        result = await handler()
    else:
        param = params[0]
        param_name = param.name
        ann = param.annotation

        if isinstance(ann, type) and issubclass(ann, BaseModel):
            nested_args = tool_args.get(param_name, tool_args)
            result = await handler(ann(**nested_args))
        else:
            result = await handler(**tool_args)

    activity.logger.info(f"Tool '{tool_name}' result: {result}")
    return result

گردش کار حلقه عامل

حالا شما تمام قطعات لازم برای ساخت عامل را دارید. کلاس AgentWorkflow یک گردش کار شامل حلقه عامل را پیاده‌سازی می‌کند. در داخل آن حلقه، LLM از طریق activity فراخوانی می‌شود (که آن را بادوام می‌کند)، خروجی بررسی می‌شود و اگر ابزاری توسط LLM انتخاب شده باشد، از طریق dynamic_tool_activity فراخوانی می‌شود.

در این عامل ساده به سبک ReAct، به محض اینکه LLM تصمیم به عدم استفاده از ابزاری بگیرد، حلقه کامل در نظر گرفته می‌شود و نتیجه نهایی LLM بازگردانده می‌شود.

from datetime import timedelta

@workflow.defn
class AgentWorkflow:
    """Agentic loop workflow that uses Gemini for LLM calls and executes tools."""

    @workflow.run
    async def run(self, input: str) -> str:
        contents: list[types.Content] = [
            types.Content(role="user", parts=[types.Part.from_text(text=input)])
        ]

        tools = [get_tools()]

        while True:
            result = await workflow.execute_activity(
                generate_content,
                GeminiChatRequest(
                    model="gemini-3-flash-preview",
                    system_instruction=SYSTEM_INSTRUCTIONS,
                    contents=contents,
                    tools=tools,
                ),
                start_to_close_timeout=timedelta(seconds=60),
            )

            if result.function_calls:
                # Sending the complete raw_parts here ensures Gemini 3 thought
                # signatures are propagated correctly.
                contents.append(types.Content(role="model", parts=result.raw_parts))

                for function_call in result.function_calls:
                    tool_result = await self._handle_function_call(function_call)

                    contents.append(
                        types.Content(
                            role="user",
                            parts=[
                                types.Part.from_function_response(
                                    name=function_call["name"],
                                    response={"result": tool_result},
                                )
                            ],
                        )
                    )
            else:
                return result.text
            # Leave this in place. You will un-comment it during a durability
            # test later on.
            # await asyncio.sleep(10)

    async def _handle_function_call(self, function_call: dict) -> str:
        """Execute a tool via dynamic activity and return the result."""
        tool_name = function_call["name"]
        tool_args = function_call.get("args", {})

        result = await workflow.execute_activity(
            tool_name,
            tool_args,
            start_to_close_timeout=timedelta(seconds=30),
        )

        return result

حلقه عامل کاملاً بادوام است. اگر عامل کارگر پس از چندین تکرار در طول حلقه از کار بیفتد، Temporal دقیقاً از همان جایی که متوقف شده بود، بدون نیاز به فراخوانی مجدد فراخوانی‌های LLM یا فراخوانی‌های ابزار که قبلاً اجرا شده‌اند، ادامه می‌دهد.

استارتاپ کارگری

در نهایت، همه چیز را به هم متصل کنید. در حالی که کد، منطق تجاری لازم را به گونه‌ای پیاده‌سازی می‌کند که به نظر می‌رسد در یک فرآیند واحد اجرا می‌شود، استفاده از Temporal آن را به یک سیستم رویداد محور (به طور خاص، منبع رویداد) تبدیل می‌کند که در آن ارتباط بین گردش کار و فعالیت‌ها از طریق پیام‌رسانی ارائه شده توسط Temporal انجام می‌شود.

کارگر Temporal به سرویس Temporal متصل می‌شود و به عنوان زمان‌بندی‌کننده برای وظایف گردش کار و فعالیت عمل می‌کند. کارگر، گردش کار و هر دو فعالیت را ثبت می‌کند، سپس شروع به گوش دادن به وظایف می‌کند.

import asyncio
from concurrent.futures import ThreadPoolExecutor

from dotenv import load_dotenv
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker

async def main():
    config = ClientConfig.load_client_connect_config()
    config.setdefault("target_host", "localhost:7233")
    client = await Client.connect(
        **config,
        data_converter=pydantic_data_converter,
    )

    worker = Worker(
        client,
        task_queue="gemini-agent-python-task-queue",
        workflows=[
            AgentWorkflow,
        ],
        activities=[
            generate_content,
            dynamic_tool_activity,
        ],
        activity_executor=ThreadPoolExecutor(max_workers=10),
    )
    await worker.run()


if __name__ == "__main__":
    load_dotenv()
    asyncio.run(main())

اسکریپت کلاینت

اسکریپت کلاینت ( start_workflow.py ) را ایجاد کنید. این اسکریپت یک پرس‌وجو ارسال می‌کند و منتظر نتیجه می‌ماند. توجه داشته باشید که به همان صف وظایف ارجاع شده در agent worker متصل می‌شود - اسکریپت start_workflow یک وظیفه گردش کار را با اعلان کاربر به آن صف وظایف ارسال می‌کند و اجرای agent را آغاز می‌کند.

import asyncio
import sys
import uuid

from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter


async def main():
    client = await Client.connect(
        "localhost:7233",
        data_converter=pydantic_data_converter,
    )

    query = sys.argv[1] if len(sys.argv) > 1 else "Tell me about recursion"

    result = await client.execute_workflow(
        "AgentWorkflow",
        query,
        id=f"gemini-agent-id-{uuid.uuid4()}",
        task_queue="gemini-agent-python-task-queue",
    )
    print(f"\nResult:\n{result}")


if __name__ == "__main__":
    asyncio.run(main())

عامل را اجرا کنید

اگر هنوز سرور توسعه Temporal را راه‌اندازی نکرده‌اید، آن را اجرا کنید:

temporal server start-dev

در یک پنجره ترمینال جدید، کارگر عامل را شروع کنید:

python -m durable_agent_worker

در پنجره ترمینال سوم، یک پرس و جو به نماینده خود ارسال کنید:

python -m start_workflow "are there any weather alerts for where I am?"

به خروجی ترمینال durable_agent_worker توجه کنید که اقداماتی را که در هر تکرار حلقه agentic اتفاق می‌افتد نشان می‌دهد. LLM قادر است با فراخوانی مجموعه‌ای از ابزارهای موجود، درخواست کاربر را برآورده کند. می‌توانید مراحلی را که از طریق رابط کاربری Temporal اجرا شده‌اند، در http://localhost:8233/namespaces/default/workflows مشاهده کنید.

برای دیدن دلیل اپراتور و ابزارهای تماس، چند دستور مختلف را امتحان کنید:

python -m start_workflow "are there any weather alerts for New York?"
python -m start_workflow "where am I?"
python -m start_workflow "what is my ip address?"
python -m start_workflow "tell me a joke"

آخرین درخواست به هیچ ابزاری نیاز ندارد، بنابراین عامل بر اساس SYSTEM_INSTRUCTIONS با یک هایکو پاسخ می‌دهد.

دوام آزمایش (اختیاری)

تکیه بر Temporal تضمین می‌کند که عامل شما به طور یکپارچه از شکست‌ها جان سالم به در می‌برد. می‌توانید این را با استفاده از دو آزمایش مجزا آزمایش کنید.

شبیه‌سازی قطعی شبکه

در این آزمایش، شما اتصال اینترنت رایانه خود را به طور موقت غیرفعال می‌کنید، یک گردش کار ارسال می‌کنید، تلاش مجدد خودکار Temporal را تماشا می‌کنید و سپس شبکه را بازیابی می‌کنید تا شاهد بازیابی آن باشید.

  1. اتصال دستگاه خود را به اینترنت قطع کنید (برای مثال، وای‌فای خود را خاموش کنید).
  2. ارسال گردش کار:

    python -m start_workflow "tell me a joke"
  3. رابط کاربری Temporal ( http://localhost:8233 ) را بررسی کنید. خواهید دید که فعالیت LLM با شکست مواجه می‌شود و Temporal به طور خودکار تلاش‌های مجدد را در پس‌زمینه مدیریت می‌کند.

  4. دوباره به اینترنت وصل شوید.

  5. تلاش مجدد خودکار بعدی با موفقیت به API Gemini خواهد رسید و ترمینال شما نتیجه نهایی را چاپ خواهد کرد.

زنده ماندن در تصادف کارگری

در این تست، شما worker را در اواسط اجرا از بین می‌برید و آن را مجدداً راه‌اندازی می‌کنید. Temporal تاریخچه گردش کار (منبع‌یابی رویداد) را دوباره اجرا می‌کند و از آخرین فعالیت تکمیل‌شده ادامه می‌دهد - فراخوانی‌های LLM و فراخوانی‌های ابزار که قبلاً تکمیل شده‌اند، تکرار نمی‌شوند.

  1. برای اینکه به خودتان زمان بدهید تا worker را از بین ببرید، durable_agent_worker.py را باز کنید و موقتاً await asyncio.sleep(10) درون حلقه run AgentWorkflow از حالت کامنت خارج کنید.
  2. کارگر را مجدداً راه اندازی کنید:

    python -m durable_agent_worker
  3. ارسال یک پرس و جو که چندین ابزار را فعال می‌کند:

    python -m start_workflow "are there any weather alerts where I am?"
  4. هر زمان که فرآیند کارگر قبل از تکمیل شدن متوقف شود ( Ctrl-C در ترمینال کارگر، یا با استفاده از kill %1 اگر در پس‌زمینه در حال اجرا است).

  5. کارگر را مجدداً راه اندازی کنید:

    python -m durable_agent_worker

Temporal تاریخچه گردش کار را دوباره اجرا می‌کند. فراخوانی‌های LLM و فراخوانی‌های ابزار که قبلاً تکمیل شده‌اند، دوباره اجرا نمی‌شوند - نتایج آنها فوراً از تاریخچه (گزارش رویداد) دوباره اجرا می‌شود. گردش کار با موفقیت به پایان می‌رسد.

منابع بیشتر