يرشدك هذا البرنامج التعليمي إلى خطوات إنشاء حلقة عامل بأسلوب ReAct تستخدم واجهة برمجة التطبيقات Gemini API للاستدلال وTemporal لضمان استمراريتها. يتوفّر الرمز المصدر الكامل لهذا البرنامج التعليمي على GitHub.
يمكن للوكيل استدعاء أدوات، مثل البحث عن تنبيهات بشأن الطقس أو تحديد الموقع الجغرافي لعنوان IP، وسيواصل البحث إلى أن يحصل على معلومات كافية للرد.
ما يميّز هذا العرض التوضيحي عن العروض التوضيحية النموذجية هو المتانة. يحتفظ Temporal بكل طلب يتم إرساله إلى نموذج اللغة الكبير (LLM) وكل عملية استدعاء لأداة وكل خطوة من خطوات حلقة التنفيذ. في حال تعذُّر إكمال العملية أو انقطاع الشبكة أو انتهاء المهلة المحدّدة لواجهة برمجة التطبيقات، تعيد Temporal المحاولة تلقائيًا وتستأنف العملية من آخر خطوة تم إكمالها. لن يتم فقدان أي سجلّ محادثات، ولن يتم تكرار أي طلبات أدوات بشكل غير صحيح.
الهندسة المعمارية
تتألف البنية من ثلاثة أجزاء:
- سير العمل: حلقة الوكيل التي تنظّم منطق التنفيذ.
- الأنشطة: هي وحدات عمل فردية (طلبات إلى النموذج اللغوي الكبير، وطلبات إلى الأدوات) تجعلها Temporal دائمة.
- العامل: العملية التي تنفّذ سير العمل والأنشطة.
في هذا المثال، ستضع كل هذه الأجزاء الثلاثة في ملف واحد (durable_agent_worker.py). وفي عملية التنفيذ في العالم الحقيقي، ستفصل بينها للاستفادة من مزايا مختلفة في ما يتعلق بالنشر وقابلية التوسّع. ستضع الرمز الذي يقدّم طلبًا إلى الوكيل في ملف ثانٍ (start_workflow.py).
المتطلبات الأساسية
لإكمال هذا الدليل، ستحتاج إلى:
- مفتاح Gemini API يمكنك إنشاء حساب مجانًا في Google AI Studio.
- الإصدار 3.10 أو الإصدارات الأحدث من Python
- واجهة سطر الأوامر Temporal لتشغيل خادم تطوير محلي
الإعداد
قبل البدء، تأكَّد من تشغيل خادم تطوير Temporal محليًا:
temporal server start-devبعد ذلك، ثبِّت التبعيات المطلوبة:
pip install temporalio google-genai httpx pydantic python-dotenvأنشئ ملف .env في دليل مشروعك باستخدام مفتاح Gemini API. يمكنك الحصول على مفتاح واجهة برمجة التطبيقات من Google AI Studio.
echo "GOOGLE_API_KEY=your-api-key-here" > .envالتنفيذ
توضّح لك بقية هذا البرنامج التعليمي كيفية استخدام durable_agent_worker.py من البداية إلى النهاية، مع إنشاء جزء الوكيل خطوة بخطوة. أنشئ الملف واتّبِع الخطوات.
عمليات الاستيراد وإعداد وضع الحماية
ابدأ بعمليات الاستيراد التي يجب تحديدها مسبقًا. يخبر الحظر workflow.unsafe.imports_passed_through() بيئة الاختبار المعزولة لسير العمل في Temporal بأنّ بعض الوحدات يمكنها المرور بدون قيود. هذا الإجراء ضروري لأنّ العديد من المكتبات (خاصةً httpx التي تنتمي إلى الفئة الفرعية urllib.request.Request) تستخدم أنماطًا سيحظرها وضع الحماية في حال عدم اتّخاذ هذا الإجراء.
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
import pydantic_core # noqa: F401
import annotated_types # noqa: F401
import httpx
from pydantic import BaseModel, Field
from google import genai
from google.genai import types
تعليمات النظام
بعد ذلك، حدِّد شخصية الوكيل. تخبر تعليمات النظام النموذج بكيفية التصرف. يتم توجيه هذا الوكيل بالرد في أبيات شعرية من نوع "هايكو" عندما لا تكون هناك حاجة إلى أدوات.
SYSTEM_INSTRUCTIONS = """
You are a helpful agent that can use tools to help the user.
You will be given an input from the user and a list of tools to use.
You may or may not need to use the tools to satisfy the user ask.
If no tools are needed, respond in haikus.
"""
تعريفات الأدوات
الآن، حدِّد الأدوات التي يمكن للوكيل استخدامها. كل أداة هي دالة غير متزامنة تتضمّن سلسلة مستندات وصفية. تستخدم الأدوات التي تتضمّن مَعلمات نموذج Pydantic كمعلَمة واحدة. هذه أفضل ممارسة في Temporal تحافظ على ثبات تواقيع الأنشطة عند إضافة حقول اختيارية بمرور الوقت.
import json
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"
class GetWeatherAlertsRequest(BaseModel):
"""Request model for getting weather alerts."""
state: str = Field(description="Two-letter US state code (e.g. CA, NY)")
async def get_weather_alerts(request: GetWeatherAlertsRequest) -> str:
"""Get weather alerts for a US state.
Args:
request: The request object containing:
- state: Two-letter US state code (e.g. CA, NY)
"""
headers = {"User-Agent": USER_AGENT, "Accept": "application/geo+json"}
url = f"{NWS_API_BASE}/alerts/active/area/{request.state}"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers, timeout=5.0)
response.raise_for_status()
return json.dumps(response.json())
بعد ذلك، حدِّد أدوات رصد الموقع الجغرافي لعنوان IP:
class GetLocationRequest(BaseModel):
"""Request model for getting location info from an IP address."""
ipaddress: str = Field(description="An IP address")
async def get_ip_address() -> str:
"""Get the public IP address of the current machine."""
async with httpx.AsyncClient() as client:
response = await client.get("https://icanhazip.com")
response.raise_for_status()
return response.text.strip()
async def get_location_info(request: GetLocationRequest) -> str:
"""Get the location information for an IP address including city, state, and country.
Args:
request: The request object containing:
- ipaddress: An IP address to look up
"""
async with httpx.AsyncClient() as client:
response = await client.get(f"http://ip-api.com/json/{request.ipaddress}")
response.raise_for_status()
result = response.json()
return f"{result['city']}, {result['regionName']}, {result['country']}"
سجلّ الأدوات
بعد ذلك، أنشئ سجلًّا يربط أسماء الأدوات بدوال المعالجة. تنشئ الدالة
get_tools() عناصر FunctionDeclaration متوافقة مع Gemini
من العناصر القابلة للاستدعاء باستخدام FunctionDeclaration.from_callable_with_api_option().
from typing import Any, Awaitable, Callable
ToolHandler = Callable[..., Awaitable[Any]]
def get_handler(tool_name: str) -> ToolHandler:
"""Get the handler function for a given tool name."""
if tool_name == "get_location_info":
return get_location_info
if tool_name == "get_ip_address":
return get_ip_address
if tool_name == "get_weather_alerts":
return get_weather_alerts
raise ValueError(f"Unknown tool name: {tool_name}")
def get_tools() -> types.Tool:
"""Get the Tool object containing all available function declarations.
Uses FunctionDeclaration.from_callable_with_api_option() from the Google GenAI SDK
to generate tool definitions from the handler functions.
"""
return types.Tool(
function_declarations=[
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_weather_alerts, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_location_info, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_ip_address, api_option="GEMINI_API"
),
]
)
نشاط النماذج اللغوية الكبيرة
الآن، حدِّد النشاط الذي يستدعي Gemini API. تحدّد فئتا البيانات GeminiChatRequest وGeminiChatResponse العقد.
عليك إيقاف ميزة استدعاء الدوال التلقائي لكي يتم التعامل مع استدعاء النموذج اللغوي الكبير واستدعاء الأداة كمهام منفصلة، ما يمنح الوكيل المزيد من الثبات. عليك أيضًا إيقاف عمليات إعادة المحاولة المضمّنة في حزمة SDK (attempts=1) لأنّ
Temporal تتعامل مع عمليات إعادة المحاولة بشكل دائم.
import os
from dataclasses import dataclass
from temporalio import activity
@dataclass
class GeminiChatRequest:
"""Request parameters for a Gemini chat completion."""
model: str
system_instruction: str
contents: list[types.Content]
tools: list[types.Tool]
@dataclass
class GeminiChatResponse:
"""Response from a Gemini chat completion."""
text: str | None
function_calls: list[dict[str, Any]]
raw_parts: list[types.Part]
@activity.defn
async def generate_content(request: GeminiChatRequest) -> GeminiChatResponse:
"""Execute a Gemini chat completion with tool support."""
api_key = os.environ.get("GOOGLE_API_KEY")
if not api_key:
raise ValueError("GOOGLE_API_KEY environment variable is not set")
client = genai.Client(
api_key=api_key,
http_options=types.HttpOptions(
retry_options=types.HttpRetryOptions(attempts=1),
),
)
config = types.GenerateContentConfig(
system_instruction=request.system_instruction,
tools=request.tools,
automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True),
)
response = await client.aio.models.generate_content(
model=request.model,
contents=request.contents,
config=config,
)
function_calls = []
raw_parts = []
text_parts = []
if response.candidates and response.candidates[0].content:
for part in response.candidates[0].content.parts:
raw_parts.append(part)
if part.function_call:
function_calls.append(
{
"name": part.function_call.name,
"args": dict(part.function_call.args) if part.function_call.args else {},
}
)
elif part.text:
text_parts.append(part.text)
text = "".join(text_parts) if text_parts and not function_calls else None
return GeminiChatResponse(
text=text,
function_calls=function_calls,
raw_parts=raw_parts,
)
نشاط الأداة الديناميكية
بعد ذلك، حدِّد النشاط الذي ينفّذ الأدوات. يستخدم هذا الخيار ميزة الأنشطة الديناميكية في Temporal: يتم الحصول على معالج الأداة (عنصر قابل للاستدعاء) من سجل الأدوات من خلال الدالة get_handler. يتيح ذلك تحديد وكلاء مختلفين ببساطة من خلال توفير مجموعة مختلفة من الأدوات وتعليمات النظام، ولا يتطلب سير العمل الذي ينفّذ حلقة الوكيل أي تغييرات.
يفحص النشاط توقيع المعالج لتحديد كيفية تمرير الوسيطات. إذا كان المعالج يتوقّع نموذج Pydantic، سيتعامل مع تنسيق الإخراج المتداخل الذي ينتجه Gemini (على سبيل المثال، {"request": {"state": "CA"}} بدلاً من {"state": "CA"} المسطّح).
import inspect
from collections.abc import Sequence
from temporalio.common import RawValue
@activity.defn(dynamic=True)
async def dynamic_tool_activity(args: Sequence[RawValue]) -> dict:
"""Execute a tool dynamically based on the activity name."""
tool_name = activity.info().activity_type
tool_args = activity.payload_converter().from_payload(args[0].payload, dict)
activity.logger.info(f"Running dynamic tool '{tool_name}' with args: {tool_args}")
handler = get_handler(tool_name)
if not inspect.iscoroutinefunction(handler):
raise TypeError("Tool handler must be async (awaitable).")
sig = inspect.signature(handler)
params = list(sig.parameters.values())
if len(params) == 0:
result = await handler()
else:
param = params[0]
param_name = param.name
ann = param.annotation
if isinstance(ann, type) and issubclass(ann, BaseModel):
nested_args = tool_args.get(param_name, tool_args)
result = await handler(ann(**nested_args))
else:
result = await handler(**tool_args)
activity.logger.info(f"Tool '{tool_name}' result: {result}")
return result
سير عمل حلقة الذكاء الاصطناعي الوكيل
أصبح لديك الآن كل العناصر اللازمة لإنهاء إنشاء الوكيل. تنفِّذ الفئة AgentWorkflow
سير عمل يحتوي على حلقة الوكيل. ضمن هذه الحلقة، يتم استدعاء النموذج اللغوي الكبير من خلال النشاط (ما يجعله دائمًا)، ويتم فحص الناتج، وإذا اختار النموذج اللغوي الكبير أداة، يتم استدعاؤها من خلال dynamic_tool_activity.
في هذا النوع البسيط من العملاء المستند إلى أسلوب الاستدلال واتخاذ الإجراءات، بمجرد أن يقرّر النموذج اللغوي الكبير عدم استخدام أداة، يتم اعتبار الحلقة مكتملة ويتم عرض النتيجة النهائية للنموذج اللغوي الكبير.
from datetime import timedelta
@workflow.defn
class AgentWorkflow:
"""Agentic loop workflow that uses Gemini for LLM calls and executes tools."""
@workflow.run
async def run(self, input: str) -> str:
contents: list[types.Content] = [
types.Content(role="user", parts=[types.Part.from_text(text=input)])
]
tools = [get_tools()]
while True:
result = await workflow.execute_activity(
generate_content,
GeminiChatRequest(
model="gemini-3-flash-preview",
system_instruction=SYSTEM_INSTRUCTIONS,
contents=contents,
tools=tools,
),
start_to_close_timeout=timedelta(seconds=60),
)
if result.function_calls:
# Sending the complete raw_parts here ensures Gemini 3 thought
# signatures are propagated correctly.
contents.append(types.Content(role="model", parts=result.raw_parts))
for function_call in result.function_calls:
tool_result = await self._handle_function_call(function_call)
contents.append(
types.Content(
role="user",
parts=[
types.Part.from_function_response(
name=function_call["name"],
response={"result": tool_result},
)
],
)
)
else:
return result.text
# Leave this in place. You will un-comment it during a durability
# test later on.
# await asyncio.sleep(10)
async def _handle_function_call(self, function_call: dict) -> str:
"""Execute a tool via dynamic activity and return the result."""
tool_name = function_call["name"]
tool_args = function_call.get("args", {})
result = await workflow.execute_activity(
tool_name,
tool_args,
start_to_close_timeout=timedelta(seconds=30),
)
return result
الحلقة المستندة إلى الذكاء الاصطناعي التوليدي متينة تمامًا. إذا تعذّر تشغيل عامل الوكيل بعد عدة تكرارات خلال الحلقة، ستستأنف Temporal العمل من حيث توقّفت بالضبط بدون الحاجة إلى إعادة استدعاء استدعاءات نماذج اللغة الكبيرة أو استدعاءات الأدوات التي تم تنفيذها من قبل.
بدء تشغيل العامل
أخيرًا، وصِّل كل الأجزاء ببعضها. على الرغم من أنّ الرمز البرمجي ينفّذ منطق النشاط التجاري اللازم بطريقة تجعله يبدو وكأنّه يعمل في عملية واحدة، فإنّ استخدام Temporal يجعله نظامًا مستندًا إلى الأحداث (وتحديدًا، مستندًا إلى مصدر الأحداث) حيث يتم التواصل بين سير العمل والأنشطة من خلال المراسلة التي توفّرها Temporal.
يتصل عامل Temporal بخدمة Temporal ويعمل كأداة جدولة لمهام سير العمل والأنشطة. يسجّل العامل سير العمل وكلا النشاطين، ثم يبدأ في الاستماع إلى المهام.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dotenv import load_dotenv
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker
async def main():
config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
client = await Client.connect(
**config,
data_converter=pydantic_data_converter,
)
worker = Worker(
client,
task_queue="gemini-agent-python-task-queue",
workflows=[
AgentWorkflow,
],
activities=[
generate_content,
dynamic_tool_activity,
],
activity_executor=ThreadPoolExecutor(max_workers=10),
)
await worker.run()
if __name__ == "__main__":
load_dotenv()
asyncio.run(main())
نص برمجي من جهة العميل
أنشِئ نصًا برمجيًا للعميل (start_workflow.py). يرسل هذا النص طلب بحث وينتظر النتيجة. لاحظ أنّه يتصل بقائمة انتظار المهام نفسها المشار إليها في عامل
البرنامج—يرسل النص البرمجي start_workflow مهمة سير عمل تتضمّن طلب المستخدم إلى قائمة انتظار المهام هذه، ما يؤدي إلى بدء تنفيذ البرنامج.
import asyncio
import sys
import uuid
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
async def main():
client = await Client.connect(
"localhost:7233",
data_converter=pydantic_data_converter,
)
query = sys.argv[1] if len(sys.argv) > 1 else "Tell me about recursion"
result = await client.execute_workflow(
"AgentWorkflow",
query,
id=f"gemini-agent-id-{uuid.uuid4()}",
task_queue="gemini-agent-python-task-queue",
)
print(f"\nResult:\n{result}")
if __name__ == "__main__":
asyncio.run(main())
تشغيل الوكيل
إذا لم يسبق لك إجراء ذلك، ابدأ تشغيل خادم تطوير Temporal:
temporal server start-devفي نافذة طرفية جديدة، ابدأ تشغيل عامل الوكيل:
python -m durable_agent_workerفي نافذة طرفية ثالثة، أرسِل طلب بحث إلى وكيلك:
python -m start_workflow "are there any weather alerts for where I am?"لاحظ الناتج في نافذة الأوامر durable_agent_worker الذي يعرض الإجراءات التي تحدث في كل تكرار من حلقة التشغيل. يستطيع النموذج اللغوي الكبير تلبية طلب المستخدم من خلال استخدام سلسلة من الأدوات المتاحة له. يمكنك الاطّلاع على الخطوات التي تم تنفيذها من خلال واجهة مستخدم Temporal على http://localhost:8233/namespaces/default/workflows.
جرِّب توجيه بضعة طلبات مختلفة للاطّلاع على سبب الاتصال وأدواته:
python -m start_workflow "are there any weather alerts for New York?"python -m start_workflow "where am I?"python -m start_workflow "what is my ip address?"python -m start_workflow "tell me a joke"
لا يتطلّب الطلب الأخير أي أدوات، لذا يستجيب الوكيل ببيت شعر
مكوّن من سبعة مقاطع صوتية ثم خمسة ثم سبعة، استنادًا إلى SYSTEM_INSTRUCTIONS.
مدة الاختبار (اختياري)
يضمن استخدام Temporal أن يستمر الوكيل في العمل بسلاسة في حال حدوث أي أعطال. يمكنك اختبار ذلك باستخدام تجربتَين مختلفتَين.
محاكاة انقطاع الشبكة
في هذا الاختبار، عليك إيقاف اتصال الإنترنت على الكمبيوتر مؤقتًا، ثم إرسال سير عمل، ومشاهدة Temporal وهي تعيد المحاولة تلقائيًا، ثم إعادة الاتصال بالشبكة لمعرفة ما إذا كان سيتم استعادة الاتصال.
- افصل جهازك عن الإنترنت (على سبيل المثال، أوقِف شبكة Wi-Fi).
إرسال سير عمل:
python -m start_workflow "tell me a joke"تحقَّق من واجهة مستخدم Temporal (
http://localhost:8233). ستلاحظ تعذُّر تنفيذ نشاط نموذج اللغة الكبير، وستدير Temporal تلقائيًا عمليات إعادة المحاولة في الخلفية.أعِد الاتصال بالإنترنت.
ستصل محاولة إعادة المحاولة المبرمَجة التالية بنجاح إلى Gemini API، وستعرض المحطة الطرفية النتيجة النهائية.
النجاة من تعطل عامل
في هذا الاختبار، يتم إيقاف العامل أثناء التنفيذ وإعادة تشغيله. تعيد عمليات التشغيل المؤقتة تشغيل سجلّ سير العمل (تحديد مصدر الحدث) وتستأنف من آخر نشاط تم إكماله، ولا يتم تكرار عمليات استدعاء نماذج اللغات الكبيرة ومكالمات الأدوات التي تم إكمالها.
- لمنح نفسك وقتًا لإيقاف العامل، افتح
durable_agent_worker.pyوأزِل مؤقتًا التعليق منawait asyncio.sleep(10)داخل حلقةAgentWorkflowrun. أعِد تشغيل العامل:
python -m durable_agent_workerإرسال طلب بحث يؤدي إلى تشغيل عدة أدوات:
python -m start_workflow "are there any weather alerts where I am?"يمكنك إيقاف عملية العامل في أي وقت قبل اكتمالها (
Ctrl-Cفي نافذة العامل أو باستخدامkill %1إذا كانت العملية تعمل في الخلفية).أعِد تشغيل العامل:
python -m durable_agent_worker
تعيد Temporal تشغيل سجلّ سير العمل. إنّ عمليات استدعاء النماذج اللغوية الكبيرة والأدوات التي اكتملت لا يتم إعادة تنفيذها، بل يتم إعادة عرض نتائجها على الفور من السجلّ (سجلّ الأحداث). ينتهي سير العمل بنجاح.