Ky tutorial ju udhëzon në ndërtimin e një cikli agjentik në stilin ReAct që përdor Gemini API për arsyetim dhe Temporal për qëndrueshmëri. Kodi burimor i plotë për këtë tutorial është i disponueshëm në GitHub .
Agjenti mund të thërrasë mjete, si kërkimi i alarmeve të motit ose gjeolokacioni i një adrese IP, dhe do të kryejë cikël derisa të ketë informacion të mjaftueshëm për t'u përgjigjur.
Ajo që e bën këtë të ndryshme nga një demo tipike agjentësh është qëndrueshmëria . Çdo thirrje LLM, çdo thirrje mjeti dhe çdo hap i ciklit agjentësh mbahet në fuqi nga Temporal. Nëse procesi rrëzohet, rrjeti ndërpritet ose një API skadon, Temporal automatikisht riprovon dhe rifillon nga hapi i fundit i përfunduar. Nuk humbet asnjë historik bisedash dhe asnjë thirrje mjeti nuk përsëritet gabimisht.
Arkitekturë
Arkitektura përbëhet nga tre pjesë:
- Fluksi i punës: Cikli agjentik që orkestron logjikën e ekzekutimit.
- Aktivitetet: Njësi individuale pune (thirrje LLM, thirrje mjetesh) që Temporal i bën të qëndrueshme.
- Punëtor: Procesi që ekzekuton rrjedhat e punës dhe aktivitetet.
Në këtë shembull, do t'i vendosni të tre këto pjesë në një skedar të vetëm ( durable_agent_worker.py ). Në një implementim në botën reale, do t'i ndani ato për të lejuar avantazhe të ndryshme të vendosjes dhe shkallëzueshmërisë. Do ta vendosni kodin që i jep një kërkesë agjentit në një skedar të dytë ( start_workflow.py ).
Parakushte
Për të përfunduar këtë udhëzues, do t'ju duhet:
- Një çelës API Gemini. Mund të krijoni një falas në Google AI Studio .
- Versioni 3.10 i Python ose më i ri.
- CLI-ja Temporal për ekzekutimin e një serveri zhvillimi lokal.
Konfigurimi
Para se të filloni, sigurohuni që keni një server zhvillimi Temporal që funksionon lokalisht:
temporal server start-devTjetra, instaloni varësitë e kërkuara:
pip install temporalio google-genai httpx pydantic python-dotenv Krijo një skedar .env në direktorinë e projektit tënd me çelësin tënd Gemini API. Mund të marrësh një çelës API nga Google AI Studio .
echo "GOOGLE_API_KEY=your-api-key-here" > .envZbatimi
Pjesa tjetër e këtij tutoriali përshkruan durable_agent_worker.py nga lart poshtë, duke e ndërtuar agjentin pjesë-pjesë. Krijo skedarin dhe ndiqe më tej.
Importet dhe konfigurimi i sandbox-it
Filloni me importet që duhet të përcaktohen paraprakisht. Blloku workflow.unsafe.imports_passed_through() i tregon sandbox-it të rrjedhës së punës të Temporal që të lejojë që module të caktuara të kalojnë pa kufizim. Kjo është e nevojshme sepse disa librari (veçanërisht httpx , e cila nënklasifikohet me urllib.request.Request ) përdorin modele që sandbox-i përndryshe do t'i bllokonte.
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
import pydantic_core # noqa: F401
import annotated_types # noqa: F401
import httpx
from pydantic import BaseModel, Field
from google import genai
from google.genai import types
Udhëzimet e sistemit
Më pas, përcaktoni personalitetin e agjentit. Udhëzimet e sistemit i tregojnë modelit se si të sillet. Këtij agjenti i është dhënë udhëzim të përgjigjet me haiku kur nuk nevojiten mjete.
SYSTEM_INSTRUCTIONS = """
You are a helpful agent that can use tools to help the user.
You will be given an input from the user and a list of tools to use.
You may or may not need to use the tools to satisfy the user ask.
If no tools are needed, respond in haikus.
"""
Përkufizimet e mjeteve
Tani përcaktoni mjetet që agjenti mund të përdorë. Çdo mjet është një funksion asinkron me një docstring përshkrues. Mjetet që marrin parametra përdorin një model Pydantic si argumentin e tyre të vetëm. Kjo është një praktikë më e mirë Temporal që i mban nënshkrimet e aktivitetit të qëndrueshme ndërsa shtoni fusha opsionale me kalimin e kohës.
import json
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"
class GetWeatherAlertsRequest(BaseModel):
"""Request model for getting weather alerts."""
state: str = Field(description="Two-letter US state code (e.g. CA, NY)")
async def get_weather_alerts(request: GetWeatherAlertsRequest) -> str:
"""Get weather alerts for a US state.
Args:
request: The request object containing:
- state: Two-letter US state code (e.g. CA, NY)
"""
headers = {"User-Agent": USER_AGENT, "Accept": "application/geo+json"}
url = f"{NWS_API_BASE}/alerts/active/area/{request.state}"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers, timeout=5.0)
response.raise_for_status()
return json.dumps(response.json())
Tjetra, përcaktoni mjetet për gjeolokacionin e adresës IP:
class GetLocationRequest(BaseModel):
"""Request model for getting location info from an IP address."""
ipaddress: str = Field(description="An IP address")
async def get_ip_address() -> str:
"""Get the public IP address of the current machine."""
async with httpx.AsyncClient() as client:
response = await client.get("https://icanhazip.com")
response.raise_for_status()
return response.text.strip()
async def get_location_info(request: GetLocationRequest) -> str:
"""Get the location information for an IP address including city, state, and country.
Args:
request: The request object containing:
- ipaddress: An IP address to look up
"""
async with httpx.AsyncClient() as client:
response = await client.get(f"http://ip-api.com/json/{request.ipaddress}")
response.raise_for_status()
result = response.json()
return f"{result['city']}, {result['regionName']}, {result['country']}"
Regjistri i mjeteve
Më pas, krijoni një regjistër që lidh emrat e mjeteve me funksionet e trajtuesit. Funksioni get_tools() gjeneron objekte FunctionDeclaration të pajtueshme me Gemini nga të thërmueshmet duke përdorur FunctionDeclaration.from_callable_with_api_option() .
from typing import Any, Awaitable, Callable
ToolHandler = Callable[..., Awaitable[Any]]
def get_handler(tool_name: str) -> ToolHandler:
"""Get the handler function for a given tool name."""
if tool_name == "get_location_info":
return get_location_info
if tool_name == "get_ip_address":
return get_ip_address
if tool_name == "get_weather_alerts":
return get_weather_alerts
raise ValueError(f"Unknown tool name: {tool_name}")
def get_tools() -> types.Tool:
"""Get the Tool object containing all available function declarations.
Uses FunctionDeclaration.from_callable_with_api_option() from the Google GenAI SDK
to generate tool definitions from the handler functions.
"""
return types.Tool(
function_declarations=[
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_weather_alerts, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_location_info, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_ip_address, api_option="GEMINI_API"
),
]
)
Aktiviteti i LLM-së
Tani përcaktoni aktivitetin që thërret Gemini API. Klasat e të dhënave GeminiChatRequest dhe GeminiChatResponse përcaktojnë kontratën.
Do të çaktivizoni thirrjen automatike të funksionit në mënyrë që thirrja e LLM dhe thirrja e mjetit të trajtohen si detyra të ndara, duke i sjellë agjentit tuaj më shumë qëndrueshmëri. Gjithashtu do të çaktivizoni ripërpjekjet e integruara të SDK-së ( attempts=1 ) pasi Temporal trajton ripërpjekjet në mënyrë të qëndrueshme.
import os
from dataclasses import dataclass
from temporalio import activity
@dataclass
class GeminiChatRequest:
"""Request parameters for a Gemini chat completion."""
model: str
system_instruction: str
contents: list[types.Content]
tools: list[types.Tool]
@dataclass
class GeminiChatResponse:
"""Response from a Gemini chat completion."""
text: str | None
function_calls: list[dict[str, Any]]
raw_parts: list[types.Part]
@activity.defn
async def generate_content(request: GeminiChatRequest) -> GeminiChatResponse:
"""Execute a Gemini chat completion with tool support."""
api_key = os.environ.get("GOOGLE_API_KEY")
if not api_key:
raise ValueError("GOOGLE_API_KEY environment variable is not set")
client = genai.Client(
api_key=api_key,
http_options=types.HttpOptions(
retry_options=types.HttpRetryOptions(attempts=1),
),
)
config = types.GenerateContentConfig(
system_instruction=request.system_instruction,
tools=request.tools,
automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True),
)
response = await client.aio.models.generate_content(
model=request.model,
contents=request.contents,
config=config,
)
function_calls = []
raw_parts = []
text_parts = []
if response.candidates and response.candidates[0].content:
for part in response.candidates[0].content.parts:
raw_parts.append(part)
if part.function_call:
function_calls.append(
{
"name": part.function_call.name,
"args": dict(part.function_call.args) if part.function_call.args else {},
}
)
elif part.text:
text_parts.append(part.text)
text = "".join(text_parts) if text_parts and not function_calls else None
return GeminiChatResponse(
text=text,
function_calls=function_calls,
raw_parts=raw_parts,
)
Aktiviteti dinamik i mjetit
Më pas, përcaktoni aktivitetin që ekzekuton mjetet. Kjo përdor veçorinë e aktivitetit dinamik të Temporal: trajtuesi i mjeteve (një i thirrshëm) merret nga regjistri i mjeteve nëpërmjet funksionit get_handler . Kjo lejon që agjentë të ndryshëm të përcaktohen thjesht duke ofruar një grup të ndryshëm mjetesh dhe udhëzimesh të sistemit; rrjedha e punës që zbaton ciklin e agjentëve nuk kërkon ndryshime.
Aktiviteti inspekton nënshkrimin e trajtuesit për të përcaktuar se si të kalojë argumentet. Nëse trajtuesi pret një model Pydantic, ai trajton formatin e daljes së ndërthurur që prodhon Gemini (për shembull, {"request": {"state": "CA"}} në vend të një {"state": "CA"} të sheshtë).
import inspect
from collections.abc import Sequence
from temporalio.common import RawValue
@activity.defn(dynamic=True)
async def dynamic_tool_activity(args: Sequence[RawValue]) -> dict:
"""Execute a tool dynamically based on the activity name."""
tool_name = activity.info().activity_type
tool_args = activity.payload_converter().from_payload(args[0].payload, dict)
activity.logger.info(f"Running dynamic tool '{tool_name}' with args: {tool_args}")
handler = get_handler(tool_name)
if not inspect.iscoroutinefunction(handler):
raise TypeError("Tool handler must be async (awaitable).")
sig = inspect.signature(handler)
params = list(sig.parameters.values())
if len(params) == 0:
result = await handler()
else:
param = params[0]
param_name = param.name
ann = param.annotation
if isinstance(ann, type) and issubclass(ann, BaseModel):
nested_args = tool_args.get(param_name, tool_args)
result = await handler(ann(**nested_args))
else:
result = await handler(**tool_args)
activity.logger.info(f"Tool '{tool_name}' result: {result}")
return result
Rrjedha e punës së ciklit të agjentëve
Tani i keni të gjitha pjesët për të përfunduar ndërtimin e agjentit. Klasa AgentWorkflow zbaton një rrjedhë pune që përmban ciklin e agjentit. Brenda atij cikli, LLM thirret nëpërmjet aktivitetit (duke e bërë atë të qëndrueshëm), rezultati inspektohet dhe nëse një mjet është zgjedhur nga LLM, ai thirret nëpërmjet dynamic_tool_activity .
Në këtë agjent të thjeshtë të stilit ReAct, pasi LLM zgjedh të mos përdorë një mjet, cikli konsiderohet i përfunduar dhe kthehet rezultati përfundimtar i LLM.
from datetime import timedelta
@workflow.defn
class AgentWorkflow:
"""Agentic loop workflow that uses Gemini for LLM calls and executes tools."""
@workflow.run
async def run(self, input: str) -> str:
contents: list[types.Content] = [
types.Content(role="user", parts=[types.Part.from_text(text=input)])
]
tools = [get_tools()]
while True:
result = await workflow.execute_activity(
generate_content,
GeminiChatRequest(
model="gemini-3-flash-preview",
system_instruction=SYSTEM_INSTRUCTIONS,
contents=contents,
tools=tools,
),
start_to_close_timeout=timedelta(seconds=60),
)
if result.function_calls:
# Sending the complete raw_parts here ensures Gemini 3 thought
# signatures are propagated correctly.
contents.append(types.Content(role="model", parts=result.raw_parts))
for function_call in result.function_calls:
tool_result = await self._handle_function_call(function_call)
contents.append(
types.Content(
role="user",
parts=[
types.Part.from_function_response(
name=function_call["name"],
response={"result": tool_result},
)
],
)
)
else:
return result.text
# Leave this in place. You will un-comment it during a durability
# test later on.
# await asyncio.sleep(10)
async def _handle_function_call(self, function_call: dict) -> str:
"""Execute a tool via dynamic activity and return the result."""
tool_name = function_call["name"]
tool_args = function_call.get("args", {})
result = await workflow.execute_activity(
tool_name,
tool_args,
start_to_close_timeout=timedelta(seconds=30),
)
return result
Cikli agjent është plotësisht i qëndrueshëm. Nëse punonjësi agjent rrëzohet pas disa përsëritjeve të ciklit, Temporal do të vazhdojë saktësisht aty ku ka mbetur pa pasur nevojë të rithirrë thirrjet e LLM-së ose thirrjet e mjeteve të ekzekutuara tashmë.
Startup i punëtorëve
Së fundmi, lidhni gjithçka së bashku. Ndërsa kodi zbaton logjikën e nevojshme të biznesit në një mënyrë që e bën të duket sikur po funksionon në një proces të vetëm, përdorimi i Temporal e bën atë një sistem të drejtuar nga ngjarjet (konkretisht, të bazuar në ngjarje) ku komunikimi midis rrjedhës së punës dhe aktiviteteve ndodh nëpërmjet mesazheve të ofruara nga Temporal.
Punëtori Temporal lidhet me shërbimin Temporal dhe vepron si një planifikues për rrjedhën e punës dhe detyrat e aktivitetit. Punëtori regjistron rrjedhën e punës dhe të dy aktivitetet, pastaj fillon të dëgjojë për detyra.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dotenv import load_dotenv
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker
async def main():
config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
client = await Client.connect(
**config,
data_converter=pydantic_data_converter,
)
worker = Worker(
client,
task_queue="gemini-agent-python-task-queue",
workflows=[
AgentWorkflow,
],
activities=[
generate_content,
dynamic_tool_activity,
],
activity_executor=ThreadPoolExecutor(max_workers=10),
)
await worker.run()
if __name__ == "__main__":
load_dotenv()
asyncio.run(main())
Skripti i klientit
Krijo skriptin e klientit ( start_workflow.py ). Ai paraqet një pyetje dhe pret rezultatin. Vini re se lidhet me të njëjtën radhë detyrash të referuar në agent worker - skripti start_workflow dërgon një detyrë të rrjedhës së punës me kërkesën e përdoruesit në atë radhë detyrash, duke filluar ekzekutimin e agjentit.
import asyncio
import sys
import uuid
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
async def main():
client = await Client.connect(
"localhost:7233",
data_converter=pydantic_data_converter,
)
query = sys.argv[1] if len(sys.argv) > 1 else "Tell me about recursion"
result = await client.execute_workflow(
"AgentWorkflow",
query,
id=f"gemini-agent-id-{uuid.uuid4()}",
task_queue="gemini-agent-python-task-queue",
)
print(f"\nResult:\n{result}")
if __name__ == "__main__":
asyncio.run(main())
Ekzekutoni agjentin
Nëse nuk e keni bërë ende, nisni serverin e zhvillimit Temporal:
temporal server start-devNë një dritare të re terminali, nisni agent worker-in:
python -m durable_agent_workerNë një dritare të tretë të terminalit, dërgoni një pyetje te agjenti juaj:
python -m start_workflow "are there any weather alerts for where I am?"Vini re rezultatin në terminal të durable_agent_worker që tregon veprimet që ndodhin në çdo përsëritje të ciklit agentic. LLM është në gjendje të përmbushë kërkesën e përdoruesit duke thirrur një seri mjetesh në dispozicion të tij. Mund të shihni hapat që u ekzekutuan nëpërmjet Temporal UI në http://localhost:8233/namespaces/default/workflows .
Provoni disa kërkesa të ndryshme për të parë arsyen e agjentit dhe mjetet e thirrjes:
python -m start_workflow "are there any weather alerts for New York?"python -m start_workflow "where am I?"python -m start_workflow "what is my ip address?"python -m start_workflow "tell me a joke"
Kërkesa e fundit nuk kërkon ndonjë mjet, kështu që agjenti përgjigjet në një haiku bazuar në SYSTEM_INSTRUCTIONS .
Testimi i qëndrueshmërisë (Opsionale)
Ndërtimi mbi Temporal siguron që agjenti juaj t'i mbijetojë dështimeve pa probleme. Mund ta testoni këtë duke përdorur dy eksperimente të dallueshme.
Simulimi i një ndërprerjeje të rrjetit
Në këtë test, do ta çaktivizoni përkohësisht lidhjen e internetit të kompjuterit tuaj, do të paraqisni një rrjedhë pune, do të shikoni Temporal të provojë përsëri automatikisht dhe më pas do ta rivendosni rrjetin për ta parë atë të rikuperohet.
- Shkëputeni pajisjen tuaj nga interneti (për shembull, fikni Wi-Fi-në).
Dorëzoni një rrjedhë pune:
python -m start_workflow "tell me a joke"Kontrolloni ndërfaqen e përdoruesit të Temporal (
http://localhost:8233). Do të shihni që aktiviteti i LLM dështon dhe Temporal do të menaxhojë automatikisht ripërpjekjet në sfond.Rilidhu me internetin.
Përpjekja tjetër automatike do të arrijë me sukses në Gemini API dhe terminali juaj do të shtypë rezultatin përfundimtar.
Mbijetesa në një aksident pune
Në këtë test, ju e mbyllni punëtorin në mes të ekzekutimit dhe e ristartoni atë. Temporal riluan historikun e rrjedhës së punës (burimin e ngjarjeve) dhe rifillon nga aktiviteti i fundit i përfunduar - thirrjet e LLM-së dhe thirrjet e mjeteve që janë tashmë të përfunduara nuk përsëriten.
- Për t'i dhënë vetes kohë për të mbyllur punëtorin, hapni
durable_agent_worker.pydhe ç'komenteroni përkohësishtawait asyncio.sleep(10)brenda ciklitruntëAgentWorkflow. Rinisni punëtorin:
python -m durable_agent_workerDërgoni një pyetje që aktivizon disa mjete:
python -m start_workflow "are there any weather alerts where I am?"Mbyll procesin punëtor në çdo kohë para përfundimit (
Ctrl-Cnë terminalin punëtor, ose duke përdorurkill %1nëse është duke u ekzekutuar në sfond).Rinisni punëtorin:
python -m durable_agent_worker
Riluan përsëri historikun e rrjedhës së punës në mënyrë të përkohshme. Thirrjet LLM dhe thirrjet e mjeteve që kanë përfunduar tashmë nuk riekzekutohen - rezultatet e tyre riprodhohen menjëherë nga historiku (regjistri i ngjarjeve). Rrjedha e punës përfundon me sukses.