Gemini Live API를 사용하면 Gemini 모델과 실시간 양방향 상호작용이 가능하며, 오디오, 동영상, 텍스트 입력과 기본 오디오 출력을 지원합니다. 이 가이드에서는 원시 WebSocket을 사용하여 API와 직접 통합하는 방법을 설명합니다.
개요
Gemini Live API는 실시간 통신에 WebSocket을 사용합니다. SDK를 사용하는 것과 달리 이 접근 방식에서는 WebSocket 연결을 직접 관리하고 API에서 정의한 특정 JSON 형식으로 메시지를 보내고 받습니다.
주요 개념
- WebSocket 엔드포인트: 연결할 특정 URL입니다.
- 메시지 형식: 모든 통신은
LiveSessionRequest및LiveSessionResponse구조를 준수하는 JSON 메시지를 통해 이루어집니다. - 세션 관리: WebSocket 연결을 유지해야 합니다.
인증
인증은 WebSocket URL에 API 키를 쿼리 매개변수로 포함하여 처리됩니다.
엔드포인트 형식은 다음과 같습니다.
wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1beta.GenerativeService.BidiGenerateContent?key=YOUR_API_KEY
YOUR_API_KEY를 실제 API 키로 바꿉니다.
임시 토큰을 사용한 인증
일회성 토큰을 사용하는 경우 v1alpha 엔드포인트에 연결해야 합니다.
임시 토큰은 access_token 쿼리 매개변수로 전달해야 합니다.
임시 키의 엔드포인트 형식은 다음과 같습니다.
wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContentConstrained?access_token={short-lived-token}
{short-lived-token}을 실제 임시 토큰으로 바꿉니다.
라이브 API에 연결
라이브 세션을 시작하려면 인증된 엔드포인트에 WebSocket 연결을 설정하세요.
WebSocket을 통해 전송된 첫 번째 메시지는 config이 포함된 LiveSessionRequest이어야 합니다.
전체 구성 옵션은 Live API - WebSockets API 참조를 확인하세요.
Python
import asyncio
import websockets
import json
API_KEY = "YOUR_API_KEY"
MODEL_NAME = "gemini-2.5-flash-native-audio-preview-12-2025"
WS_URL = f"wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1beta.GenerativeService.BidiGenerateContent?key={API_KEY}"
async def connect_and_configure():
async with websockets.connect(WS_URL) as websocket:
print("WebSocket Connected")
# 1. Send the initial configuration
config_message = {
"config": {
"model": f"models/{MODEL_NAME}",
"responseModalities": ["AUDIO"],
"systemInstruction": {
"parts": [{"text": "You are a helpful assistant."}]
}
}
}
await websocket.send(json.dumps(config_message))
print("Configuration sent")
# Keep the session alive for further interactions
await asyncio.sleep(3600) # Example: keep open for an hour
async def main():
await connect_and_configure()
if __name__ == "__main__":
asyncio.run(main())
자바스크립트
const API_KEY = "YOUR_API_KEY";
const MODEL_NAME = "gemini-2.5-flash-native-audio-preview-12-2025";
const WS_URL = `wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1beta.GenerativeService.BidiGenerateContent?key=${API_KEY}`;
const websocket = new WebSocket(WS_URL);
websocket.onopen = () => {
console.log('WebSocket Connected');
// 1. Send the initial configuration
const configMessage = {
config: {
model: `models/${MODEL_NAME}`,
responseModalities: ['AUDIO'],
systemInstruction: {
parts: [{ text: 'You are a helpful assistant.' }]
}
}
};
websocket.send(JSON.stringify(configMessage));
console.log('Configuration sent');
};
websocket.onmessage = (event) => {
const response = JSON.parse(event.data);
console.log('Received:', response);
// Handle different types of responses here
};
websocket.onerror = (error) => {
console.error('WebSocket Error:', error);
};
websocket.onclose = () => {
console.log('WebSocket Closed');
};
텍스트 전송 중
텍스트 입력을 전송하려면 텍스트로 채워진 realtimeInput 필드가 있는 LiveSessionRequest를 구성합니다.
Python
# Inside the websocket context
async def send_text(websocket, text):
text_message = {
"realtimeInput": {
"text": text
}
}
await websocket.send(json.dumps(text_message))
print(f"Sent text: {text}")
# Example usage: await send_text(websocket, "Hello, how are you?")
자바스크립트
function sendTextMessage(text) {
if (websocket.readyState === WebSocket.OPEN) {
const textMessage = {
realtimeInput: {
text: text
}
};
websocket.send(JSON.stringify(textMessage));
console.log('Text message sent:', text);
} else {
console.warn('WebSocket not open.');
}
}
// Example usage:
sendTextMessage("Hello, how are you?");
오디오 전송
오디오는 원시 PCM 데이터 (원시 16비트 PCM 오디오, 16kHz, little-endian)로 전송해야 합니다. 오디오 데이터가 포함된 Blob가 포함된 realtimeInput 필드로 LiveSessionRequest를 구성합니다. mimeType이 중요합니다.
Python
# Inside the websocket context
async def send_audio_chunk(websocket, chunk_bytes):
import base64
encoded_data = base64.b64encode(chunk_bytes).decode('utf-8')
audio_message = {
"realtimeInput": {
"audio": {
"data": encoded_data,
"mimeType": "audio/pcm;rate=16000"
}
}
}
await websocket.send(json.dumps(audio_message))
# print("Sent audio chunk") # Avoid excessive logging
# Assuming 'chunk' is your raw PCM audio bytes
# await send_audio_chunk(websocket, chunk)
자바스크립트
// Assuming 'chunk' is a Buffer of raw PCM audio
function sendAudioChunk(chunk) {
if (websocket.readyState === WebSocket.OPEN) {
const audioMessage = {
realtimeInput: {
audio: {
data: chunk.toString('base64'),
mimeType: 'audio/pcm;rate=16000'
}
}
};
websocket.send(JSON.stringify(audioMessage));
// console.log('Sent audio chunk');
}
}
// Example usage: sendAudioChunk(audioBuffer);
클라이언트 기기 (예: 브라우저)에서 오디오를 가져오는 방법의 예는 GitHub의 포괄적인 예시를 참고하세요.
동영상 전송 중
동영상 프레임은 개별 이미지 (예: JPEG 또는 PNG). 오디오와 마찬가지로 Blob와 함께 realtimeInput를 사용하여 올바른 mimeType를 지정합니다.
Python
# Inside the websocket context
async def send_video_frame(websocket, frame_bytes, mime_type="image/jpeg"):
import base64
encoded_data = base64.b64encode(frame_bytes).decode('utf-8')
video_message = {
"realtimeInput": {
"video": {
"data": encoded_data,
"mimeType": mime_type
}
}
}
await websocket.send(json.dumps(video_message))
# print("Sent video frame")
# Assuming 'frame' is your JPEG-encoded image bytes
# await send_video_frame(websocket, frame)
자바스크립트
// Assuming 'frame' is a Buffer of JPEG-encoded image data
function sendVideoFrame(frame, mimeType = 'image/jpeg') {
if (websocket.readyState === WebSocket.OPEN) {
const videoMessage = {
realtimeInput: {
video: {
data: frame.toString('base64'),
mimeType: mimeType
}
}
};
websocket.send(JSON.stringify(videoMessage));
// console.log('Sent video frame');
}
}
// Example usage: sendVideoFrame(jpegBuffer);
클라이언트 기기 (예: 브라우저)에서 동영상을 가져오는 방법의 예는 GitHub의 엔드 투 엔드 예시를 참고하세요.
응답 수신
WebSocket은 LiveSessionResponse 메시지를 다시 보냅니다. 이러한 JSON 메시지를 파싱하고 다양한 유형의 콘텐츠를 처리해야 합니다.
Python
# Inside the websocket context, in a receive loop
async def receive_loop(websocket):
async for message in websocket:
response = json.loads(message)
print("Received:", response)
if "serverContent" in response:
server_content = response["serverContent"]
# Receiving Audio
if "modelTurn" in server_content and "parts" in server_content["modelTurn"]:
for part in server_content["modelTurn"]["parts"]:
if "inlineData" in part:
audio_data_b64 = part["inlineData"]["data"]
# Process or play the base64 encoded audio data
# audio_data = base64.b64decode(audio_data_b64)
print(f"Received audio data (base64 len: {len(audio_data_b64)})")
# Receiving Text Transcriptions
if "inputTranscription" in server_content:
print(f"User: {server_content['inputTranscription']['text']}")
if "outputTranscription" in server_content:
print(f"Gemini: {server_content['outputTranscription']['text']}")
# Handling Tool Calls
if "toolCall" in response:
await handle_tool_call(websocket, response["toolCall"])
# Example usage: await receive_loop(websocket)
응답을 처리하는 방법의 예시는 GitHub의 포괄적인 예시를 참고하세요.
자바스크립트
websocket.onmessage = (event) => {
const response = JSON.parse(event.data);
console.log('Received:', response);
if (response.serverContent) {
const serverContent = response.serverContent;
// Receiving Audio
if (serverContent.modelTurn?.parts) {
for (const part of serverContent.modelTurn.parts) {
if (part.inlineData) {
const audioData = part.inlineData.data; // Base64 encoded string
// Process or play audioData
console.log(`Received audio data (base64 len: ${audioData.length})`);
}
}
}
// Receiving Text Transcriptions
if (serverContent.inputTranscription) {
console.log('User:', serverContent.inputTranscription.text);
}
if (serverContent.outputTranscription) {
console.log('Gemini:', serverContent.outputTranscription.text);
}
}
// Handling Tool Calls
if (response.toolCall) {
handleToolCall(response.toolCall);
}
};
도구 호출 처리
모델이 도구 호출을 요청하면 LiveSessionResponse에 toolCall 필드가 포함됩니다. 함수를 로컬로 실행하고 toolResponse 필드가 있는 LiveSessionRequest를 사용하여 결과를 WebSocket으로 다시 보내야 합니다.
Python
# Placeholder for your tool function
def my_tool_function(args):
print(f"Executing tool with args: {args}")
# Implement your tool logic here
return {"status": "success", "data": "some result"}
async def handle_tool_call(websocket, tool_call):
function_responses = []
for fc in tool_call["functionCalls"]:
# 1. Execute the function locally
try:
result = my_tool_function(fc.get("args", {}))
response_data = {"result": result}
except Exception as e:
print(f"Error executing tool {fc['name']}: {e}")
response_data = {"error": str(e)}
# 2. Prepare the response
function_responses.append({
"name": fc["name"],
"id": fc["id"],
"response": response_data
})
# 3. Send the tool response back to the session
tool_response_message = {
"toolResponse": {
"functionResponses": function_responses
}
}
await websocket.send(json.dumps(tool_response_message))
print("Sent tool response")
# This function is called within the receive_loop when a toolCall is detected.
자바스크립트
// Placeholder for your tool function
function myToolFunction(args) {
console.log(`Executing tool with args:`, args);
// Implement your tool logic here
return { status: 'success', data: 'some result' };
}
function handleToolCall(toolCall) {
const functionResponses = [];
for (const fc of toolCall.functionCalls) {
// 1. Execute the function locally
let result;
try {
result = myToolFunction(fc.args || {});
} catch (e) {
console.error(`Error executing tool ${fc.name}:`, e);
result = { error: e.message };
}
// 2. Prepare the response
functionResponses.push({
name: fc.name,
id: fc.id,
response: { result }
});
}
// 3. Send the tool response back to the session
if (websocket.readyState === WebSocket.OPEN) {
const toolResponseMessage = {
toolResponse: {
functionResponses: functionResponses
}
};
websocket.send(JSON.stringify(toolResponseMessage));
console.log('Sent tool response');
} else {
console.warn('WebSocket not open to send tool response.');
}
}
// This function is called within websocket.onmessage when a toolCall is detected.