Live API 支持与 Gemini 进行低延迟、实时的语音和视频交互。它能够处理连续的音频、视频或文本流,以提供即时、自然逼真的语音回答,从而为用户创造自然的对话体验。

Live API 提供了一套全面的功能,例如语音活动检测、工具使用和函数调用、会话管理(用于管理长时间运行的对话)和临时令牌(用于安全的客户端身份验证)。
本页面将通过示例和基本代码示例帮助您快速入门。
在 Google AI Studio 中试用 Live API
选择一种实现方法
与 Live API 集成时,您需要选择以下实现方法之一:
- 服务器到服务器:您的后端使用 WebSockets 连接到 Live API。通常,您的客户端会将流数据(音频、视频、文本)发送到您的服务器,然后您的服务器会将这些数据转发到 Live API。
- 客户端到服务器:您的前端代码使用 WebSockets 直接连接到 Live API 以流式传输数据,从而绕过后端。
合作伙伴集成
为了简化实时音频和视频应用的开发,您可以使用通过 WebRTC 或 WebSockets 支持 Gemini Live API 的第三方集成。
开始使用
此服务器端示例从麦克风流式传输音频并播放返回的音频。如需查看包含客户端应用的完整端到端示例,请参阅示例应用。
输入音频格式应为 16 位 PCM、16kHz、单声道格式,而接收到的音频使用 24kHz 的采样率。
Python
安装音频流式传输的辅助程序。可能需要其他系统级依赖项(例如 portaudio)。如需了解详细安装步骤,请参阅 PyAudio 文档。
pip install pyaudioimport asyncio
from google import genai
import pyaudio
client = genai.Client()
# --- pyaudio config ---
FORMAT = pyaudio.paInt16
CHANNELS = 1
SEND_SAMPLE_RATE = 16000
RECEIVE_SAMPLE_RATE = 24000
CHUNK_SIZE = 1024
pya = pyaudio.PyAudio()
# --- Live API config ---
MODEL = "gemini-2.5-flash-native-audio-preview-09-2025"
CONFIG = {
"response_modalities": ["AUDIO"],
"system_instruction": "You are a helpful and friendly AI assistant.",
}
audio_queue_output = asyncio.Queue()
audio_queue_mic = asyncio.Queue(maxsize=5)
audio_stream = None
async def listen_audio():
"""Listens for audio and puts it into the mic audio queue."""
global audio_stream
mic_info = pya.get_default_input_device_info()
audio_stream = await asyncio.to_thread(
pya.open,
format=FORMAT,
channels=CHANNELS,
rate=SEND_SAMPLE_RATE,
input=True,
input_device_index=mic_info["index"],
frames_per_buffer=CHUNK_SIZE,
)
kwargs = {"exception_on_overflow": False} if __debug__ else {}
while True:
data = await asyncio.to_thread(audio_stream.read, CHUNK_SIZE, **kwargs)
await audio_queue_mic.put({"data": data, "mime_type": "audio/pcm"})
async def send_realtime(session):
"""Sends audio from the mic audio queue to the GenAI session."""
while True:
msg = await audio_queue_mic.get()
await session.send_realtime_input(audio=msg)
async def receive_audio(session):
"""Receives responses from GenAI and puts audio data into the speaker audio queue."""
while True:
turn = session.receive()
async for response in turn:
if (response.server_content and response.server_content.model_turn):
for part in response.server_content.model_turn.parts:
if part.inline_data and isinstance(part.inline_data.data, bytes):
audio_queue_output.put_nowait(part.inline_data.data)
# Empty the queue on interruption to stop playback
while not audio_queue_output.empty():
audio_queue_output.get_nowait()
async def play_audio():
"""Plays audio from the speaker audio queue."""
stream = await asyncio.to_thread(
pya.open,
format=FORMAT,
channels=CHANNELS,
rate=RECEIVE_SAMPLE_RATE,
output=True,
)
while True:
bytestream = await audio_queue_output.get()
await asyncio.to_thread(stream.write, bytestream)
async def run():
"""Main function to run the audio loop."""
try:
async with client.aio.live.connect(
model=MODEL, config=CONFIG
) as live_session:
print("Connected to Gemini. Start speaking!")
async with asyncio.TaskGroup() as tg:
tg.create_task(send_realtime(live_session))
tg.create_task(listen_audio())
tg.create_task(receive_audio(live_session))
tg.create_task(play_audio())
except asyncio.CancelledError:
pass
finally:
if audio_stream:
audio_stream.close()
pya.terminate()
print("\nConnection closed.")
if __name__ == "__main__":
try:
asyncio.run(run())
except KeyboardInterrupt:
print("Interrupted by user.")
JavaScript
安装音频流式传输的辅助程序。可能需要其他系统级依赖项(sox 适用于 Mac/Windows,ALSA 适用于 Linux)。如需了解详细的安装步骤,请参阅扬声器和麦克风文档。
npm install mic speakerimport { GoogleGenAI, Modality } from '@google/genai';
import mic from 'mic';
import Speaker from 'speaker';
const ai = new GoogleGenAI({});
// WARNING: Do not use API keys in client-side (browser based) applications
// Consider using Ephemeral Tokens instead
// More information at: https://ai.google.dev/gemini-api/docs/ephemeral-tokens
// --- Live API config ---
const model = 'gemini-2.5-flash-native-audio-preview-09-2025';
const config = {
responseModalities: [Modality.AUDIO],
systemInstruction: "You are a helpful and friendly AI assistant.",
};
async function live() {
const responseQueue = [];
const audioQueue = [];
let speaker;
async function waitMessage() {
while (responseQueue.length === 0) {
await new Promise((resolve) => setImmediate(resolve));
}
return responseQueue.shift();
}
function createSpeaker() {
if (speaker) {
process.stdin.unpipe(speaker);
speaker.end();
}
speaker = new Speaker({
channels: 1,
bitDepth: 16,
sampleRate: 24000,
});
speaker.on('error', (err) => console.error('Speaker error:', err));
process.stdin.pipe(speaker);
}
async function messageLoop() {
// Puts incoming messages in the audio queue.
while (true) {
const message = await waitMessage();
if (message.serverContent && message.serverContent.interrupted) {
// Empty the queue on interruption to stop playback
audioQueue.length = 0;
continue;
}
if (message.serverContent && message.serverContent.modelTurn && message.serverContent.modelTurn.parts) {
for (const part of message.serverContent.modelTurn.parts) {
if (part.inlineData && part.inlineData.data) {
audioQueue.push(Buffer.from(part.inlineData.data, 'base64'));
}
}
}
}
}
async function playbackLoop() {
// Plays audio from the audio queue.
while (true) {
if (audioQueue.length === 0) {
if (speaker) {
// Destroy speaker if no more audio to avoid warnings from speaker library
process.stdin.unpipe(speaker);
speaker.end();
speaker = null;
}
await new Promise((resolve) => setImmediate(resolve));
} else {
if (!speaker) createSpeaker();
const chunk = audioQueue.shift();
await new Promise((resolve) => {
speaker.write(chunk, () => resolve());
});
}
}
}
// Start loops
messageLoop();
playbackLoop();
// Connect to Gemini Live API
const session = await ai.live.connect({
model: model,
config: config,
callbacks: {
onopen: () => console.log('Connected to Gemini Live API'),
onmessage: (message) => responseQueue.push(message),
onerror: (e) => console.error('Error:', e.message),
onclose: (e) => console.log('Closed:', e.reason),
},
});
// Setup Microphone for input
const micInstance = mic({
rate: '16000',
bitwidth: '16',
channels: '1',
});
const micInputStream = micInstance.getAudioStream();
micInputStream.on('data', (data) => {
// API expects base64 encoded PCM data
session.sendRealtimeInput({
audio: {
data: data.toString('base64'),
mimeType: "audio/pcm;rate=16000"
}
});
});
micInputStream.on('error', (err) => {
console.error('Microphone error:', err);
});
micInstance.start();
console.log('Microphone started. Speak now...');
}
live().catch(console.error);
示例应用
请查看以下示例应用,了解如何将 Live API 用于端到端用例: