এই টিউটোরিয়ালটি আপনাকে একটি ReAct-স্টাইলের এজেন্টিক লুপ তৈরি করার পদ্ধতি ধাপে ধাপে দেখাবে, যা রিজনিং-এর জন্য Gemini API এবং ডিউরেবিলিটির জন্য Temporal ব্যবহার করে। এই টিউটোরিয়ালটির সম্পূর্ণ সোর্স কোড GitHub- এ পাওয়া যাবে।
এজেন্টটি আবহাওয়ার সতর্কতা খোঁজা বা কোনো আইপি অ্যাড্রেসের অবস্থান নির্ণয় করার মতো টুলগুলোকে কল করতে পারে এবং সাড়া দেওয়ার জন্য পর্যাপ্ত তথ্য না পাওয়া পর্যন্ত এই প্রক্রিয়াটি পুনরাবৃত্তি করতে থাকবে।
একটি সাধারণ এজেন্ট ডেমো থেকে এটিকে যা আলাদা করে তা হলো এর স্থায়িত্ব । প্রতিটি LLM কল, প্রতিটি টুল ইনভোকেশন এবং এজেন্টিক লুপের প্রতিটি ধাপ Temporal দ্বারা সংরক্ষিত থাকে। যদি প্রসেসটি ক্র্যাশ করে, নেটওয়ার্ক বিচ্ছিন্ন হয়ে যায়, বা কোনো API-এর সময়সীমা শেষ হয়ে যায়, Temporal স্বয়ংক্রিয়ভাবে পুনরায় চেষ্টা করে এবং সর্বশেষ সম্পন্ন হওয়া ধাপ থেকে আবার শুরু করে। কোনো কথোপকথনের ইতিহাস হারিয়ে যায় না এবং কোনো টুল কল ভুলভাবে পুনরাবৃত্তি হয় না।
স্থাপত্য
স্থাপত্যটি তিনটি অংশ নিয়ে গঠিত:
- ওয়ার্কফ্লো: সেই সক্রিয় চক্র যা কার্য সম্পাদনের যুক্তিকে সমন্বয় করে।
- কার্যকলাপ: কাজের স্বতন্ত্র একক (এলএলএম কল, টুল কল) যা টেম্পোরাল টেকসই করে তোলে।
- কর্মী: যে প্রক্রিয়াটি কর্মপ্রবাহ এবং কার্যকলাপগুলো সম্পাদন করে।
এই উদাহরণে, আপনি এই তিনটি অংশই একটি ফাইলে ( durable_agent_worker.py ) রাখবেন। বাস্তব প্রয়োগে, বিভিন্ন ডেপ্লয়মেন্ট এবং স্কেলেবিলিটি সুবিধার জন্য আপনি এগুলোকে আলাদা করবেন। এজেন্টকে প্রম্পট সরবরাহকারী কোডটি আপনি দ্বিতীয় একটি ফাইলে ( start_workflow.py ) রাখবেন।
পূর্বশর্ত
এই নির্দেশিকাটি সম্পূর্ণ করতে আপনার প্রয়োজন হবে:
- একটি জেমিনি এপিআই কী। আপনি গুগল এআই স্টুডিও- তে বিনামূল্যে এটি তৈরি করতে পারেন।
- পাইথন সংস্করণ ৩.১০ বা তার পরবর্তী সংস্করণ।
- স্থানীয় ডেভেলপমেন্ট সার্ভার চালানোর জন্য টেম্পোরাল সিএলআই ।
সেটআপ
শুরু করার আগে, নিশ্চিত করুন যে আপনার লোকালি একটি টেম্পোরাল ডেভেলপমেন্ট সার্ভার চালু আছে:
temporal server start-devএরপর, প্রয়োজনীয় ডিপেন্ডেন্সিগুলো ইনস্টল করুন:
pip install temporalio google-genai httpx pydantic python-dotenv আপনার প্রজেক্ট ডিরেক্টরিতে আপনার জেমিনি এপিআই কী (Gemini API key) দিয়ে একটি .env ফাইল তৈরি করুন। আপনি গুগল এআই স্টুডিও (Google AI Studio) থেকে একটি এপিআই কী পেতে পারেন।
echo "GOOGLE_API_KEY=your-api-key-here" > .envবাস্তবায়ন
এই টিউটোরিয়ালের বাকি অংশে durable_agent_worker.py ফাইলটি শুরু থেকে শেষ পর্যন্ত ধাপে ধাপে তৈরি করার প্রক্রিয়া দেখানো হয়েছে। ফাইলটি তৈরি করুন এবং নির্দেশাবলী অনুসরণ করুন।
আমদানি এবং স্যান্ডবক্স সেটআপ
যে ইম্পোর্টগুলো আগে থেকেই সংজ্ঞায়িত করতে হবে, সেগুলো দিয়ে শুরু করুন। workflow.unsafe.imports_passed_through() ব্লকটি Temporal-এর ওয়ার্কফ্লো স্যান্ডবক্সকে নির্দিষ্ট কিছু মডিউলকে কোনো বাধা ছাড়াই পাস করতে নির্দেশ দেয়। এটি প্রয়োজনীয়, কারণ বেশ কিছু লাইব্রেরি (বিশেষ করে httpx , যা urllib.request.Request এর সাবক্লাস) এমন কিছু প্যাটার্ন ব্যবহার করে যা স্যান্ডবক্স অন্যথায় ব্লক করে দিত।
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
import pydantic_core # noqa: F401
import annotated_types # noqa: F401
import httpx
from pydantic import BaseModel, Field
from google import genai
from google.genai import types
সিস্টেম নির্দেশাবলী
এরপর, এজেন্টের ব্যক্তিত্ব নির্ধারণ করুন। সিস্টেমের নির্দেশাবলী মডেলকে বলে দেয় কীভাবে আচরণ করতে হবে। এই এজেন্টকে নির্দেশ দেওয়া হয়েছে যে, যখন কোনো উপকরণের প্রয়োজন হবে না, তখন সে যেন হাইকু ছন্দে উত্তর দেয়।
SYSTEM_INSTRUCTIONS = """
You are a helpful agent that can use tools to help the user.
You will be given an input from the user and a list of tools to use.
You may or may not need to use the tools to satisfy the user ask.
If no tools are needed, respond in haikus.
"""
টুল সংজ্ঞা
এখন এজেন্ট যে টুলগুলো ব্যবহার করতে পারবে তা নির্ধারণ করুন। প্রতিটি টুল হলো একটি অ্যাসিঙ্ক ফাংশন, যার একটি বর্ণনামূলক ডকস্ট্রিং রয়েছে। যে টুলগুলো প্যারামিটার গ্রহণ করে, সেগুলো তাদের একমাত্র আর্গুমেন্ট হিসেবে একটি পাইড্যান্টিক মডেল ব্যবহার করে। এটি একটি টেম্পোরাল সেরা অনুশীলন, যা সময়ের সাথে সাথে ঐচ্ছিক ফিল্ড যুক্ত করার পরেও অ্যাক্টিভিটি সিগনেচারকে স্থিতিশীল রাখে।
import json
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"
class GetWeatherAlertsRequest(BaseModel):
"""Request model for getting weather alerts."""
state: str = Field(description="Two-letter US state code (e.g. CA, NY)")
async def get_weather_alerts(request: GetWeatherAlertsRequest) -> str:
"""Get weather alerts for a US state.
Args:
request: The request object containing:
- state: Two-letter US state code (e.g. CA, NY)
"""
headers = {"User-Agent": USER_AGENT, "Accept": "application/geo+json"}
url = f"{NWS_API_BASE}/alerts/active/area/{request.state}"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers, timeout=5.0)
response.raise_for_status()
return json.dumps(response.json())
এরপরে, আইপি অ্যাড্রেস জিওলোকেশনের জন্য টুলগুলো সংজ্ঞায়িত করুন:
class GetLocationRequest(BaseModel):
"""Request model for getting location info from an IP address."""
ipaddress: str = Field(description="An IP address")
async def get_ip_address() -> str:
"""Get the public IP address of the current machine."""
async with httpx.AsyncClient() as client:
response = await client.get("https://icanhazip.com")
response.raise_for_status()
return response.text.strip()
async def get_location_info(request: GetLocationRequest) -> str:
"""Get the location information for an IP address including city, state, and country.
Args:
request: The request object containing:
- ipaddress: An IP address to look up
"""
async with httpx.AsyncClient() as client:
response = await client.get(f"http://ip-api.com/json/{request.ipaddress}")
response.raise_for_status()
result = response.json()
return f"{result['city']}, {result['regionName']}, {result['country']}"
টুল রেজিস্ট্রি
এরপরে, একটি রেজিস্ট্রি তৈরি করুন যা টুলের নামগুলোকে হ্যান্ডলার ফাংশনের সাথে ম্যাপ করে। get_tools() ফাংশনটি FunctionDeclaration.from_callable_with_api_option() ব্যবহার করে কলযোগ্য ফাংশনগুলো থেকে Gemini-উপযোগী FunctionDeclaration অবজেক্ট তৈরি করে।
from typing import Any, Awaitable, Callable
ToolHandler = Callable[..., Awaitable[Any]]
def get_handler(tool_name: str) -> ToolHandler:
"""Get the handler function for a given tool name."""
if tool_name == "get_location_info":
return get_location_info
if tool_name == "get_ip_address":
return get_ip_address
if tool_name == "get_weather_alerts":
return get_weather_alerts
raise ValueError(f"Unknown tool name: {tool_name}")
def get_tools() -> types.Tool:
"""Get the Tool object containing all available function declarations.
Uses FunctionDeclaration.from_callable_with_api_option() from the Google GenAI SDK
to generate tool definitions from the handler functions.
"""
return types.Tool(
function_declarations=[
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_weather_alerts, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_location_info, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_ip_address, api_option="GEMINI_API"
),
]
)
এলএলএম কার্যকলাপ
এখন সেই অ্যাক্টিভিটিটি সংজ্ঞায়িত করুন যা জেমিনি এপিআই (Gemini API) কল করে। GeminiChatRequest এবং GeminiChatResponse ডেটাক্লাসগুলো কন্ট্রাক্টটি সংজ্ঞায়িত করে।
আপনি স্বয়ংক্রিয় ফাংশন কলিং নিষ্ক্রিয় করবেন, যাতে LLM আহ্বান এবং টুল আহ্বান পৃথক কাজ হিসাবে পরিচালিত হয়, যা আপনার এজেন্টের স্থায়িত্ব বাড়াবে। আপনি SDK-এর অন্তর্নির্মিত পুনঃপ্রচেষ্টাও ( attempts=1 ) নিষ্ক্রিয় করবেন, কারণ Temporal পুনঃপ্রচেষ্টাগুলো টেকসইভাবে পরিচালনা করে।
import os
from dataclasses import dataclass
from temporalio import activity
@dataclass
class GeminiChatRequest:
"""Request parameters for a Gemini chat completion."""
model: str
system_instruction: str
contents: list[types.Content]
tools: list[types.Tool]
@dataclass
class GeminiChatResponse:
"""Response from a Gemini chat completion."""
text: str | None
function_calls: list[dict[str, Any]]
raw_parts: list[types.Part]
@activity.defn
async def generate_content(request: GeminiChatRequest) -> GeminiChatResponse:
"""Execute a Gemini chat completion with tool support."""
api_key = os.environ.get("GOOGLE_API_KEY")
if not api_key:
raise ValueError("GOOGLE_API_KEY environment variable is not set")
client = genai.Client(
api_key=api_key,
http_options=types.HttpOptions(
retry_options=types.HttpRetryOptions(attempts=1),
),
)
config = types.GenerateContentConfig(
system_instruction=request.system_instruction,
tools=request.tools,
automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True),
)
response = await client.aio.models.generate_content(
model=request.model,
contents=request.contents,
config=config,
)
function_calls = []
raw_parts = []
text_parts = []
if response.candidates and response.candidates[0].content:
for part in response.candidates[0].content.parts:
raw_parts.append(part)
if part.function_call:
function_calls.append(
{
"name": part.function_call.name,
"args": dict(part.function_call.args) if part.function_call.args else {},
}
)
elif part.text:
text_parts.append(part.text)
text = "".join(text_parts) if text_parts and not function_calls else None
return GeminiChatResponse(
text=text,
function_calls=function_calls,
raw_parts=raw_parts,
)
গতিশীল সরঞ্জাম কার্যকলাপ
এরপরে, টুলগুলো কার্যকর করার জন্য অ্যাক্টিভিটিটি সংজ্ঞায়িত করুন। এর জন্য টেম্পোরাল-এর ডাইনামিক অ্যাক্টিভিটি বৈশিষ্ট্যটি ব্যবহৃত হয়: টুল হ্যান্ডলার (একটি কলযোগ্য ফাংশন) get_handler ফাংশনের মাধ্যমে টুল রেজিস্ট্রি থেকে সংগ্রহ করা হয়। এর ফলে, শুধুমাত্র ভিন্ন ভিন্ন টুল ও সিস্টেম নির্দেশাবলী সরবরাহ করার মাধ্যমেই বিভিন্ন এজেন্টকে সংজ্ঞায়িত করা যায়; এজেন্টিক লুপ বাস্তবায়নকারী ওয়ার্কফ্লোতে কোনো পরিবর্তনের প্রয়োজন হয় না।
আর্গুমেন্ট কীভাবে পাস করতে হবে তা নির্ধারণ করার জন্য অ্যাক্টিভিটিটি হ্যান্ডলারের সিগনেচার পরীক্ষা করে। যদি হ্যান্ডলারটি একটি পাইড্যান্টিক মডেল প্রত্যাশা করে, তবে এটি জেমিনি দ্বারা উৎপাদিত নেস্টেড আউটপুট ফরম্যাটটি হ্যান্ডেল করে (উদাহরণস্বরূপ, একটি ফ্ল্যাট {"request": {"state": "CA"}} {"state": "CA"} )।
import inspect
from collections.abc import Sequence
from temporalio.common import RawValue
@activity.defn(dynamic=True)
async def dynamic_tool_activity(args: Sequence[RawValue]) -> dict:
"""Execute a tool dynamically based on the activity name."""
tool_name = activity.info().activity_type
tool_args = activity.payload_converter().from_payload(args[0].payload, dict)
activity.logger.info(f"Running dynamic tool '{tool_name}' with args: {tool_args}")
handler = get_handler(tool_name)
if not inspect.iscoroutinefunction(handler):
raise TypeError("Tool handler must be async (awaitable).")
sig = inspect.signature(handler)
params = list(sig.parameters.values())
if len(params) == 0:
result = await handler()
else:
param = params[0]
param_name = param.name
ann = param.annotation
if isinstance(ann, type) and issubclass(ann, BaseModel):
nested_args = tool_args.get(param_name, tool_args)
result = await handler(ann(**nested_args))
else:
result = await handler(**tool_args)
activity.logger.info(f"Tool '{tool_name}' result: {result}")
return result
এজেন্টিক লুপ ওয়ার্কফ্লো
এখন এজেন্টটি তৈরি শেষ করার জন্য আপনার কাছে সমস্ত উপাদান রয়েছে। AgentWorkflow ক্লাসটি একটি ওয়ার্কফ্লো বাস্তবায়ন করে, যার মধ্যে এজেন্ট লুপটি থাকে। সেই লুপের মধ্যে, অ্যাক্টিভিটির মাধ্যমে LLM-কে আহ্বান করা হয় (যা এটিকে টেকসই করে তোলে), এর আউটপুট পরীক্ষা করা হয়, এবং যদি LLM দ্বারা কোনো টুল নির্বাচন করা হয়ে থাকে, তবে সেটি dynamic_tool_activity মাধ্যমে আহ্বান করা হয়।
এই সাধারণ ReAct স্টাইলের এজেন্টে, LLM যখন কোনো টুল ব্যবহার না করার সিদ্ধান্ত নেয়, তখন লুপটি সম্পূর্ণ বলে গণ্য করা হয় এবং LLM-এর চূড়ান্ত ফলাফল ফেরত দেওয়া হয়।
from datetime import timedelta
@workflow.defn
class AgentWorkflow:
"""Agentic loop workflow that uses Gemini for LLM calls and executes tools."""
@workflow.run
async def run(self, input: str) -> str:
contents: list[types.Content] = [
types.Content(role="user", parts=[types.Part.from_text(text=input)])
]
tools = [get_tools()]
while True:
result = await workflow.execute_activity(
generate_content,
GeminiChatRequest(
model="gemini-3.5-flash",
system_instruction=SYSTEM_INSTRUCTIONS,
contents=contents,
tools=tools,
),
start_to_close_timeout=timedelta(seconds=60),
)
if result.function_calls:
# Sending the complete raw_parts here ensures Gemini 3 thought
# signatures are propagated correctly.
contents.append(types.Content(role="model", parts=result.raw_parts))
for function_call in result.function_calls:
tool_result = await self._handle_function_call(function_call)
contents.append(
types.Content(
role="user",
parts=[
types.Part.from_function_response(
name=function_call["name"],
response={"result": tool_result},
)
],
)
)
else:
return result.text
# Leave this in place. You will un-comment it during a durability
# test later on.
# await asyncio.sleep(10)
async def _handle_function_call(self, function_call: dict) -> str:
"""Execute a tool via dynamic activity and return the result."""
tool_name = function_call["name"]
tool_args = function_call.get("args", {})
result = await workflow.execute_activity(
tool_name,
tool_args,
start_to_close_timeout=timedelta(seconds=30),
)
return result
এজেন্টিক লুপটি সম্পূর্ণরূপে টেকসই। যদি লুপের মধ্যে বেশ কয়েকটি পুনরাবৃত্তির পর এজেন্ট ওয়ার্কারটি ক্র্যাশ করে, তাহলে ইতিমধ্যে সম্পাদিত LLM ইনভোকেশন বা টুল কলগুলিকে পুনরায় আহ্বান করার প্রয়োজন ছাড়াই টেম্পোরাল ঠিক যেখান থেকে থেমেছিল সেখান থেকেই আবার কাজ শুরু করবে।
কর্মী স্টার্টআপ
অবশেষে, সবকিছু একসাথে সংযুক্ত করুন। যদিও কোডটি প্রয়োজনীয় বিজনেস লজিক এমনভাবে প্রয়োগ করে যে দেখে মনে হয় এটি একটি একক প্রসেসে চলছে, Temporal-এর ব্যবহার এটিকে একটি ইভেন্ট-ড্রাইভেন সিস্টেমে (বিশেষত, ইভেন্ট-সোর্সড) পরিণত করে, যেখানে ওয়ার্কফ্লো এবং অ্যাক্টিভিটিগুলোর মধ্যে যোগাযোগ Temporal দ্বারা প্রদত্ত মেসেজিংয়ের মাধ্যমে সম্পন্ন হয়।
টেম্পোরাল ওয়ার্কার টেম্পোরাল সার্ভিসের সাথে সংযুক্ত হয় এবং ওয়ার্কফ্লো ও অ্যাক্টিভিটি টাস্কগুলোর জন্য শিডিউলার হিসেবে কাজ করে। ওয়ার্কারটি ওয়ার্কফ্লো এবং উভয় অ্যাক্টিভিটি রেজিস্টার করার পর টাস্কের জন্য লিসেনিং শুরু করে।
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dotenv import load_dotenv
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker
async def main():
config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
client = await Client.connect(
**config,
data_converter=pydantic_data_converter,
)
worker = Worker(
client,
task_queue="gemini-agent-python-task-queue",
workflows=[
AgentWorkflow,
],
activities=[
generate_content,
dynamic_tool_activity,
],
activity_executor=ThreadPoolExecutor(max_workers=10),
)
await worker.run()
if __name__ == "__main__":
load_dotenv()
asyncio.run(main())
ক্লায়েন্ট স্ক্রিপ্ট
ক্লায়েন্ট স্ক্রিপ্টটি ( start_workflow.py ) তৈরি করুন। এটি একটি কোয়েরি জমা দেয় এবং ফলাফলের জন্য অপেক্ষা করে। লক্ষ্য করুন, এটি এজেন্ট ওয়ার্কারে উল্লেখিত একই টাস্ক কিউ-এর সাথে সংযোগ স্থাপন করে— start_workflow স্ক্রিপ্টটি ব্যবহারকারীর প্রম্পটসহ একটি ওয়ার্কফ্লো টাস্ক সেই টাস্ক কিউ-তে প্রেরণ করে, যা এজেন্টের কার্য সম্পাদন শুরু করে।
import asyncio
import sys
import uuid
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
async def main():
client = await Client.connect(
"localhost:7233",
data_converter=pydantic_data_converter,
)
query = sys.argv[1] if len(sys.argv) > 1 else "Tell me about recursion"
result = await client.execute_workflow(
"AgentWorkflow",
query,
id=f"gemini-agent-id-{uuid.uuid4()}",
task_queue="gemini-agent-python-task-queue",
)
print(f"\nResult:\n{result}")
if __name__ == "__main__":
asyncio.run(main())
এজেন্টটি চালান
যদি ইতিমধ্যে না করে থাকেন, তাহলে টেম্পোরাল ডেভেলপমেন্ট সার্ভারটি চালু করুন:
temporal server start-devএকটি নতুন টার্মিনাল উইন্ডোতে, এজেন্ট ওয়ার্কারটি চালু করুন:
python -m durable_agent_workerতৃতীয় টার্মিনাল উইন্ডোতে, আপনার এজেন্টের কাছে একটি কোয়েরি জমা দিন:
python -m start_workflow "are there any weather alerts for where I am?"durable_agent_worker এর টার্মিনালে আউটপুটটি লক্ষ্য করুন, যা এজেন্টিক লুপের প্রতিটি পুনরাবৃত্তিতে সংঘটিত ক্রিয়াকলাপগুলি দেখায়। LLM তার হাতে থাকা বিভিন্ন সরঞ্জাম ব্যবহার করে ব্যবহারকারীর অনুরোধ পূরণ করতে সক্ষম। সম্পাদিত ধাপগুলি আপনি http://localhost:8233/namespaces/default/workflows -এ অবস্থিত Temporal UI-এর মাধ্যমে দেখতে পারেন।
এজেন্টের কারণ দেখতে এবং টুলগুলোকে কল করতে কয়েকটি ভিন্ন প্রম্পট ব্যবহার করে দেখুন:
python -m start_workflow "are there any weather alerts for New York?"python -m start_workflow "where am I?"python -m start_workflow "what is my ip address?"python -m start_workflow "tell me a joke"
শেষ প্রম্পটটির জন্য কোনো টুলের প্রয়োজন হয় না, তাই এজেন্টটি SYSTEM_INSTRUCTIONS উপর ভিত্তি করে একটি হাইকুতে উত্তর দেয়।
স্থায়িত্ব পরীক্ষা করুন (ঐচ্ছিক)
টেম্পোরাল-এর উপর ভিত্তি করে তৈরি করা হলে আপনার এজেন্ট ব্যর্থতা সত্ত্বেও নির্বিঘ্নে টিকে থাকতে পারে। আপনি দুটি স্বতন্ত্র পরীক্ষার মাধ্যমে এটি যাচাই করতে পারেন।
নেটওয়ার্ক বিভ্রাট অনুকরণ করা
এই পরীক্ষায়, আপনি সাময়িকভাবে আপনার কম্পিউটারের ইন্টারনেট সংযোগ বন্ধ করবেন, একটি ওয়ার্কফ্লো জমা দেবেন, দেখবেন টেম্পোরাল স্বয়ংক্রিয়ভাবে পুনরায় চেষ্টা করছে, এবং তারপর নেটওয়ার্কটি পুনরুদ্ধার করে দেখবেন যে এটি ঠিক হয়ে গেছে।
- আপনার মেশিনটিকে ইন্টারনেট থেকে সংযোগ বিচ্ছিন্ন করুন (উদাহরণস্বরূপ, আপনার ওয়াই-ফাই বন্ধ করুন)।
একটি ওয়ার্কফ্লো জমা দিন:
python -m start_workflow "tell me a joke"টেম্পোরাল UI (
http://localhost:8233) চেক করুন। আপনি দেখতে পাবেন যে LLM অ্যাক্টিভিটি ব্যর্থ হচ্ছে এবং টেম্পোরাল স্বয়ংক্রিয়ভাবে ব্যাকগ্রাউন্ডে রিট্রাইগুলো পরিচালনা করছে।ইন্টারনেটে পুনরায় সংযোগ করুন।
পরবর্তী স্বয়ংক্রিয় পুনঃপ্রচেষ্টাটি সফলভাবে জেমিনি এপিআই-তে পৌঁছাবে এবং আপনার টার্মিনালে চূড়ান্ত ফলাফলটি প্রিন্ট হবে।
শ্রমিক দুর্ঘটনা থেকে বেঁচে যাওয়া
এই পরীক্ষায়, আপনি ওয়ার্কারটিকে চলমান অবস্থায় বন্ধ করে আবার চালু করেন। টেম্পোরাল ওয়ার্কফ্লো হিস্ট্রি (ইভেন্ট সোর্সিং) পুনরায় চালায় এবং সর্বশেষ সম্পন্ন হওয়া অ্যাক্টিভিটি থেকে আবার শুরু করে—ইতিমধ্যে সম্পন্ন হওয়া এলএলএম ইনভোকেশন এবং টুল কলগুলোর পুনরাবৃত্তি হয় না।
- ওয়ার্কারটিকে কিল করার জন্য সময় পেতে,
durable_agent_worker.pyফাইলটি খুলুন এবংAgentWorkflowrunলুপের ভিতরেawait asyncio.sleep(10)সাময়িকভাবে আনকমেন্ট করুন। কর্মীকে পুনরায় চালু করুন:
python -m durable_agent_workerএমন একটি কোয়েরি জমা দিন যা একাধিক টুল সক্রিয় করে:
python -m start_workflow "are there any weather alerts where I am?"ওয়ার্কার প্রসেসটি সম্পূর্ণ হওয়ার আগে যেকোনো সময় বন্ধ করে দিন (ওয়ার্কার টার্মিনালে
Ctrl-C, অথবা ব্যাকগ্রাউন্ডে চললেkill %1ব্যবহার করুন)।কর্মীকে পুনরায় চালু করুন:
python -m durable_agent_worker
টেম্পোরাল ওয়ার্কফ্লোর ইতিহাস পুনরায় চালায়। যে LLM কল এবং টুল ইনভোকেশনগুলো ইতিমধ্যে সম্পন্ন হয়েছে, সেগুলো পুনরায় চালানো হয় না —ইতিহাস (ইভেন্ট লগ) থেকে সেগুলোর ফলাফল তাৎক্ষণিকভাবে পুনরায় চালানো হয়। ওয়ার্কফ্লোটি সফলভাবে শেষ হয়।