Part 3: Event handling with run_live()¶
The run_live() method is ADK's primary entry point for streaming conversations, implementing an async generator that yields events as the conversation unfolds. This part focuses on understanding and handling these events—the core communication mechanism that enables real-time interaction between your application, users, and AI models.
You'll learn how to process different event types (text, audio, transcriptions, tool calls), manage conversation flow with interruption and turn completion signals, serialize events for network transport, and leverage ADK's automatic tool execution. Understanding event handling is essential for building responsive streaming applications that feel natural and real-time to users.
Async Context Required
All run_live() code requires async context. See Part 1: FastAPI Application Example for details and production examples.
How run_live() Works¶
run_live() is an async generator that streams conversation events in real-time. It yields events immediately as they're generated, with no internal buffering, polling, or callbacks. Overall memory usage depends on session persistence (e.g., in-memory vs. database), making it suitable for both quick exchanges and extended sessions.
Method Signature and Flow¶
Usage:
# The method signature reveals the thoughtful design
async def run_live(
self,
*, # Keyword-only arguments
user_id: Optional[str] = None, # User identification (required unless session provided)
session_id: Optional[str] = None, # Session tracking (required unless session provided)
live_request_queue: LiveRequestQueue, # The bidirectional communication channel
run_config: Optional[RunConfig] = None, # Streaming behavior configuration
session: Optional[Session] = None, # Deprecated: use user_id and session_id instead
) -> AsyncGenerator[Event, None]: # Generator yielding conversation events
As its signature shows, every streaming conversation needs identity (user_id), continuity (session_id), communication (live_request_queue), and configuration (run_config). The return type, an async generator of Events, promises real-time delivery without overwhelming system resources.
sequenceDiagram
participant Client
participant Runner
participant Agent
participant LLMFlow
participant Gemini
Client->>Runner: runner.run_live(user_id, session_id, queue, config)
Runner->>Agent: agent.run_live(context)
Agent->>LLMFlow: _llm_flow.run_live(context)
LLMFlow->>Gemini: Connect and stream
loop Continuous Streaming
Gemini-->>LLMFlow: LlmResponse
LLMFlow-->>Agent: Event
Agent-->>Runner: Event
Runner-->>Client: Event (yield)
end
Basic Usage Pattern¶
The simplest way to consume events from run_live() is to iterate over the async generator with a for-loop:
async for event in runner.run_live(
user_id=user_id,
session_id=session_id,
live_request_queue=live_request_queue,
run_config=run_config
):
event_json = event.model_dump_json(exclude_none=True, by_alias=True)
logger.debug(f"[SERVER] Event: {event_json}")
await websocket.send_text(event_json)
Session Identifiers
Both user_id and session_id must match the identifiers you used when creating the session via SessionService.create_session(). These can be any string values based on your application's needs (e.g., UUIDs, email addresses, custom tokens). See Part 1: Get or Create Session for detailed guidance on session identifiers.
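For example, here is a minimal sketch of creating the session up front so the identifiers match; it assumes InMemorySessionService and an existing Runner, with "my_app", "user_123", and "session_456" as placeholder values:

# A minimal sketch, assuming InMemorySessionService and an existing Runner;
# the same user_id/session_id strings are then passed to run_live().
from google.adk.sessions import InMemorySessionService

session_service = InMemorySessionService()
session = await session_service.create_session(
    app_name="my_app",          # placeholder application name
    user_id="user_123",         # any stable identifier for the user
    session_id="session_456",   # any unique identifier for this conversation
)

async for event in runner.run_live(
    user_id="user_123",
    session_id="session_456",
    live_request_queue=live_request_queue,
    run_config=run_config,
):
    ...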
Connection Lifecycle in run_live()¶
The run_live() method manages the underlying Live API connection lifecycle automatically:
Connection States:
- Initialization: Connection established when run_live() is called
- Active Streaming: Bidirectional communication via LiveRequestQueue (upstream to the model) and run_live() (downstream from the model)
- Graceful Closure: Connection closes when LiveRequestQueue.close() is called
- Error Recovery: ADK supports transparent session resumption; enable it via RunConfig.session_resumption to handle transient failures. See Part 4: Live API Session Resumption for details.
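A minimal sketch of this lifecycle, assuming runner and run_config already exist and handle_event is a hypothetical per-event handler:

# A minimal sketch of the connection lifecycle; runner and run_config
# are assumed to exist already.
from google.adk.agents import LiveRequestQueue

live_request_queue = LiveRequestQueue()          # upstream channel to the model
try:
    async for event in runner.run_live(          # connection opens here
        user_id="user_123",
        session_id="session_456",
        live_request_queue=live_request_queue,
        run_config=run_config,
    ):
        handle_event(event)                      # hypothetical per-event handler
finally:
    live_request_queue.close()                   # graceful closure of the connection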
What run_live() Yields¶
The run_live() method yields a stream of Event objects in real-time as the agent processes user input and generates responses. Understanding the different event types helps you build responsive UIs that handle text, audio, transcriptions, tool calls, metadata, and errors appropriately. Each event type is explained in detail in the sections below.
| Event Type | Description |
|---|---|
| Text Events | Model's text responses when using response_modalities=["TEXT"]; includes partial, turn_complete, and interrupted flags for streaming UI management |
| Audio Events with Inline Data | Raw audio bytes (inline_data) streamed in real-time when using response_modalities=["AUDIO"]; ephemeral (not persisted to session) |
| Audio Events with File Data | Audio aggregated into files and stored in artifacts; contains file_data references instead of raw bytes; can be persisted to session history |
| Metadata Events | Token usage information (prompt_token_count, candidates_token_count, total_token_count) for cost monitoring and quota tracking |
| Transcription Events | Speech-to-text for user input (input_transcription) and model output (output_transcription) when transcription is enabled in RunConfig |
| Tool Call Events | Function call requests from the model; ADK handles execution automatically |
| Error Events | Model errors and connection issues with error_code and error_message fields |
Source Reference
See the complete event type handling implementation in runners.py:746-775
When run_live() Exits¶
The run_live() event loop can exit under various conditions. Understanding these exit scenarios is crucial for proper resource cleanup and error handling:
| Exit Condition | Trigger | Graceful? | Description |
|---|---|---|---|
| Manual close | live_request_queue.close() | ✅ Yes | User explicitly closes the queue, sending a LiveRequest(close=True) signal |
| All agents complete | Last agent in SequentialAgent calls task_completed() | ✅ Yes | After all sequential agents finish their tasks |
| Session timeout | Live API duration limit reached | ⚠️ Connection closed | Session exceeds maximum duration (see limits below) |
| Early exit | end_invocation flag set | ✅ Yes | Set during preprocessing or by tools/callbacks to terminate early |
| Empty event | Queue closure signal | ✅ Yes | Internal signal indicating the event stream has ended |
| Errors | Connection errors, exceptions | ❌ No | Unhandled exceptions or connection failures |
SequentialAgent Behavior
When using SequentialAgent, the task_completed() function does NOT exit your application's run_live() loop. It only signals the end of the current agent's work, triggering a seamless transition to the next agent in the sequence. Your event loop continues receiving events from subsequent agents. The loop only exits when the last agent in the sequence completes.
Learn More
For session resumption and connection recovery details, see Part 4: Live API Session Resumption. For multi-agent workflows, see Best Practices for Multi-Agent Workflows.
Events Saved to ADK Session¶
Not all events yielded by run_live() are persisted to the ADK Session. When run_live() exits, only certain events are saved to the session while others remain ephemeral. Understanding which events are saved versus which are ephemeral is crucial for applications that use session persistence, resumption, or need to review conversation history.
Source Reference
See session event persistence logic in runners.py:746-775
Events Saved to the ADK Session:
These events are persisted to the ADK Session and available in session history:
- Audio Events with File Data: Saved to the ADK Session only if RunConfig.save_live_model_audio_to_session is True; audio data is aggregated into files in artifacts with file_data references
- Usage Metadata Events: Always saved to track token consumption across the ADK Session
- Non-Partial Transcription Events: Final transcriptions are saved; partial transcriptions are not persisted
- Function Call and Response Events: Always saved to maintain tool execution history
- Other Control Events: Most control events (e.g., turn_complete, finish_reason) are saved
Events NOT Saved to the ADK Session:
These events are ephemeral and only yielded to callers during active streaming:
- Audio Events with Inline Data: Raw audio Blob data in inline_data is never saved to the ADK Session (only yielded for real-time playback)
- Partial Transcription Events: Only yielded for real-time display; final transcriptions are saved
Audio Persistence
To save audio conversations to the ADK Session for review or resumption, enable RunConfig.save_live_blob = True. This persists audio streams to artifacts. See Part 4: save_live_blob for configuration details.
Understanding Events¶
Events are the core communication mechanism in ADK's Bidi-streaming system. This section explores the complete lifecycle of events—from how they're generated through multiple pipeline layers, to concurrent processing patterns that enable true real-time interaction, to practical handling of interruptions and turn completion. You'll learn about event types (text, audio, transcriptions, tool calls), serialization strategies for network transport, and the connection lifecycle that manages streaming sessions across both Gemini Live API and Vertex AI Live API platforms.
The Event Class¶
ADK's Event class is a Pydantic model that represents all communication in a streaming conversation. It extends LlmResponse and serves as the unified container for model responses, user input, transcriptions, and control signals.
Source Reference
See Event class implementation in event.py:30-129 and llm_response.py:28-185
Key Fields¶
Essential for all applications:
- content: Contains text, audio, or function calls as Content.parts
- author: Identifies who created the event ("user" or agent name)
- partial: Distinguishes incremental chunks from complete text
- turn_complete: Signals when to enable user input again
- interrupted: Indicates when to stop rendering current output
For voice/audio applications:
- input_transcription: User's spoken words (when enabled in RunConfig)
- output_transcription: Model's spoken words (when enabled in RunConfig)
- content.parts[].inline_data: Audio data for playback
For tool execution:
- content.parts[].function_call: Model's tool invocation requests
- content.parts[].function_response: Tool execution results
- long_running_tool_ids: Track async tool execution
For debugging and diagnostics:
- usage_metadata: Token counts and billing information
- cache_metadata: Context cache hit/miss statistics
- finish_reason: Why the model stopped generating (e.g., STOP, MAX_TOKENS, SAFETY)
- error_code / error_message: Failure diagnostics
Author Semantics
Transcription events have author "user"; model responses/events use the agent's name as author (not "model"). See Event Authorship for details.
Understanding Event Identity¶
Events have two important ID fields:
- event.id: Unique identifier for this specific event (format: UUID). Each event gets a new ID, even partial text chunks.
- event.invocation_id: Shared identifier for all events in the current invocation (format: "e-" + UUID). In run_live(), all events from a single streaming session share the same invocation_id. (See InvocationContext for more about invocations.)
Usage:
# All events in this streaming session will have the same invocation_id
async for event in runner.run_live(...):
print(f"Event ID: {event.id}") # Unique per event
print(f"Invocation ID: {event.invocation_id}") # Same for all events in session
Use cases:

- event.id: Track individual events in logs, deduplicate events
- event.invocation_id: Group events by conversation session, filter session-specific events
Event Authorship¶
In live streaming mode, the Event.author field follows special semantics to maintain conversation clarity:
Model responses: Authored by the agent name (e.g., "my_agent"), not the literal string "model"
- This enables multi-agent scenarios where you need to track which agent generated the response
- Example: Event(author="customer_service_agent", content=...)
User transcriptions: Authored as "user" when the event contains transcribed user audio
How it works:
- Gemini Live API returns user audio transcriptions with content.role == 'user'
- ADK's get_author_for_event() function checks for this role marker
- If content.role == 'user', ADK sets Event.author to "user"
- Otherwise, ADK sets Event.author to the agent name (e.g., "my_agent")
This transformation ensures that transcribed user input is correctly attributed to the user in your application's conversation history, even though it flows through the model's response stream.
- Example: Input audio transcription → Event(author="user", input_transcription=..., content.role="user")
Why this matters:
- In multi-agent applications, you can filter events by agent: events = [e for e in stream if e.author == "my_agent"]
- When displaying conversation history, use event.author to show who said what (see the sketch below)
- Transcription events are correctly attributed to the user even though they flow through the model
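For example, a short sketch of rendering persisted history by author; render_user_message and render_agent_message are hypothetical UI helpers:

# A hedged sketch: attributing persisted events when rendering history.
# render_user_message / render_agent_message are hypothetical UI helpers.
for event in session.events:
    if event.author == "user":
        render_user_message(event)                 # transcribed or typed user input
    else:
        render_agent_message(event.author, event)  # author is the agent's name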
Source Reference
See author attribution logic in base_llm_flow.py:281-294
Event Types and Handling¶
ADK streams distinct event types through runner.run_live() to support different interaction modalities: text responses for traditional chat, audio chunks for voice output, transcriptions for accessibility and logging, and tool call notifications for function execution. Each event includes metadata flags (partial, turn_complete, interrupted) that control UI state transitions and enable natural, human-like conversation flows. Understanding how to recognize and handle these event types is essential for building responsive streaming applications.
Text Events¶
The most common event type, containing the model's text responses when you set response_modalities to ["TEXT"] in RunConfig:
Usage:
async for event in runner.run_live(...):
    if event.content and event.content.parts:
        if event.content.parts[0].text:
            text = event.content.parts[0].text
            if event.partial:
                # Incremental chunk: your logic to update the streaming display
                update_streaming_display(text)
            else:
                # Complete merged text: your logic to display the final message
                display_complete_message(text)
Default Response Modality Behavior¶
When response_modalities is not explicitly set (i.e., None), ADK automatically defaults to ["AUDIO"] mode at the start of run_live(). This means:
- If you provide no RunConfig: Defaults to ["AUDIO"]
- If you provide a RunConfig without response_modalities: Defaults to ["AUDIO"]
- If you explicitly set response_modalities: Uses your setting (no default applied)
Why this default exists: Some native audio models require the response modality to be explicitly set. To ensure compatibility with all models, ADK defaults to ["AUDIO"].
For text-only applications: Always explicitly set response_modalities=["TEXT"] in your RunConfig to avoid receiving unexpected audio events.
Example:
# Explicit text mode
run_config = RunConfig(
response_modalities=["TEXT"],
streaming_mode=StreamingMode.BIDI
)
Key Event Flags:
These flags help you manage streaming text display and conversation flow in your UI:
- event.partial: True for incremental text chunks during streaming; False for complete merged text
- event.turn_complete: True when the model has finished its complete response
- event.interrupted: True when the user interrupted the model's response
Learn More
For detailed guidance on using the partial, turn_complete, and interrupted flags to manage conversation flow and UI state, see Handling Text Events.
Audio Events¶
When response_modalities is configured to ["AUDIO"] in your RunConfig, the model generates audio output instead of text, and you'll receive audio data in the event stream:
Configuration:
# Configure RunConfig for audio responses
run_config = RunConfig(
response_modalities=["AUDIO"],
streaming_mode=StreamingMode.BIDI
)
# Audio arrives as inline_data in event.content.parts
async for event in runner.run_live(..., run_config=run_config):
if event.content and event.content.parts:
part = event.content.parts[0]
if part.inline_data:
# Audio event structure:
# part.inline_data.data: bytes (raw PCM audio)
# part.inline_data.mime_type: str (e.g., "audio/pcm")
audio_data = part.inline_data.data
mime_type = part.inline_data.mime_type
print(f"Received {len(audio_data)} bytes of {mime_type}")
# Your logic to play audio
await play_audio(audio_data)
Learn More
- response_modalities controls how the model generates output—you must choose either ["TEXT"] for text responses or ["AUDIO"] for audio responses per session. You cannot use both modalities simultaneously. See Part 4: Response Modalities for configuration details.
- For comprehensive coverage of audio formats, sending/receiving audio, and audio processing flow, see Part 5: How to Use Audio, Image and Video.
Audio Events with File Data¶
When audio data is aggregated and saved as files in artifacts, ADK yields events containing file_data references instead of raw inline_data. This is useful for persisting audio to session history.
Source Reference
See audio file aggregation logic in runners.py:752-754 and audio_cache_manager.py:192-194
Receiving Audio File References:
async for event in runner.run_live(
user_id=user_id,
session_id=session_id,
live_request_queue=queue,
run_config=run_config
):
if event.content and event.content.parts:
for part in event.content.parts:
if part.file_data:
# Audio aggregated into a file saved in artifacts
file_uri = part.file_data.file_uri
mime_type = part.file_data.mime_type
print(f"Audio file saved: {file_uri} ({mime_type})")
# Retrieve audio file from artifact service for playback
File Data vs Inline Data:
- Inline Data (part.inline_data): Raw audio bytes streamed in real-time; ephemeral and not saved to the session
- File Data (part.file_data): Reference to an audio file stored in artifacts; can be persisted to session history
Both input and output audio data are aggregated into audio files and saved in the artifact service. The file reference is included in the event as file_data, allowing you to retrieve the audio later.
Session Persistence
To save audio events with file data to session history, enable RunConfig.save_live_model_audio_to_session = True. This allows audio conversations to be reviewed or replayed from persisted sessions.
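A hedged configuration sketch using the field named above (combined here with audio output; adjust to your application):

# A minimal sketch; save_live_model_audio_to_session is the RunConfig field
# referenced in the note above.
from google.adk.agents.run_config import RunConfig, StreamingMode

run_config = RunConfig(
    response_modalities=["AUDIO"],
    streaming_mode=StreamingMode.BIDI,
    save_live_model_audio_to_session=True,  # persist aggregated audio file references
)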
Metadata Events¶
Usage metadata events contain token usage information for monitoring costs and quota consumption. The run_live() method yields these events separately from content events.
Source Reference
See usage metadata structure in llm_response.py:105-106
Accessing Token Usage:
async for event in runner.run_live(
user_id=user_id,
session_id=session_id,
live_request_queue=queue,
run_config=run_config
):
if event.usage_metadata:
print(f"Prompt tokens: {event.usage_metadata.prompt_token_count}")
print(f"Response tokens: {event.usage_metadata.candidates_token_count}")
print(f"Total tokens: {event.usage_metadata.total_token_count}")
# Track cumulative usage across the session
total_tokens += event.usage_metadata.total_token_count or 0
Available Metadata Fields:
- prompt_token_count: Number of tokens in the input (prompt and context)
- candidates_token_count: Number of tokens in the model's response
- total_token_count: Sum of prompt and response tokens
- cached_content_token_count: Number of tokens served from cache (when using context caching)
Cost Monitoring
Usage metadata events allow real-time cost tracking during streaming sessions. You can implement quota limits, display usage to users, or log metrics for billing and analytics.
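As a sketch of quota enforcement, you can close the queue once a per-session budget is exhausted; MAX_SESSION_TOKENS and queue are assumptions for this example, not ADK APIs:

# A hedged sketch of per-session token budget enforcement.
MAX_SESSION_TOKENS = 50_000
total_tokens = 0

async for event in runner.run_live(
    user_id=user_id,
    session_id=session_id,
    live_request_queue=queue,
    run_config=run_config,
):
    if event.usage_metadata and event.usage_metadata.total_token_count:
        total_tokens += event.usage_metadata.total_token_count
        if total_tokens > MAX_SESSION_TOKENS:
            queue.close()  # request a graceful end to the live session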
Transcription Events¶
When transcription is enabled in RunConfig, you receive transcriptions as separate events:
Usage:
async for event in runner.run_live(...):
# User's spoken words (when input_audio_transcription enabled)
if event.input_transcription:
# Your logic to display user transcription
display_user_transcription(event.input_transcription)
# Model's spoken words (when output_audio_transcription enabled)
if event.output_transcription:
# Your logic to display model transcription
display_model_transcription(event.output_transcription)
These enable accessibility features and conversation logging without separate transcription services.
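If transcription isn't enabled yet, a hedged RunConfig sketch looks like this; it assumes the input_audio_transcription and output_audio_transcription fields accept google.genai's AudioTranscriptionConfig:

# A minimal sketch of enabling transcription in RunConfig (assumed field names).
from google.genai import types
from google.adk.agents.run_config import RunConfig, StreamingMode

run_config = RunConfig(
    response_modalities=["AUDIO"],
    streaming_mode=StreamingMode.BIDI,
    input_audio_transcription=types.AudioTranscriptionConfig(),   # user speech → text
    output_audio_transcription=types.AudioTranscriptionConfig(),  # model speech → text
)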
Learn More
For details on enabling transcription in RunConfig and understanding transcription delivery, see Part 5: Audio Transcription.
Tool Call Events¶
When the model requests tool execution:
Usage:
async for event in runner.run_live(...):
if event.content and event.content.parts:
for part in event.content.parts:
if part.function_call:
# Model is requesting a tool execution
tool_name = part.function_call.name
tool_args = part.function_call.args
# ADK handles execution automatically
ADK processes tool calls automatically—you typically don't need to handle these directly unless implementing custom tool execution logic.
Learn More
For details on how ADK automatically executes tools, handles function responses, and supports long-running and streaming tools, see Automatic Tool Execution in run_live().
Error Events¶
Production applications need robust error handling to gracefully handle model errors and connection issues. ADK surfaces errors through the error_code and error_message fields:
Usage:
import logging
logger = logging.getLogger(__name__)
try:
async for event in runner.run_live(...):
# Handle errors from the model or connection
if event.error_code:
logger.error(f"Model error: {event.error_code} - {event.error_message}")
# Send error notification to client
await websocket.send_json({
"type": "error",
"code": event.error_code,
"message": event.error_message
})
# Decide whether to continue or break based on error severity
if event.error_code in ["SAFETY", "PROHIBITED_CONTENT", "BLOCKLIST"]:
# Content policy violations - usually cannot retry
break # Terminal error - exit loop
elif event.error_code == "MAX_TOKENS":
# Token limit reached - may need to adjust configuration
break
# For other errors, you might continue or implement retry logic
continue # Transient error - keep processing
# Normal event processing only if no error
if event.content and event.content.parts:
# ... handle content
pass
finally:
queue.close() # Always cleanup connection
Note
The above example shows the basic structure for checking error_code and error_message. For production-ready error handling with user notifications, retry logic, and context logging, see the real-world scenarios below.
When to use break vs continue:
The key decision is: Can the model's response continue meaningfully?
Scenario 1: Content Policy Violation (Use break)
You're building a customer support chatbot. A user asks an inappropriate question that triggers a SAFETY filter:
Example:
if event.error_code in ["SAFETY", "PROHIBITED_CONTENT", "BLOCKLIST"]:
# Model has stopped generating - continuation is impossible
await websocket.send_json({
"type": "error",
"message": "I can't help with that request. Please ask something else."
})
break # Exit loop - model won't send more events for this turn
Why break? The model has terminated its response. No more events will come for this turn. Continuing would just waste resources waiting for events that won't arrive.
Scenario 2: Network Hiccup During Streaming (Use continue)
You're building a voice transcription service. Midway through transcribing, there's a brief network glitch:
Example:
if event.error_code == "UNAVAILABLE":
# Temporary network issue
logger.warning(f"Network hiccup: {event.error_message}")
# Don't notify user for brief transient issues that may self-resolve
continue # Keep listening - model may recover and continue
Why continue? This is a transient error. The connection might recover, and the model may continue streaming the transcription. Breaking would prematurely end a potentially recoverable stream.
User Notifications
For brief transient errors (lasting <1 second), don't notify the user—they won't notice the hiccup. But if the error persists or impacts the user experience (e.g., streaming pauses for >3 seconds), notify them gracefully: "Experiencing connection issues, retrying..."
Scenario 3: Token Limit Reached (Use break)
You're generating a long-form article and hit the maximum token limit:
Example:
if event.error_code == "MAX_TOKENS":
# Model has reached output limit
await websocket.send_json({
"type": "complete",
"message": "Response reached maximum length",
"truncated": True
})
break # Model has finished - no more tokens will be generated
Why break? The model has reached its output limit and stopped. Continuing won't yield more tokens.
Scenario 4: Rate Limit with Retry Logic (Use continue with backoff)
You're running a high-traffic application that occasionally hits rate limits:
Example:
retry_count = 0
max_retries = 3
async for event in runner.run_live(...):
if event.error_code == "RESOURCE_EXHAUSTED":
retry_count += 1
if retry_count > max_retries:
logger.error("Max retries exceeded")
break # Give up after multiple failures
# Wait and retry
await asyncio.sleep(2 ** retry_count) # Exponential backoff
continue # Keep listening - rate limit may clear
# Reset counter on successful event
retry_count = 0
Why continue (initially)? Rate limits are often temporary. With exponential backoff, the stream may recover. But after multiple failures, break to avoid infinite waiting.
Decision Framework:
| Error Type | Action | Reason |
|---|---|---|
| SAFETY, PROHIBITED_CONTENT | break | Model terminated response |
| MAX_TOKENS | break | Model finished generating |
| UNAVAILABLE, DEADLINE_EXCEEDED | continue | Transient network/timeout issue |
| RESOURCE_EXHAUSTED (rate limit) | continue with retry logic | May recover after brief wait |
| Unknown errors | continue (with logging) | Err on the side of caution |
Critical: Always use finally for cleanup
Usage:
try:
async for event in runner.run_live(...):
# ... error handling ...
finally:
queue.close() # Cleanup runs whether you break or finish normally
Whether you break or the loop finishes naturally, finally ensures the connection closes properly.
Error Code Reference:
ADK error codes come from the underlying Gemini API. Here are the most common error codes you'll encounter:
| Error Code | Category | Description | Recommended Action |
|---|---|---|---|
| SAFETY | Content Policy | Content violates safety policies | break - Inform user, log incident |
| PROHIBITED_CONTENT | Content Policy | Content contains prohibited material | break - Show policy violation message |
| BLOCKLIST | Content Policy | Content matches blocklist | break - Alert user, don't retry |
| MAX_TOKENS | Limits | Output reached maximum token limit | break - Truncate gracefully, summarize |
| RESOURCE_EXHAUSTED | Rate Limiting | Quota or rate limit exceeded | continue with backoff - Retry after delay |
| UNAVAILABLE | Transient | Service temporarily unavailable | continue - Retry, may self-resolve |
| DEADLINE_EXCEEDED | Transient | Request timeout exceeded | continue - Consider retry with backoff |
| CANCELLED | Client | Client cancelled the request | break - Clean up resources |
| UNKNOWN | System | Unspecified error occurred | continue with logging - Log for analysis |
For complete error code listings and descriptions, refer to the official documentation:
Official Documentation
- FinishReason (when model stops generating tokens): Google AI for Developers | Vertex AI
- BlockedReason (when prompts are blocked by content filters): Google AI for Developers | Vertex AI
- ADK Implementation: llm_response.py:160-184
Best practices for error handling:
- Always check for errors first: Process error_code before handling content to avoid processing invalid events
- Log errors with context: Include session_id and user_id in error logs for debugging
- Categorize errors: Distinguish between retryable errors (transient failures) and terminal errors (content policy violations), as sketched below
- Notify users gracefully: Show user-friendly error messages instead of raw error codes
- Implement retry logic: For transient errors, consider automatic retry with exponential backoff
- Monitor error rates: Track error types and frequencies to identify systemic issues
- Handle content policy errors: For SAFETY, PROHIBITED_CONTENT, and BLOCKLIST errors, inform users that their content violates policies
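A small helper that applies this categorization; the groupings follow the decision framework table above and are part of this guide, not an ADK API, so adjust them to your own policy:

# A hedged sketch of categorizing error codes per the decision framework above.
TERMINAL_ERRORS = {"SAFETY", "PROHIBITED_CONTENT", "BLOCKLIST", "MAX_TOKENS", "CANCELLED"}
TRANSIENT_ERRORS = {"UNAVAILABLE", "DEADLINE_EXCEEDED", "RESOURCE_EXHAUSTED"}

def classify_error(error_code: str) -> str:
    """Return 'terminal', 'transient', or 'unknown' for a run_live() error code."""
    if error_code in TERMINAL_ERRORS:
        return "terminal"    # break: the model will not continue this turn
    if error_code in TRANSIENT_ERRORS:
        return "transient"   # continue: the stream may recover (with backoff)
    return "unknown"         # continue, but log for later analysis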
Handling Text Events¶
Understanding the partial, interrupted, and turn_complete flags is essential for building responsive streaming UIs. These flags enable you to provide real-time feedback during streaming, handle user interruptions gracefully, and detect conversation boundaries for proper state management.
Handling partial¶
This flag helps you distinguish between incremental text chunks and complete merged text, enabling smooth streaming displays with proper final confirmation.
Usage:
async for event in runner.run_live(...):
if event.content and event.content.parts:
if event.content.parts[0].text:
text = event.content.parts[0].text
if event.partial:
# Your streaming UI update logic here
update_streaming_display(text)
else:
# Your complete message display logic here
display_complete_message(text)
partial Flag Semantics:
- partial=True: The text in this event is incremental—it contains ONLY the new text since the last event
- partial=False: The text in this event is complete—it contains the full merged text for this response segment
Note
The partial flag is only meaningful for text content (event.content.parts[].text). For other content types:
- Audio events: Each audio chunk in inline_data is independent (no merging occurs)
- Tool calls: Function calls and responses are always complete (partial doesn't apply)
- Transcriptions: Transcription events are always complete when yielded
Example Stream:
Event 1: partial=True, text="Hello", turn_complete=False
Event 2: partial=True, text=" world", turn_complete=False
Event 3: partial=False, text="Hello world", turn_complete=False
Event 4: partial=False, text="", turn_complete=True # Turn done
Important timing relationships:
- partial=False can occur multiple times in a turn (e.g., after each sentence)
- turn_complete=True occurs once at the very end of the model's complete response, in a separate event
- You may receive: partial=False (sentence 1) → partial=False (sentence 2) → turn_complete=True
- The merged text event (partial=False with content) is always yielded before the turn_complete=True event
Note
ADK internally accumulates all text from partial=True events. When you receive an event with partial=False, the text content equals the sum of all preceding partial=True chunks. This means:
- You can safely ignore all partial=True events and only process partial=False events if you don't need streaming display (see the sketch below)
- If you do display partial=True events, the partial=False event provides the complete merged text for validation or storage
- This accumulation is handled automatically by ADK's StreamingResponseAggregator—you don't need to manually concatenate partial text chunks
Handling interrupted Flag¶
This enables natural conversation flow by detecting when users interrupt the model mid-response, allowing you to stop rendering outdated content immediately.
When users send new input while the model is still generating a response (common in voice conversations), you'll receive an event with interrupted=True:
Usage:
async for event in runner.run_live(...):
if event.interrupted:
# Your logic to stop displaying partial text and clear typing indicators
stop_streaming_display()
# Your logic to show interruption in UI (optional)
show_user_interruption_indicator()
Example - Interruption Scenario:
Model: "The weather in San Francisco is currently..."
User: [interrupts] "Actually, I meant San Diego"
→ event.interrupted=True received
→ Your app: stop rendering model response, clear UI
→ Model processes new input
Model: "The weather in San Diego is..."
When to use interruption handling:
- Voice conversations: Stop audio playback immediately when user starts speaking
- Clear UI state: Remove typing indicators and partial text displays
- Conversation logging: Mark which responses were interrupted (incomplete)
- User feedback: Show visual indication that interruption was recognized
Handling turn_complete Flag¶
This signals conversation boundaries, allowing you to update UI state (enable input controls, hide indicators) and mark proper turn boundaries in logs and analytics.
When the model finishes its complete response, you'll receive an event with turn_complete=True:
Usage:
async for event in runner.run_live(...):
if event.turn_complete:
# Your logic to update UI to show "ready for input" state
enable_user_input()
# Your logic to hide typing indicator
hide_typing_indicator()
# Your logic to mark conversation boundary in logs
log_turn_boundary()
Event Flag Combinations:
Understanding how turn_complete and interrupted combine helps you handle all conversation states:
| Scenario | turn_complete | interrupted | Your App Should |
|---|---|---|---|
| Normal completion | True | False | Enable input, show "ready" state |
| User interrupted mid-response | False | True | Stop display, clear partial content |
| Interrupted at end | True | True | Same as normal completion (turn is done) |
| Mid-response (partial text) | False | False | Continue displaying streaming text |
Implementation:
async for event in runner.run_live(...):
# Handle streaming text
if event.content and event.content.parts and event.content.parts[0].text:
if event.partial:
# Your logic to show typing indicator and update partial text
update_streaming_text(event.content.parts[0].text)
else:
# Your logic to display complete text chunk
display_text(event.content.parts[0].text)
# Handle interruption
if event.interrupted:
# Your logic to stop audio playback and clear indicators
stop_audio_playback()
clear_streaming_indicators()
# Handle turn completion
if event.turn_complete:
# Your logic to enable user input
show_input_ready_state()
enable_microphone()
Common Use Cases:
- UI state management: Show/hide "ready for input" indicators, typing animations, microphone states
- Audio playback control: Know when to stop rendering audio chunks from the model
- Conversation logging: Mark clear boundaries between turns for history/analytics
- Streaming optimization: Stop buffering when turn is complete
Turn completion and caching: Audio/transcript caches are flushed automatically at specific points during streaming:
- On turn completion (turn_complete=True): Both user and model audio caches are flushed
- On interruption (interrupted=True): Model audio cache is flushed
- On generation completion: Model audio cache is flushed
Serializing Events to JSON¶
ADK Event objects are Pydantic models, which means they come with powerful serialization capabilities. The model_dump_json() method is particularly useful for streaming events over network protocols like WebSockets or Server-Sent Events (SSE).
Using event.model_dump_json()¶
This provides a simple one-liner to convert ADK events into JSON format that can be sent over network protocols like WebSockets or SSE.
The model_dump_json() method serializes an Event object to a JSON string:
async def downstream_task() -> None:
"""Receives Events from run_live() and sends to WebSocket."""
async for event in runner.run_live(
user_id=user_id,
session_id=session_id,
live_request_queue=live_request_queue,
run_config=run_config
):
event_json = event.model_dump_json(exclude_none=True, by_alias=True)
await websocket.send_text(event_json)
What gets serialized:
- Event metadata (author, server_content fields)
- Content (text, audio data, function calls)
- Event flags (partial, turn_complete, interrupted)
- Transcription data (input_transcription, output_transcription)
- Tool execution information
When to use model_dump_json():
- ✅ Streaming events over network (WebSocket, SSE)
- ✅ Logging/persistence to JSON files
- ✅ Debugging and inspection
- ✅ Integration with JSON-based APIs
When NOT to use it:
- ❌ In-memory processing (use event objects directly)
- ❌ High-frequency events where serialization overhead matters
- ❌ When you only need a few fields (extract them directly instead)
Performance Warning
Binary audio data in event.content.parts[].inline_data will be base64-encoded when serialized to JSON, significantly increasing payload size (~133% overhead). For production applications with audio, send binary data separately using WebSocket binary frames or multipart HTTP. See Optimization for Audio Transmission for details.
Serialization options¶
This allows you to reduce payload sizes by excluding unnecessary fields, improving network performance and client processing speed.
Pydantic's model_dump_json() supports several useful parameters:
Usage:
# Exclude None values for smaller payloads (with camelCase field names)
event_json = event.model_dump_json(exclude_none=True, by_alias=True)
# Custom exclusions (e.g., skip large binary audio)
event_json = event.model_dump_json(
exclude={'content': {'parts': {'__all__': {'inline_data'}}}},
by_alias=True
)
# Include only specific fields
event_json = event.model_dump_json(
include={'content', 'author', 'turn_complete', 'interrupted'},
by_alias=True
)
# Pretty-printed JSON (for debugging)
event_json = event.model_dump_json(indent=2, by_alias=True)
The bidi-demo uses exclude_none=True to minimize payload size by omitting fields with None values.
Deserializing on the Client¶
This shows how to parse and handle serialized events on the client side, enabling responsive UI updates based on event properties like turn completion and interruptions.
On the client side (JavaScript/TypeScript), parse the JSON back to objects:
// Handle incoming messages
websocket.onmessage = function (event) {
// Parse the incoming ADK Event
const adkEvent = JSON.parse(event.data);
// Handle turn complete event
if (adkEvent.turnComplete === true) {
// Remove typing indicator from current message
if (currentBubbleElement) {
const textElement = currentBubbleElement.querySelector(".bubble-text");
const typingIndicator = textElement.querySelector(".typing-indicator");
if (typingIndicator) {
typingIndicator.remove();
}
}
currentMessageId = null;
currentBubbleElement = null;
return;
}
// Handle interrupted event
if (adkEvent.interrupted === true) {
// Stop audio playback if it's playing
if (audioPlayerNode) {
audioPlayerNode.port.postMessage({ command: "endOfAudio" });
}
// Keep the partial message but mark it as interrupted
if (currentBubbleElement) {
const textElement = currentBubbleElement.querySelector(".bubble-text");
// Remove typing indicator
const typingIndicator = textElement.querySelector(".typing-indicator");
if (typingIndicator) {
typingIndicator.remove();
}
// Add interrupted marker
currentBubbleElement.classList.add("interrupted");
}
currentMessageId = null;
currentBubbleElement = null;
return;
}
// Handle content events (text or audio)
if (adkEvent.content && adkEvent.content.parts) {
const parts = adkEvent.content.parts;
for (const part of parts) {
// Handle text
if (part.text) {
// Add a new message bubble for a new turn
if (currentMessageId == null) {
currentMessageId = Math.random().toString(36).substring(7);
currentBubbleElement = createMessageBubble(part.text, false, true);
currentBubbleElement.id = currentMessageId;
messagesDiv.appendChild(currentBubbleElement);
} else {
// Update the existing message bubble with accumulated text
const existingText = currentBubbleElement.querySelector(".bubble-text").textContent;
const cleanText = existingText.replace(/\.\.\.$/, '');
updateMessageBubble(currentBubbleElement, cleanText + part.text, true);
}
scrollToBottom();
}
}
}
};
📖 Demo Implementation: See the complete WebSocket message handler in app.js:297-576
Optimization for Audio Transmission¶
Base64-encoded binary audio in JSON significantly increases payload size. For production applications, use a single WebSocket connection with both binary frames (for audio) and text frames (for metadata):
Usage:
async for event in runner.run_live(...):
# Check for binary audio
has_audio = (
event.content and
event.content.parts and
any(p.inline_data for p in event.content.parts)
)
if has_audio:
# Send audio via binary WebSocket frame
for part in event.content.parts:
if part.inline_data:
await websocket.send_bytes(part.inline_data.data)
# Send metadata only (much smaller)
metadata_json = event.model_dump_json(
exclude={'content': {'parts': {'__all__': {'inline_data'}}}},
by_alias=True
)
await websocket.send_text(metadata_json)
else:
# Text-only events can be sent as JSON
await websocket.send_text(event.model_dump_json(exclude_none=True, by_alias=True))
This approach reduces bandwidth by ~75% for audio-heavy streams while maintaining full event metadata.
Automatic Tool Execution in run_live()¶
Source Reference
See automatic tool execution implementation in functions.py
One of the most powerful features of ADK's run_live() is automatic tool execution. Unlike the raw Gemini Live API, which requires you to manually handle tool calls and responses, ADK abstracts this complexity entirely.
The Challenge with Raw Live API¶
When using the Gemini Live API directly (without ADK), tool use requires manual orchestration:
- Receive function calls from the model
- Execute the tools yourself
- Format function responses correctly
- Send responses back to the model
This creates significant implementation overhead, especially in streaming contexts where you need to handle multiple concurrent tool calls, manage errors, and coordinate with ongoing audio/text streams.
How ADK Simplifies Tool Use¶
With ADK, tool execution becomes declarative. Simply define tools on your Agent:
import os
from google.adk.agents import Agent
from google.adk.tools import google_search
agent = Agent(
name="google_search_agent",
model=os.getenv("DEMO_AGENT_MODEL", "gemini-2.5-flash-native-audio-preview-09-2025"),
tools=[google_search],
instruction="You are a helpful assistant that can search the web."
)
When you call runner.run_live(), ADK automatically:
- Detects when the model returns function calls in streaming responses
- Executes tools in parallel for maximum performance
- Handles before/after tool callbacks for custom logic
- Formats function responses according to Live API requirements
- Sends responses back to the model seamlessly
- Yields both function call and response events to your application
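For instance, a custom Python function can be passed directly in tools, and ADK derives the tool declaration from its signature and docstring; get_weather here is a hypothetical example, not part of ADK:

# A hedged sketch of a custom function tool; get_weather is hypothetical.
import os
from google.adk.agents import Agent

def get_weather(city: str) -> dict:
    """Returns current weather information for the given city."""
    return {"city": city, "condition": "sunny", "temperature_c": 22}

agent = Agent(
    name="weather_agent",
    model=os.getenv("DEMO_AGENT_MODEL", "gemini-2.5-flash-native-audio-preview-09-2025"),
    tools=[get_weather],  # plain Python function; no manual schema needed
    instruction="You are a helpful assistant that reports the weather.",
)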
Tool Execution Events¶
When tools execute, you'll receive events through the run_live() async generator:
Usage:
async for event in runner.run_live(...):
# Function call event - model requesting tool execution
if event.get_function_calls():
print(f"Model calling: {event.get_function_calls()[0].name}")
# Function response event - tool execution result
if event.get_function_responses():
print(f"Tool result: {event.get_function_responses()[0].response}")
You don't need to handle the execution yourself—ADK does it automatically. You just observe the events as they flow through the conversation.
Learn More
The bidi-demo sends all events (including function calls and responses) directly to the WebSocket client without server-side filtering. This allows the client to observe tool execution in real-time through the event stream. See the downstream task in main.py:178-191
Long-Running and Streaming Tools¶
ADK supports advanced tool patterns that integrate seamlessly with run_live():
Long-Running Tools: Tools that require human approval or take extended time to complete. Mark them with is_long_running=True. In resumable async flows, ADK can pause after long-running calls. In live flows, streaming continues; long_running_tool_ids indicate pending operations and clients can display appropriate UI.
Streaming Tools: Tools that accept an input_stream parameter with type LiveRequestQueue can send real-time updates back to the model during execution, enabling progressive responses.
How Streaming Tools Work
When you call runner.run_live(), ADK inspects your agent's tools at initialization (lines 828-865 in runners.py) to identify streaming tools by checking parameter type annotations for LiveRequestQueue.
Queue creation and lifecycle:
- Creation: ADK creates an ActiveStreamingTool with a dedicated LiveRequestQueue for each streaming tool at the start of run_live() (before processing any events)
- Storage: These queues are stored in invocation_context.active_streaming_tools[tool_name] for the duration of the invocation
- Injection: When the model calls the tool, ADK automatically injects the tool's queue as the input_stream parameter (lines 238-253 in function_tool.py)
- Usage: The tool can use this queue to send real-time updates back to the model during execution
- Lifecycle: The queues persist for the entire run_live() invocation (one InvocationContext = one run_live() call) and are destroyed when run_live() exits
Queue distinction:
- Main queue (live_request_queue parameter): Created by your application, used for client-to-model communication
- Tool queues (active_streaming_tools[tool_name].stream): Created automatically by ADK, used for tool-to-model communication during execution
Both types of queues are LiveRequestQueue instances, but they serve different purposes in the streaming architecture.
This enables tools to provide incremental updates, progress notifications, or partial results during long-running operations.
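A minimal sketch of a streaming tool signature based on the description above; monitor_prices is hypothetical, and the LiveRequestQueue-typed parameter is what ADK detects and injects:

# A hedged sketch of a streaming tool; monitor_prices is hypothetical.
from google.adk.agents import LiveRequestQueue

async def monitor_prices(symbol: str, input_stream: LiveRequestQueue) -> str:
    """Watches price updates for symbol, exchanging real-time data via input_stream."""
    # ... use input_stream during execution, as described above ...
    return f"Started monitoring {symbol}."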
Code reference: See runners.py:828-865 (tool detection) and function_tool.py:238-253 (parameter injection) for implementation details.
See the Tools Guide for implementation examples.
Key Takeaway¶
The difference between raw Live API tool use and ADK is stark:
| Aspect | Raw Live API | ADK run_live() |
|---|---|---|
| Tool Declaration | Manual schema definition | Automatic from Python functions |
| Tool Execution | Manual handling in app code | Automatic parallel execution |
| Response Formatting | Manual JSON construction | Automatic |
| Error Handling | Manual try/catch and formatting | Automatic capture and reporting |
| Streaming Integration | Manual coordination | Automatic event yielding |
| Developer Experience | Complex, error-prone | Declarative, simple |
This automatic handling is one of the core value propositions of ADK—it transforms the complexity of Live API tool use into a simple, declarative developer experience.
InvocationContext: The Execution State Container¶
Source Reference
See InvocationContext implementation in invocation_context.py
While run_live() returns an AsyncGenerator for consuming events, internally it creates and manages an InvocationContext—ADK's unified state carrier that encapsulates everything needed for a complete conversation invocation. One InvocationContext corresponds to one run_live() loop—it's created when you call run_live() and persists for the entire streaming session.
Think of it as a traveling notebook that accompanies a conversation from start to finish, collecting information, tracking progress, and providing context to every component along the way. It's ADK's runtime implementation of the Context concept, providing the execution-time state and services needed during a live conversation. For a broader overview of context in ADK, see Context in ADK.
What is an Invocation?¶
An invocation represents a complete interaction cycle:
- Starts with user input (text, audio, or control signal)
- May involve one or multiple agent calls
- Ends when a final response is generated or when explicitly terminated
- Is orchestrated by
runner.run_live()orrunner.run_async()
This is distinct from an agent call (execution of a single agent's logic) and a step (a single LLM call plus any resulting tool executions).
The hierarchy looks like this:
┌─────────────────────── invocation ──────────────────────────┐
┌──────────── llm_agent_call_1 ────────────┐ ┌─ agent_call_2 ─┐
┌──── step_1 ────────┐ ┌───── step_2 ──────┐
[call_llm] [call_tool] [call_llm] [transfer]
Who Uses InvocationContext?¶
InvocationContext serves different audiences at different levels:
- ADK's internal components (primary users): Runner, Agent, LLMFlow, and GeminiLlmConnection all receive, read from, and write to the InvocationContext as it flows through the stack. This shared context enables seamless coordination without tight coupling.
- Application developers (indirect beneficiaries): You don't typically create or manipulate InvocationContext directly in your application code. Instead, you benefit from the clean, simplified APIs that InvocationContext enables behind the scenes—like the elegant async for event in runner.run_live() pattern.
- Tool and callback developers (direct access): When you implement custom tools or callbacks, you receive InvocationContext as a parameter. This gives you direct access to conversation state, session services, and control flags (like end_invocation) to implement sophisticated behaviors.
What InvocationContext Contains¶
When you implement custom tools or callbacks, you receive InvocationContext as a parameter. Here's what's available to you:
Essential Fields for Tool/Callback Developers:
- context.invocation_id: Current invocation identifier (unique per run_live() call)
- context.session: The current ADK Session
  - context.session.events: All events in the session history (across all invocations)
  - context.session.state: Persistent key-value store for session data
  - context.session.user_id: User identity
- context.run_config: Current streaming configuration (response modalities, transcription settings, cost limits)
- context.end_invocation: Set this to True to immediately terminate the conversation (useful for error handling or policy enforcement)
Example Use Cases in Tool Development:
# Example: Comprehensive tool implementation showing common InvocationContext patterns
from datetime import datetime

from google.adk.agents.invocation_context import InvocationContext
from google.genai import types


async def my_tool(context: InvocationContext, query: str):
    # Access user identity
    user_id = context.session.user_id

    # Check if this is the user's first message
    event_count = len(context.session.events)
    if event_count == 0:
        return "Welcome! This is your first message."

    # Access conversation history
    recent_events = context.session.events[-5:]  # Last 5 events

    # Access persistent session state
    # Session state persists across invocations (not just this streaming session)
    user_preferences = context.session.state.get('user_preferences', {})

    # Update session state (will be persisted)
    context.session.state['last_query_time'] = datetime.now().isoformat()

    # Access services for persistence
    if context.artifact_service:
        # Store large files/audio (data: bytes payload produced earlier by the tool)
        await context.artifact_service.save_artifact(
            app_name=context.session.app_name,
            user_id=context.session.user_id,
            session_id=context.session.id,
            filename="result.bin",
            artifact=types.Part(
                inline_data=types.Blob(mime_type="application/octet-stream", data=data)
            ),
        )

    # Process the query with context
    result = process_query(query, context=recent_events, preferences=user_preferences)

    # Terminate conversation in specific scenarios
    if result.get('error'):
        # Processing error - stop conversation
        context.end_invocation = True

    return result
Understanding InvocationContext is essential for grasping how ADK maintains state, coordinates execution, and enables advanced features like multi-agent workflows and resumability. Even if you never touch it directly, knowing what flows through your application helps you design better agents and debug issues more effectively.
Best Practices for Multi-Agent Workflows¶
ADK's bidirectional streaming supports three agent architectures: single agent (one agent handles the entire conversation), multi-agent with sub-agents (a coordinator agent dynamically routes to specialist agents using transfer_to_agent), and sequential workflow agents (agents execute in a fixed pipeline using task_completed). This section focuses on best practices for sequential workflows, where understanding agent transitions and state sharing is crucial for smooth BIDI communication.
Learn More
For comprehensive coverage of multi-agent patterns, see Workflow Agents as Orchestrators in the ADK documentation.
When building multi-agent systems with ADK, understanding how agents transition and share state during live streaming is crucial for smooth BIDI communication.
SequentialAgent with BIDI Streaming¶
SequentialAgent enables workflow pipelines where agents execute one after another. Each agent completes its task before the next one begins. The challenge with live streaming is determining when an agent has finished processing continuous audio or video input.
Source Reference
See SequentialAgent implementation in sequential_agent.py:119-159
How it works:
ADK automatically adds a task_completed() function to each agent in the sequence. When the model calls this function, it signals completion and triggers the transition to the next agent:
Usage:
# SequentialAgent automatically adds this tool to each sub-agent
def task_completed():
"""
Signals that the agent has successfully completed the user's question
or task.
"""
return 'Task completion signaled.'
Recommended Pattern: Transparent Sequential Flow¶
The key insight is that agent transitions happen transparently within the same run_live() event stream. Your application doesn't need to manage transitions—just consume events uniformly:
Usage:
async def handle_sequential_workflow():
"""Recommended pattern for SequentialAgent with BIDI streaming."""
# 1. Single queue shared across all agents in the sequence
queue = LiveRequestQueue()
# 2. Background task captures user input continuously
async def capture_user_input():
while True:
# Your logic to read audio from microphone
audio_chunk = await microphone.read()
queue.send_realtime(
blob=types.Blob(data=audio_chunk, mime_type="audio/pcm")
)
input_task = asyncio.create_task(capture_user_input())
try:
# 3. Single event loop handles ALL agents seamlessly
async for event in runner.run_live(
user_id="user_123",
session_id="session_456",
live_request_queue=queue,
):
# Events flow seamlessly across agent transitions
current_agent = event.author
# Handle audio and text output
if event.content and event.content.parts:
for part in event.content.parts:
# Check for audio data
if part.inline_data and part.inline_data.mime_type.startswith("audio/"):
# Your logic to play audio
await play_audio(part.inline_data.data)
# Check for text data
if part.text:
await display_text(f"[{current_agent}] {part.text}")
# No special transition handling needed!
finally:
input_task.cancel()
queue.close()
Event Flow During Agent Transitions¶
Here's what your application sees when agents transition:
# Agent 1 (Researcher) completes its work
Event: author="researcher", text="I've gathered all the data."
Event: author="researcher", function_call: task_completed()
Event: author="researcher", function_response: task_completed
# --- Automatic transition (invisible to your code) ---
# Agent 2 (Writer) begins
Event: author="writer", text="Let me write the report based on the research..."
Event: author="writer", text=" The findings show..."
Event: author="writer", function_call: task_completed()
Event: author="writer", function_response: task_completed
# --- Automatic transition ---
# Agent 3 (Reviewer) begins - the last agent in sequence
Event: author="reviewer", text="Let me review the report..."
Event: author="reviewer", text="The report looks good. All done!"
Event: author="reviewer", function_call: task_completed()
Event: author="reviewer", function_response: task_completed
# --- Last agent completed: run_live() exits ---
# Your async for loop ends here
Design Principles¶
1. Single Event Loop¶
Use one event loop for all agents in the sequence:
Usage:
# ✅ CORRECT: One loop handles all agents
async for event in runner.run_live(...):
# Your event handling logic here
await handle_event(event) # Works for Agent1, Agent2, Agent3...
# ❌ INCORRECT: Don't break the loop or create multiple loops
for agent in agents:
async for event in runner.run_live(...): # WRONG!
...
2. Persistent Queue¶
The same LiveRequestQueue serves all agents:
# User input flows to whichever agent is currently active
User speaks → Queue → Agent1 (researcher)
↓
User speaks → Queue → Agent2 (writer)
↓
User speaks → Queue → Agent3 (reviewer)
Don't create new queues per agent:
# ❌ INCORRECT: New queue per agent
for agent in agents:
new_queue = LiveRequestQueue() # WRONG!
# ✅ CORRECT: Single queue for entire workflow
queue = LiveRequestQueue()
async for event in runner.run_live(live_request_queue=queue):
...
3. Agent-Aware UI (Optional)¶
Track which agent is active for better user experience:
Usage:
current_agent_name = None
async for event in runner.run_live(...):
# Detect agent transitions
if event.author and event.author != current_agent_name:
current_agent_name = event.author
# Your logic to update UI indicator
await update_ui_indicator(f"Now: {current_agent_name}")
# Your event handling logic here
await handle_event(event)
4. Transition Notifications¶
Optionally notify users when agents hand off:
Usage:
async for event in runner.run_live(...):
# Detect task completion (transition signal)
if event.content and event.content.parts:
for part in event.content.parts:
if (part.function_response and
part.function_response.name == "task_completed"):
# Your logic to display transition notification
await display_notification(
f"✓ {event.author} completed. Handing off to next agent..."
)
continue
# Your event handling logic here
await handle_event(event)
Key Differences: transfer_to_agent vs task_completed¶
Understanding these two functions helps you choose the right multi-agent pattern:
| Function | Agent Pattern | When run_live() Exits | Use Case |
|---|---|---|---|
| transfer_to_agent | Coordinator (dynamic routing) | LiveRequestQueue.close() | Route user to specialist based on intent |
| task_completed | Sequential (pipeline) | LiveRequestQueue.close() or task_completed of the last agent | Fixed workflow: research → write → review |
transfer_to_agent example:
# Coordinator routes based on user intent
User: "I need help with billing"
Event: author="coordinator", function_call: transfer_to_agent(agent_name="billing")
# Stream continues with billing agent - same run_live() loop
Event: author="billing", text="I can help with your billing question..."
task_completed example:
# Sequential workflow progresses through pipeline
Event: author="researcher", function_call: task_completed()
# Current agent exits, next agent in sequence begins
Event: author="writer", text="Based on the research..."
Best Practices Summary¶
| Practice | Reason |
|---|---|
| Use single event loop | ADK handles transitions internally |
| Keep queue alive across agents | Same queue serves all sequential agents |
| Track event.author | Know which agent is currently responding |
| Don't reset session/context | Conversation state persists across agents |
| Handle events uniformly | All agents produce the same event types |
| Let task_completed signal transitions | Don't manually manage sequential flow |
The SequentialAgent design ensures smooth transitions—your application simply sees a continuous stream of events from different agents in sequence, with automatic handoffs managed by ADK.
Summary¶
In this part, you mastered event handling in ADK's Bidi-streaming architecture. We explored the different event types that agents generate—text responses, audio chunks, transcriptions, tool calls, and control signals—and learned how to process each event type effectively. You now understand how to handle interruptions and turn completion signals for natural conversation flow, serialize events for network transport using Pydantic's model serialization, leverage ADK's automatic tool execution to simplify agent workflows, and access InvocationContext for advanced state management scenarios. With these event handling patterns in place, you're equipped to build responsive streaming applications that provide real-time feedback to users. Next, you'll learn how to configure sophisticated streaming behaviors through RunConfig, including multimodal interactions, session resumption, and cost controls.
← Previous: Part 2 - Sending Messages with LiveRequestQueue | Next: Part 4 - Understanding RunConfig →