В этом руководстве вы узнаете, как создать агентный цикл в стиле ReAct , использующий API Gemini для рассуждений и Temporal для обеспечения надежности. Полный исходный код этого руководства доступен на GitHub .
Агент может запускать инструменты, например, для поиска предупреждений о погоде или определения местоположения IP-адреса, и будет зацикливаться до тех пор, пока не получит достаточно информации для ответа.
Отличием от типичной демонстрации работы агента является надежность . Каждый вызов LLM, каждый вызов инструмента и каждый шаг агентного цикла сохраняются в Temporal. Если процесс завершается с ошибкой, обрывается связь или истекает время ожидания API, Temporal автоматически повторяет попытку и возобновляет работу с последнего завершенного шага. История разговоров не теряется, и вызовы инструментов не повторяются некорректно.
Архитектура
Архитектура состоит из трех частей:
- Рабочий процесс: цикл управления, который координирует логику выполнения.
- Виды деятельности: Отдельные рабочие модули (запросы LLM, запросы на инструменты), которые компания Temporal обеспечивает устойчивое развитие.
- Работник: Процесс, который выполняет рабочие процессы и действия.
В этом примере вы поместите все три части в один файл ( durable_agent_worker.py ). В реальной реализации вы бы разделили их, чтобы обеспечить различные преимущества развертывания и масштабируемости. Код, который отправляет запрос агенту, вы поместите во второй файл ( start_workflow.py ).
Предварительные требования
Для выполнения этого руководства вам потребуется:
- Ключ API Gemini. Вы можете создать его бесплатно в Google AI Studio .
- Версия Python 3.10 или более поздняя.
- Временной интерфейс командной строки для запуска локального сервера разработки.
Настраивать
Прежде чем начать, убедитесь, что у вас локально запущен сервер разработки Temporal :
temporal server start-devДалее установите необходимые зависимости:
pip install temporalio google-genai httpx pydantic python-dotenv Создайте в каталоге вашего проекта файл .env , содержащий ваш API-ключ Gemini. Вы можете получить API-ключ в Google AI Studio .
echo "GOOGLE_API_KEY=your-api-key-here" > .envВыполнение
В оставшейся части этого руководства мы подробно рассмотрим файл durable_agent_worker.py , шаг за шагом создавая агента. Создайте файл и следуйте инструкциям.
Импорт и настройка песочницы
Начнём с импорта, который необходимо определить заранее. Блок workflow.unsafe.imports_passed_through() указывает песочнице рабочих процессов Temporal разрешить прохождение определённых модулей без ограничений. Это необходимо, поскольку несколько библиотек (в частности, httpx , которая является подклассом urllib.request.Request ) используют шаблоны, которые в противном случае были бы заблокированы песочницей.
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
import pydantic_core # noqa: F401
import annotated_types # noqa: F401
import httpx
from pydantic import BaseModel, Field
from google import genai
from google.genai import types
Системные инструкции
Далее, определите личность агента. Инструкции системы указывают модели, как себя вести. Агенту дано указание отвечать хайку, когда инструменты не требуются.
SYSTEM_INSTRUCTIONS = """
You are a helpful agent that can use tools to help the user.
You will be given an input from the user and a list of tools to use.
You may or may not need to use the tools to satisfy the user ask.
If no tools are needed, respond in haikus.
"""
Определения инструментов
Теперь определим инструменты, которые может использовать агент. Каждый инструмент представляет собой асинхронную функцию с описательной строкой документации. Инструменты, принимающие параметры, используют модель Pydantic в качестве единственного аргумента. Это рекомендуемая практика для работы с временными данными, которая обеспечивает стабильность сигнатур активности по мере добавления необязательных полей с течением времени.
import json
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"
class GetWeatherAlertsRequest(BaseModel):
"""Request model for getting weather alerts."""
state: str = Field(description="Two-letter US state code (e.g. CA, NY)")
async def get_weather_alerts(request: GetWeatherAlertsRequest) -> str:
"""Get weather alerts for a US state.
Args:
request: The request object containing:
- state: Two-letter US state code (e.g. CA, NY)
"""
headers = {"User-Agent": USER_AGENT, "Accept": "application/geo+json"}
url = f"{NWS_API_BASE}/alerts/active/area/{request.state}"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers, timeout=5.0)
response.raise_for_status()
return json.dumps(response.json())
Далее, определите инструменты для определения местоположения по IP-адресу:
class GetLocationRequest(BaseModel):
"""Request model for getting location info from an IP address."""
ipaddress: str = Field(description="An IP address")
async def get_ip_address() -> str:
"""Get the public IP address of the current machine."""
async with httpx.AsyncClient() as client:
response = await client.get("https://icanhazip.com")
response.raise_for_status()
return response.text.strip()
async def get_location_info(request: GetLocationRequest) -> str:
"""Get the location information for an IP address including city, state, and country.
Args:
request: The request object containing:
- ipaddress: An IP address to look up
"""
async with httpx.AsyncClient() as client:
response = await client.get(f"http://ip-api.com/json/{request.ipaddress}")
response.raise_for_status()
result = response.json()
return f"{result['city']}, {result['regionName']}, {result['country']}"
реестр инструментов
Далее создайте реестр, который сопоставляет имена инструментов с функциями-обработчиками. Функция get_tools() генерирует совместимые с Gemini объекты FunctionDeclaration из вызываемых объектов, используя FunctionDeclaration.from_callable_with_api_option() .
from typing import Any, Awaitable, Callable
ToolHandler = Callable[..., Awaitable[Any]]
def get_handler(tool_name: str) -> ToolHandler:
"""Get the handler function for a given tool name."""
if tool_name == "get_location_info":
return get_location_info
if tool_name == "get_ip_address":
return get_ip_address
if tool_name == "get_weather_alerts":
return get_weather_alerts
raise ValueError(f"Unknown tool name: {tool_name}")
def get_tools() -> types.Tool:
"""Get the Tool object containing all available function declarations.
Uses FunctionDeclaration.from_callable_with_api_option() from the Google GenAI SDK
to generate tool definitions from the handler functions.
"""
return types.Tool(
function_declarations=[
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_weather_alerts, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_location_info, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_ip_address, api_option="GEMINI_API"
),
]
)
Деятельность LLM
Теперь определим действие, которое вызывает API Gemini. Контракт определяют классы данных GeminiChatRequest и GeminiChatResponse .
Вы отключите автоматический вызов функций, чтобы вызов LLM и вызов инструмента обрабатывались как отдельные задачи, что повысит отказоустойчивость вашего агента. Вы также отключите встроенные в SDK повторные попытки ( attempts=1 ), поскольку Temporal обеспечивает надежную обработку повторных попыток.
import os
from dataclasses import dataclass
from temporalio import activity
@dataclass
class GeminiChatRequest:
"""Request parameters for a Gemini chat completion."""
model: str
system_instruction: str
contents: list[types.Content]
tools: list[types.Tool]
@dataclass
class GeminiChatResponse:
"""Response from a Gemini chat completion."""
text: str | None
function_calls: list[dict[str, Any]]
raw_parts: list[types.Part]
@activity.defn
async def generate_content(request: GeminiChatRequest) -> GeminiChatResponse:
"""Execute a Gemini chat completion with tool support."""
api_key = os.environ.get("GOOGLE_API_KEY")
if not api_key:
raise ValueError("GOOGLE_API_KEY environment variable is not set")
client = genai.Client(
api_key=api_key,
http_options=types.HttpOptions(
retry_options=types.HttpRetryOptions(attempts=1),
),
)
config = types.GenerateContentConfig(
system_instruction=request.system_instruction,
tools=request.tools,
automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True),
)
response = await client.aio.models.generate_content(
model=request.model,
contents=request.contents,
config=config,
)
function_calls = []
raw_parts = []
text_parts = []
if response.candidates and response.candidates[0].content:
for part in response.candidates[0].content.parts:
raw_parts.append(part)
if part.function_call:
function_calls.append(
{
"name": part.function_call.name,
"args": dict(part.function_call.args) if part.function_call.args else {},
}
)
elif part.text:
text_parts.append(part.text)
text = "".join(text_parts) if text_parts and not function_calls else None
return GeminiChatResponse(
text=text,
function_calls=function_calls,
raw_parts=raw_parts,
)
Динамическая активность инструментов
Далее определите действие, которое выполняет инструменты. Для этого используется функция динамических действий Temporal: обработчик инструментов (вызываемая функция) получается из реестра инструментов с помощью функции get_handler . Это позволяет определять разные агенты, просто предоставляя различный набор инструментов и системных инструкций; рабочий процесс, реализующий агентный цикл, не требует изменений.
Данная функция анализирует сигнатуру обработчика, чтобы определить, как передавать аргументы. Если обработчик ожидает модель Pydantic, он обрабатывает вложенный формат вывода, который генерирует Gemini (например, {"request": {"state": "CA"}} вместо плоского {"state": "CA"} ).
import inspect
from collections.abc import Sequence
from temporalio.common import RawValue
@activity.defn(dynamic=True)
async def dynamic_tool_activity(args: Sequence[RawValue]) -> dict:
"""Execute a tool dynamically based on the activity name."""
tool_name = activity.info().activity_type
tool_args = activity.payload_converter().from_payload(args[0].payload, dict)
activity.logger.info(f"Running dynamic tool '{tool_name}' with args: {tool_args}")
handler = get_handler(tool_name)
if not inspect.iscoroutinefunction(handler):
raise TypeError("Tool handler must be async (awaitable).")
sig = inspect.signature(handler)
params = list(sig.parameters.values())
if len(params) == 0:
result = await handler()
else:
param = params[0]
param_name = param.name
ann = param.annotation
if isinstance(ann, type) and issubclass(ann, BaseModel):
nested_args = tool_args.get(param_name, tool_args)
result = await handler(ann(**nested_args))
else:
result = await handler(**tool_args)
activity.logger.info(f"Tool '{tool_name}' result: {result}")
return result
Рабочий процесс агентного цикла
Теперь у вас есть все необходимые компоненты для завершения создания агента. Класс AgentWorkflow реализует рабочий процесс, содержащий цикл работы агента. Внутри этого цикла LLM вызывается через действие (что делает его устойчивым), проверяется результат, и если LLM выбрал инструмент, он вызывается через dynamic_tool_activity .
В этом простом агенте в стиле ReAct, как только LLM решает не использовать инструмент, цикл считается завершенным, и возвращается окончательный результат LLM.
from datetime import timedelta
@workflow.defn
class AgentWorkflow:
"""Agentic loop workflow that uses Gemini for LLM calls and executes tools."""
@workflow.run
async def run(self, input: str) -> str:
contents: list[types.Content] = [
types.Content(role="user", parts=[types.Part.from_text(text=input)])
]
tools = [get_tools()]
while True:
result = await workflow.execute_activity(
generate_content,
GeminiChatRequest(
model="gemini-3-flash-preview",
system_instruction=SYSTEM_INSTRUCTIONS,
contents=contents,
tools=tools,
),
start_to_close_timeout=timedelta(seconds=60),
)
if result.function_calls:
# Sending the complete raw_parts here ensures Gemini 3 thought
# signatures are propagated correctly.
contents.append(types.Content(role="model", parts=result.raw_parts))
for function_call in result.function_calls:
tool_result = await self._handle_function_call(function_call)
contents.append(
types.Content(
role="user",
parts=[
types.Part.from_function_response(
name=function_call["name"],
response={"result": tool_result},
)
],
)
)
else:
return result.text
# Leave this in place. You will un-comment it during a durability
# test later on.
# await asyncio.sleep(10)
async def _handle_function_call(self, function_call: dict) -> str:
"""Execute a tool via dynamic activity and return the result."""
tool_name = function_call["name"]
tool_args = function_call.get("args", {})
result = await workflow.execute_activity(
tool_name,
tool_args,
start_to_close_timeout=timedelta(seconds=30),
)
return result
Цикл работы агента полностью отказоустойчив. Если рабочий агент завершит работу после нескольких итераций цикла, Temporal продолжит работу с того места, где остановился, без необходимости повторного вызова уже выполненных инструкций LLM или других инструментов.
Запуск рабочего проекта
Наконец, соедините все компоненты воедино. Хотя код реализует необходимую бизнес-логику таким образом, что создается впечатление работы в рамках одного процесса, использование Temporal превращает его в событийно-ориентированную систему (в частности, систему, основанную на событиях), где связь между рабочим процессом и действиями осуществляется посредством обмена сообщениями, предоставляемого Temporal.
Временной обработчик подключается к службе временных обработчиков и выступает в роли планировщика задач для рабочих процессов и действий. Обработчик регистрирует рабочий процесс и оба вида действий, а затем начинает отслеживать задачи.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dotenv import load_dotenv
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker
async def main():
config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
client = await Client.connect(
**config,
data_converter=pydantic_data_converter,
)
worker = Worker(
client,
task_queue="gemini-agent-python-task-queue",
workflows=[
AgentWorkflow,
],
activities=[
generate_content,
dynamic_tool_activity,
],
activity_executor=ThreadPoolExecutor(max_workers=10),
)
await worker.run()
if __name__ == "__main__":
load_dotenv()
asyncio.run(main())
Скрипт клиента
Создайте клиентский скрипт ( start_workflow.py ). Он отправляет запрос и ожидает результата. Обратите внимание, что он подключается к той же очереди задач, что и рабочий процесс агента — скрипт start_workflow отправляет задачу рабочего процесса с запросом пользователя в эту очередь задач, инициируя выполнение агента.
import asyncio
import sys
import uuid
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
async def main():
client = await Client.connect(
"localhost:7233",
data_converter=pydantic_data_converter,
)
query = sys.argv[1] if len(sys.argv) > 1 else "Tell me about recursion"
result = await client.execute_workflow(
"AgentWorkflow",
query,
id=f"gemini-agent-id-{uuid.uuid4()}",
task_queue="gemini-agent-python-task-queue",
)
print(f"\nResult:\n{result}")
if __name__ == "__main__":
asyncio.run(main())
Запустите агент
Если вы еще этого не сделали, запустите сервер разработки Temporal:
temporal server start-devВ новом окне терминала запустите рабочий агент:
python -m durable_agent_workerВ третьем окне терминала отправьте запрос своему агенту:
python -m start_workflow "are there any weather alerts for where I am?"Обратите внимание на вывод в терминале durable_agent_worker , который показывает действия, происходящие на каждой итерации агентного цикла. LLM может удовлетворить запрос пользователя, используя ряд имеющихся в его распоряжении инструментов. Вы можете увидеть шаги, которые были выполнены, через временный пользовательский интерфейс по http://localhost:8233/namespaces/default/workflows .
Попробуйте несколько разных вариантов ответа, чтобы узнать причину обращения оператора и воспользоваться инструментами обработки звонков:
python -m start_workflow "are there any weather alerts for New York?"python -m start_workflow "where am I?"python -m start_workflow "what is my ip address?"python -m start_workflow "tell me a joke"
Для последнего запроса не требуются никакие инструменты, поэтому агент отвечает хайку, основанным на SYSTEM_INSTRUCTIONS .
Испытание на прочность (необязательно)
Использование архитектуры Temporal гарантирует, что ваш агент будет бесперебойно работать после сбоев. Вы можете проверить это, проведя два отдельных эксперимента.
Имитация сбоя в сети
В этом тесте вам нужно будет временно отключить интернет-соединение на вашем компьютере, запустить рабочий процесс, понаблюдать за автоматической повторной попыткой подключения со стороны Temporal, а затем восстановить сетевое соединение, чтобы увидеть, как оно восстановится.
- Отключите компьютер от интернета (например, выключите Wi-Fi).
Отправьте рабочий процесс:
python -m start_workflow "tell me a joke"Проверьте пользовательский интерфейс Temporal (
http://localhost:8233). Вы увидите, что активность LLM завершается с ошибкой, и Temporal автоматически обрабатывает повторные попытки в фоновом режиме.Восстановите подключение к интернету.
Следующая автоматическая попытка подключения успешно завершится при попытке связаться с API Gemini, и ваш терминал выведет окончательный результат.
Выжить после аварии на рабочем месте
В этом тесте вы прерываете выполнение рабочего процесса и перезапускаете его. Temporal воспроизводит историю рабочего процесса (поиск событий) и возобновляет работу с последнего завершенного действия — уже завершенные вызовы LLM и вызовы инструментов не повторяются.
- Чтобы у вас было время завершить работу воркера, откройте файл
durable_agent_worker.pyи временно раскомментируйте строкуawait asyncio.sleep(10)внутри циклаrunAgentWorkflow. Перезапустите рабочий процесс:
python -m durable_agent_workerОтправьте запрос, который активирует несколько инструментов:
python -m start_workflow "are there any weather alerts where I am?"Завершить рабочий процесс можно в любой момент до его полного завершения (
Ctrl-Cв терминале рабочего процесса или командаkill %1если процесс запущен в фоновом режиме).Перезапустите рабочий процесс:
python -m durable_agent_worker
Функция Temporal воспроизводит историю рабочего процесса. Вызовы LLM и запуски инструментов, которые уже завершились, не выполняются повторно — их результаты мгновенно воспроизводятся из истории (журнала событий). Рабочий процесс завершается успешно.