وكيل الذكاء الاصطناعي الدائم مع Gemini وTemporal

يرشدك هذا البرنامج التعليمي إلى خطوات إنشاء حلقة عامل بأسلوب ReAct تستخدم واجهة برمجة التطبيقات Gemini API للاستدلال وTemporal لضمان استمراريتها. يتوفّر الرمز المصدر الكامل لهذا البرنامج التعليمي على GitHub.

يمكن للوكيل استدعاء أدوات، مثل البحث عن تنبيهات بشأن الطقس أو تحديد الموقع الجغرافي لعنوان IP، وسيواصل البحث إلى أن يحصل على معلومات كافية للرد.

ما يميّز هذا العرض التوضيحي عن العروض التوضيحية النموذجية هو المتانة. يحتفظ Temporal بكل طلب يتم إرساله إلى نموذج اللغة الكبير (LLM) وكل عملية استدعاء لأداة وكل خطوة من خطوات حلقة التنفيذ. في حال تعذُّر إكمال العملية أو انقطاع الشبكة أو انتهاء المهلة المحدّدة لواجهة برمجة التطبيقات، تعيد Temporal المحاولة تلقائيًا وتستأنف العملية من آخر خطوة تم إكمالها. لن يتم فقدان أي سجلّ محادثات، ولن يتم تكرار أي طلبات أدوات بشكل غير صحيح.

الهندسة المعمارية

تتألف البنية من ثلاثة أجزاء:

  • سير العمل: حلقة الوكيل التي تنظّم منطق التنفيذ.
  • الأنشطة: هي وحدات عمل فردية (طلبات إلى النموذج اللغوي الكبير، وطلبات إلى الأدوات) تجعلها Temporal دائمة.
  • العامل: العملية التي تنفّذ سير العمل والأنشطة.

في هذا المثال، ستضع كل هذه الأجزاء الثلاثة في ملف واحد (durable_agent_worker.py). وفي عملية التنفيذ في العالم الحقيقي، ستفصل بينها للاستفادة من مزايا مختلفة في ما يتعلق بالنشر وقابلية التوسّع. ستضع الرمز الذي يقدّم طلبًا إلى الوكيل في ملف ثانٍ (start_workflow.py).

المتطلبات الأساسية

لإكمال هذا الدليل، ستحتاج إلى:

الإعداد

قبل البدء، تأكَّد من تشغيل خادم تطوير Temporal محليًا:

temporal server start-dev

بعد ذلك، ثبِّت التبعيات المطلوبة:

pip install temporalio google-genai httpx pydantic python-dotenv

أنشئ ملف .env في دليل مشروعك باستخدام مفتاح Gemini API. يمكنك الحصول على مفتاح واجهة برمجة التطبيقات من Google AI Studio.

echo "GOOGLE_API_KEY=your-api-key-here" > .env

التنفيذ

توضّح لك بقية هذا البرنامج التعليمي كيفية استخدام durable_agent_worker.py من البداية إلى النهاية، مع إنشاء جزء الوكيل خطوة بخطوة. أنشئ الملف واتّبِع الخطوات.

عمليات الاستيراد وإعداد وضع الحماية

ابدأ بعمليات الاستيراد التي يجب تحديدها مسبقًا. يخبر الحظر workflow.unsafe.imports_passed_through() بيئة الاختبار المعزولة لسير العمل في Temporal بأنّ بعض الوحدات يمكنها المرور بدون قيود. هذا الإجراء ضروري لأنّ العديد من المكتبات (خاصةً httpx التي تنتمي إلى الفئة الفرعية urllib.request.Request) تستخدم أنماطًا سيحظرها وضع الحماية في حال عدم اتّخاذ هذا الإجراء.

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    import pydantic_core  # noqa: F401
    import annotated_types  # noqa: F401

    import httpx
    from pydantic import BaseModel, Field
    from google import genai
    from google.genai import types

تعليمات النظام

بعد ذلك، حدِّد شخصية الوكيل. تخبر تعليمات النظام النموذج بكيفية التصرف. يتم توجيه هذا الوكيل بالرد في أبيات شعرية من نوع "هايكو" عندما لا تكون هناك حاجة إلى أدوات.

SYSTEM_INSTRUCTIONS = """
You are a helpful agent that can use tools to help the user.
You will be given an input from the user and a list of tools to use.
You may or may not need to use the tools to satisfy the user ask.
If no tools are needed, respond in haikus.
"""

تعريفات الأدوات

الآن، حدِّد الأدوات التي يمكن للوكيل استخدامها. كل أداة هي دالة غير متزامنة تتضمّن سلسلة مستندات وصفية. تستخدم الأدوات التي تتضمّن مَعلمات نموذج Pydantic كمعلَمة واحدة. هذه أفضل ممارسة في Temporal تحافظ على ثبات تواقيع الأنشطة عند إضافة حقول اختيارية بمرور الوقت.

import json

NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"


class GetWeatherAlertsRequest(BaseModel):
    """Request model for getting weather alerts."""

    state: str = Field(description="Two-letter US state code (e.g. CA, NY)")


async def get_weather_alerts(request: GetWeatherAlertsRequest) -> str:
    """Get weather alerts for a US state.

    Args:
        request: The request object containing:
            - state: Two-letter US state code (e.g. CA, NY)
    """
    headers = {"User-Agent": USER_AGENT, "Accept": "application/geo+json"}
    url = f"{NWS_API_BASE}/alerts/active/area/{request.state}"

    async with httpx.AsyncClient() as client:
        response = await client.get(url, headers=headers, timeout=5.0)
        response.raise_for_status()
        return json.dumps(response.json())

بعد ذلك، حدِّد أدوات رصد الموقع الجغرافي لعنوان IP:

class GetLocationRequest(BaseModel):
    """Request model for getting location info from an IP address."""

    ipaddress: str = Field(description="An IP address")


async def get_ip_address() -> str:
    """Get the public IP address of the current machine."""
    async with httpx.AsyncClient() as client:
        response = await client.get("https://icanhazip.com")
        response.raise_for_status()
        return response.text.strip()


async def get_location_info(request: GetLocationRequest) -> str:
    """Get the location information for an IP address including city, state, and country.

    Args:
        request: The request object containing:
            - ipaddress: An IP address to look up
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(f"http://ip-api.com/json/{request.ipaddress}")
        response.raise_for_status()
        result = response.json()
        return f"{result['city']}, {result['regionName']}, {result['country']}"

سجلّ الأدوات

بعد ذلك، أنشئ سجلًّا يربط أسماء الأدوات بدوال المعالجة. تنشئ الدالة get_tools() عناصر FunctionDeclaration متوافقة مع Gemini من العناصر القابلة للاستدعاء باستخدام FunctionDeclaration.from_callable_with_api_option().

from typing import Any, Awaitable, Callable

ToolHandler = Callable[..., Awaitable[Any]]


def get_handler(tool_name: str) -> ToolHandler:
    """Get the handler function for a given tool name."""
    if tool_name == "get_location_info":
        return get_location_info
    if tool_name == "get_ip_address":
        return get_ip_address
    if tool_name == "get_weather_alerts":
        return get_weather_alerts
    raise ValueError(f"Unknown tool name: {tool_name}")


def get_tools() -> types.Tool:
    """Get the Tool object containing all available function declarations.

    Uses FunctionDeclaration.from_callable_with_api_option() from the Google GenAI SDK
    to generate tool definitions from the handler functions.
    """
    return types.Tool(
        function_declarations=[
            types.FunctionDeclaration.from_callable_with_api_option(
                callable=get_weather_alerts, api_option="GEMINI_API"
            ),
            types.FunctionDeclaration.from_callable_with_api_option(
                callable=get_location_info, api_option="GEMINI_API"
            ),
            types.FunctionDeclaration.from_callable_with_api_option(
                callable=get_ip_address, api_option="GEMINI_API"
            ),
        ]
    )

نشاط النماذج اللغوية الكبيرة

الآن، حدِّد النشاط الذي يستدعي Gemini API. تحدّد فئتا البيانات GeminiChatRequest وGeminiChatResponse العقد.

عليك إيقاف ميزة استدعاء الدوال التلقائي لكي يتم التعامل مع استدعاء النموذج اللغوي الكبير واستدعاء الأداة كمهام منفصلة، ما يمنح الوكيل المزيد من الثبات. عليك أيضًا إيقاف عمليات إعادة المحاولة المضمّنة في حزمة SDK (attempts=1) لأنّ Temporal تتعامل مع عمليات إعادة المحاولة بشكل دائم.

import os
from dataclasses import dataclass

from temporalio import activity

@dataclass
class GeminiChatRequest:
    """Request parameters for a Gemini chat completion."""

    model: str
    system_instruction: str
    contents: list[types.Content]
    tools: list[types.Tool]


@dataclass
class GeminiChatResponse:
    """Response from a Gemini chat completion."""

    text: str | None
    function_calls: list[dict[str, Any]]
    raw_parts: list[types.Part]


@activity.defn
async def generate_content(request: GeminiChatRequest) -> GeminiChatResponse:
    """Execute a Gemini chat completion with tool support."""
    api_key = os.environ.get("GOOGLE_API_KEY")
    if not api_key:
        raise ValueError("GOOGLE_API_KEY environment variable is not set")
    client = genai.Client(
        api_key=api_key,
        http_options=types.HttpOptions(
            retry_options=types.HttpRetryOptions(attempts=1),
        ),
    )

    config = types.GenerateContentConfig(
        system_instruction=request.system_instruction,
        tools=request.tools,
        automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True),
    )

    response = await client.aio.models.generate_content(
        model=request.model,
        contents=request.contents,
        config=config,
    )

    function_calls = []
    raw_parts = []
    text_parts = []

    if response.candidates and response.candidates[0].content:
        for part in response.candidates[0].content.parts:
            raw_parts.append(part)
            if part.function_call:
                function_calls.append(
                    {
                        "name": part.function_call.name,
                        "args": dict(part.function_call.args) if part.function_call.args else {},
                    }
                )
            elif part.text:
                text_parts.append(part.text)

    text = "".join(text_parts) if text_parts and not function_calls else None

    return GeminiChatResponse(
        text=text,
        function_calls=function_calls,
        raw_parts=raw_parts,
    )

نشاط الأداة الديناميكية

بعد ذلك، حدِّد النشاط الذي ينفّذ الأدوات. يستخدم هذا الخيار ميزة الأنشطة الديناميكية في Temporal: يتم الحصول على معالج الأداة (عنصر قابل للاستدعاء) من سجل الأدوات من خلال الدالة get_handler. يتيح ذلك تحديد وكلاء مختلفين ببساطة من خلال توفير مجموعة مختلفة من الأدوات وتعليمات النظام، ولا يتطلب سير العمل الذي ينفّذ حلقة الوكيل أي تغييرات.

يفحص النشاط توقيع المعالج لتحديد كيفية تمرير الوسيطات. إذا كان المعالج يتوقّع نموذج Pydantic، سيتعامل مع تنسيق الإخراج المتداخل الذي ينتجه Gemini (على سبيل المثال، {"request": {"state": "CA"}} بدلاً من {"state": "CA"} المسطّح).

import inspect
from collections.abc import Sequence

from temporalio.common import RawValue

@activity.defn(dynamic=True)
async def dynamic_tool_activity(args: Sequence[RawValue]) -> dict:
    """Execute a tool dynamically based on the activity name."""
    tool_name = activity.info().activity_type
    tool_args = activity.payload_converter().from_payload(args[0].payload, dict)
    activity.logger.info(f"Running dynamic tool '{tool_name}' with args: {tool_args}")

    handler = get_handler(tool_name)

    if not inspect.iscoroutinefunction(handler):
        raise TypeError("Tool handler must be async (awaitable).")

    sig = inspect.signature(handler)
    params = list(sig.parameters.values())

    if len(params) == 0:
        result = await handler()
    else:
        param = params[0]
        param_name = param.name
        ann = param.annotation

        if isinstance(ann, type) and issubclass(ann, BaseModel):
            nested_args = tool_args.get(param_name, tool_args)
            result = await handler(ann(**nested_args))
        else:
            result = await handler(**tool_args)

    activity.logger.info(f"Tool '{tool_name}' result: {result}")
    return result

سير عمل حلقة الذكاء الاصطناعي الوكيل

أصبح لديك الآن كل العناصر اللازمة لإنهاء إنشاء الوكيل. تنفِّذ الفئة AgentWorkflow سير عمل يحتوي على حلقة الوكيل. ضمن هذه الحلقة، يتم استدعاء النموذج اللغوي الكبير من خلال النشاط (ما يجعله دائمًا)، ويتم فحص الناتج، وإذا اختار النموذج اللغوي الكبير أداة، يتم استدعاؤها من خلال dynamic_tool_activity.

في هذا النوع البسيط من العملاء المستند إلى أسلوب الاستدلال واتخاذ الإجراءات، بمجرد أن يقرّر النموذج اللغوي الكبير عدم استخدام أداة، يتم اعتبار الحلقة مكتملة ويتم عرض النتيجة النهائية للنموذج اللغوي الكبير.

from datetime import timedelta

@workflow.defn
class AgentWorkflow:
    """Agentic loop workflow that uses Gemini for LLM calls and executes tools."""

    @workflow.run
    async def run(self, input: str) -> str:
        contents: list[types.Content] = [
            types.Content(role="user", parts=[types.Part.from_text(text=input)])
        ]

        tools = [get_tools()]

        while True:
            result = await workflow.execute_activity(
                generate_content,
                GeminiChatRequest(
                    model="gemini-3-flash-preview",
                    system_instruction=SYSTEM_INSTRUCTIONS,
                    contents=contents,
                    tools=tools,
                ),
                start_to_close_timeout=timedelta(seconds=60),
            )

            if result.function_calls:
                # Sending the complete raw_parts here ensures Gemini 3 thought
                # signatures are propagated correctly.
                contents.append(types.Content(role="model", parts=result.raw_parts))

                for function_call in result.function_calls:
                    tool_result = await self._handle_function_call(function_call)

                    contents.append(
                        types.Content(
                            role="user",
                            parts=[
                                types.Part.from_function_response(
                                    name=function_call["name"],
                                    response={"result": tool_result},
                                )
                            ],
                        )
                    )
            else:
                return result.text
            # Leave this in place. You will un-comment it during a durability
            # test later on.
            # await asyncio.sleep(10)

    async def _handle_function_call(self, function_call: dict) -> str:
        """Execute a tool via dynamic activity and return the result."""
        tool_name = function_call["name"]
        tool_args = function_call.get("args", {})

        result = await workflow.execute_activity(
            tool_name,
            tool_args,
            start_to_close_timeout=timedelta(seconds=30),
        )

        return result

الحلقة المستندة إلى الذكاء الاصطناعي التوليدي متينة تمامًا. إذا تعذّر تشغيل عامل الوكيل بعد عدة تكرارات خلال الحلقة، ستستأنف Temporal العمل من حيث توقّفت بالضبط بدون الحاجة إلى إعادة استدعاء استدعاءات نماذج اللغة الكبيرة أو استدعاءات الأدوات التي تم تنفيذها من قبل.

بدء تشغيل العامل

أخيرًا، وصِّل كل الأجزاء ببعضها. على الرغم من أنّ الرمز البرمجي ينفّذ منطق النشاط التجاري اللازم بطريقة تجعله يبدو وكأنّه يعمل في عملية واحدة، فإنّ استخدام Temporal يجعله نظامًا مستندًا إلى الأحداث (وتحديدًا، مستندًا إلى مصدر الأحداث) حيث يتم التواصل بين سير العمل والأنشطة من خلال المراسلة التي توفّرها Temporal.

يتصل عامل Temporal بخدمة Temporal ويعمل كأداة جدولة لمهام سير العمل والأنشطة. يسجّل العامل سير العمل وكلا النشاطين، ثم يبدأ في الاستماع إلى المهام.

import asyncio
from concurrent.futures import ThreadPoolExecutor

from dotenv import load_dotenv
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker

async def main():
    config = ClientConfig.load_client_connect_config()
    config.setdefault("target_host", "localhost:7233")
    client = await Client.connect(
        **config,
        data_converter=pydantic_data_converter,
    )

    worker = Worker(
        client,
        task_queue="gemini-agent-python-task-queue",
        workflows=[
            AgentWorkflow,
        ],
        activities=[
            generate_content,
            dynamic_tool_activity,
        ],
        activity_executor=ThreadPoolExecutor(max_workers=10),
    )
    await worker.run()


if __name__ == "__main__":
    load_dotenv()
    asyncio.run(main())

نص برمجي من جهة العميل

أنشِئ نصًا برمجيًا للعميل (start_workflow.py). يرسل هذا النص طلب بحث وينتظر النتيجة. لاحظ أنّه يتصل بقائمة انتظار المهام نفسها المشار إليها في عامل البرنامج—يرسل النص البرمجي start_workflow مهمة سير عمل تتضمّن طلب المستخدم إلى قائمة انتظار المهام هذه، ما يؤدي إلى بدء تنفيذ البرنامج.

import asyncio
import sys
import uuid

from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter


async def main():
    client = await Client.connect(
        "localhost:7233",
        data_converter=pydantic_data_converter,
    )

    query = sys.argv[1] if len(sys.argv) > 1 else "Tell me about recursion"

    result = await client.execute_workflow(
        "AgentWorkflow",
        query,
        id=f"gemini-agent-id-{uuid.uuid4()}",
        task_queue="gemini-agent-python-task-queue",
    )
    print(f"\nResult:\n{result}")


if __name__ == "__main__":
    asyncio.run(main())

تشغيل الوكيل

إذا لم يسبق لك إجراء ذلك، ابدأ تشغيل خادم تطوير Temporal:

temporal server start-dev

في نافذة طرفية جديدة، ابدأ تشغيل عامل الوكيل:

python -m durable_agent_worker

في نافذة طرفية ثالثة، أرسِل طلب بحث إلى وكيلك:

python -m start_workflow "are there any weather alerts for where I am?"

لاحظ الناتج في نافذة الأوامر durable_agent_worker الذي يعرض الإجراءات التي تحدث في كل تكرار من حلقة التشغيل. يستطيع النموذج اللغوي الكبير تلبية طلب المستخدم من خلال استخدام سلسلة من الأدوات المتاحة له. يمكنك الاطّلاع على الخطوات التي تم تنفيذها من خلال واجهة مستخدم Temporal على http://localhost:8233/namespaces/default/workflows.

جرِّب توجيه بضعة طلبات مختلفة للاطّلاع على سبب الاتصال وأدواته:

python -m start_workflow "are there any weather alerts for New York?"
python -m start_workflow "where am I?"
python -m start_workflow "what is my ip address?"
python -m start_workflow "tell me a joke"

لا يتطلّب الطلب الأخير أي أدوات، لذا يستجيب الوكيل ببيت شعر مكوّن من سبعة مقاطع صوتية ثم خمسة ثم سبعة، استنادًا إلى SYSTEM_INSTRUCTIONS.

مدة الاختبار (اختياري)

يضمن استخدام Temporal أن يستمر الوكيل في العمل بسلاسة في حال حدوث أي أعطال. يمكنك اختبار ذلك باستخدام تجربتَين مختلفتَين.

محاكاة انقطاع الشبكة

في هذا الاختبار، عليك إيقاف اتصال الإنترنت على الكمبيوتر مؤقتًا، ثم إرسال سير عمل، ومشاهدة Temporal وهي تعيد المحاولة تلقائيًا، ثم إعادة الاتصال بالشبكة لمعرفة ما إذا كان سيتم استعادة الاتصال.

  1. افصل جهازك عن الإنترنت (على سبيل المثال، أوقِف شبكة Wi-Fi).
  2. إرسال سير عمل:

    python -m start_workflow "tell me a joke"
  3. تحقَّق من واجهة مستخدم Temporal (http://localhost:8233). ستلاحظ تعذُّر تنفيذ نشاط نموذج اللغة الكبير، وستدير Temporal تلقائيًا عمليات إعادة المحاولة في الخلفية.

  4. أعِد الاتصال بالإنترنت.

  5. ستصل محاولة إعادة المحاولة المبرمَجة التالية بنجاح إلى Gemini API، وستعرض المحطة الطرفية النتيجة النهائية.

النجاة من تعطل عامل

في هذا الاختبار، يتم إيقاف العامل أثناء التنفيذ وإعادة تشغيله. تعيد عمليات التشغيل المؤقتة تشغيل سجلّ سير العمل (تحديد مصدر الحدث) وتستأنف من آخر نشاط تم إكماله، ولا يتم تكرار عمليات استدعاء نماذج اللغات الكبيرة ومكالمات الأدوات التي تم إكمالها.

  1. لمنح نفسك وقتًا لإيقاف العامل، افتح durable_agent_worker.py وأزِل مؤقتًا التعليق من await asyncio.sleep(10) داخل حلقة AgentWorkflow run.
  2. أعِد تشغيل العامل:

    python -m durable_agent_worker
  3. إرسال طلب بحث يؤدي إلى تشغيل عدة أدوات:

    python -m start_workflow "are there any weather alerts where I am?"
  4. يمكنك إيقاف عملية العامل في أي وقت قبل اكتمالها (Ctrl-C في نافذة العامل أو باستخدام kill %1 إذا كانت العملية تعمل في الخلفية).

  5. أعِد تشغيل العامل:

    python -m durable_agent_worker

تعيد Temporal تشغيل سجلّ سير العمل. إنّ عمليات استدعاء النماذج اللغوية الكبيرة والأدوات التي اكتملت لا يتم إعادة تنفيذها، بل يتم إعادة عرض نتائجها على الفور من السجلّ (سجلّ الأحداث). ينتهي سير العمل بنجاح.

موارد أخرى