Skip to content

Dynamic workflows

Supported in ADKPython v2.0.0Alpha

The ADK framework provides a programmatic way to define workflows as a more flexible and powerful alternative to graph-based workflows. Using a graph-based approach provides a convenient way to compose multi-step, static process structures with workflow nodes. However, if the logic path for your workflow is more complex, with iterative loops or complex branching logic, a graph-based approach may not suit your needs, or may become too unwieldy to manage.

Dynamic workflows in ADK allow you to put aside graph-based path structures and use the full power of your chosen programming language to build workflows. With Dynamic workflows, you can create workflows with simple decorators, invoke workflow nodes as functions, and build complex routing logic. Here are some of the benefits of dynamic workflows in ADK:

  • Flexible Control Flow: Define execution order dynamically using loops, conditionals, and recursion which are difficult or impossible to represent in static graphs.
  • Programmatic Experience: Use familiar constructs like while loops and async/await instead of graph-based routing.
  • Automatic Checkpointing: Dynamic workflows track each node execution. Successful sub-nodes are automatically skipped when resuming the workflow, making complex logic durable and resumable by default.
  • Encapsulation: Wrap business logic into parent nodes that internally compose lower-level nodes, keeping the overall workflow graph clean and manageable.

Alpha Release

ADK 2.0 is an Alpha release and may cause breaking changes when used with prior versions of ADK. Do not use ADK 2.0 if you require backwards compatibility, such as in production environments. We encourage you to test this release and we welcome your feedback!

For information on installing ADK 2.0 to test this feature, see Welcome to ADK 2.0.

Get started

The following dynamic workflow code example shows how to define a basic workflow containing a single node with a function:

from google.adk import Workflow
from google.adk import Event
from google.adk import Context
from typing import Any

@node(name="hello_node")
def my_node(node_input: Any):
    return "Hello World"

# define a dynamic workflow node
@node(rerun_on_resume=True)
async def my_workflow(ctx: Context, node_input: str) -> str:
    # run_node executes a node and returns its output
    result = await ctx.run_node(my_node, input_data="hello")
    return result

# Run the workflow
root_agent = Workflow(
    name="root_agent",
    edges=[("START", my_workflow)],
)

This example uses the @node annotation for convenience and to keep the written code as simple as possible. This annotation generates wrappers that allow the code to be run in the context of an ADK dynamic workflow.

Building blocks: nodes and workflows

Nodes and workflows represent the basic building blocks of ADK's dynamic workflows. These classes provide the functionality required to wrap your code so it can be integrated into code-based workflows in ADK.

Nodes and @node

A dynamic workflow in ADK is composed of nodes, which are classes derived from BaseNode. A simple version of a usable workflow node is a FunctionNode, which allows you to wrap code with functionality required to run within a Workflow. For convenience, the ADK framework provides the @node annotation which generates the node wrapper, keeping boilerplate wrapper code to a minimum:

@node(name="hello_node")
def my_function_node(node_input: Any):
    return "Hello World"

The following code snippet shows the equivalent code without the @node annotation:

# base function
def my_function_node(node_input: Any):
    return "Hello World"

# FunctionNode wrapper with options
success_node = FunctionNode(
    my_function_node,
    name="hello",
    rerun_on_resume=True,
)

Creating the node wrapper code yourself can be useful if you are wrapping functions from an external library, need to create multiple nodes from the same function with different configurations, or if you are managing node references in a registry for advanced orchestration.

Workflows

In an ADK dynamic workflow, you use the Workflow class as a primary container for orchestrating nodes. You use a node to define a dynamic workflow with code that manages running nodes and the execution logic (order and paths) for those nodes, as shown in the following code sample:

@node(rerun_on_resume=True)
async def my_workflow(ctx):
    # run_node executes a node and returns its output
    result = await ctx.run_node(my_function_node, input_data="Hello")
    result_formatted = await ctx.run_node(my_formatting_node, input_data=result)
    return result_formatted

# Run the workflow
root_agent = Workflow(
    name="root_agent",
    edges=[("START", my_workflow)],
)

Data handling

When using dynamic workflows with ADK, passing data is simpler than graph-based workflows because, with a workflow, the Context class's run_node() method returns the node's output directly. This eliminates the need to directly handle session state or complex routing outputs for data transfer. The following code example shows how you can pass string data between an agent node and a function node:

from google.adk import Context

@node(rerun_on_resume=True)
async def editorial_workflow(ctx: Context, user_request: str):
    # Agent Node generates output
    raw_draft = await ctx.run_node(draft_agent, user_request)

    # Function Node formats text
    formatted_text = await ctx.run_node(format_function_node, raw_draft)

    return formatted_text

You can also pass specific data schemas using defined class and configure input and output schemas, similar to graph-based workflow nodes, as shown in the following code example:

from google.adk import Agent
from google.adk import Context
from pydantic import BaseModel

class CityTime(BaseModel):
    time_info: str  # time information
    city: str       # city name

@node
def city_time_function(city: str):
    """Simulate returning the current time in a specified city."""
    return CityTime(time_info="10:10 AM", city=city)

city_report_agent = Agent(
    name="city_report_agent",
    model="gemini-2.5-flash",
    input_schema=CityTime,
    instruction="""output the data provided by the previous node.""",
)

@node # workflow node
async def city_workflow(ctx: Context):
    city_time = await ctx.run_node(city_time_function, "Paris")
    report_text = await ctx.run_node(city_report_agent, city_time)

    return report_text

For more information on data handling between workflow nodes, see Data handling for agent workflows.

Workflow routes

Dynamic workflows in ADK provide more flexibility in terms of routing logic compared to graph-based workflows, including iterative loops or more complex branching logic. This section describes some of the techniques that you can use for routing.

Sequence route

You can create sequential task processing with dynamic workflows in ADK, just as you can with graph-based workflows. The following code snippet shows a dynamic workflow with an agent, a function node, and a second agent:

@node # workflow node
async def city_workflow(ctx: Context):
    city = await ctx.run_node(city_generator_agent)
    city_time = await ctx.run_node(city_time_function, city)
    report_text = await ctx.run_node(city_report_agent, city_time)

    return report_text

Loop route

For workflows where you want to use an iterative loop for a task, dynamic workflows offer much more flexibility to define the routing logic you need. The following code example shows how to use dynamic workflows to construct a workflow loop for generating, reviewing, and updating code:

coder_agent = LlmAgent(
    name="generator_agent",
    model="gemini-2.5-flash",
    instruction="Write python code for user request.",
    output_schema=str,
)

@node(name="lint_reviewer")
compile_lint_check = ApiNode()

fixer_agent = LlmAgent(
    name="generator_agent",
    model="gemini-2.5-flash",
    instruction="""Refactor current code {code}.
        Based on compile & lint review: {findings}""",
    output_schema=str,
)

@node # workflow node
async def code_workflow(ctx):
  code = await ctx.run_node(coder_agent)
  check_resp = await ctx.run_node(compile_lint_check, code)

  while check_resp.findings:
    yield Event(state={"code": code, "findings": check_resp.findings})
    code = await ctx.run_node(fixer_agent)

    check_resp = await ctx.run_node(compile_lint_check, code)

  return code

Parallel execution routes

Dynamic workflows in ADK can support parallel execution, and you can use standard asynchronous libraries, such as the asyncio, to build this functionality. The following code example shows how to build a workflow node that supports parallel execution, which can then be integrated into a larger workflow:

from google.adk.workflow import BaseNode
from google.adk import Context
from typing import Any
import asyncio

class ParallelNode(BaseNode):
    """A supervisor node that runs a worker node in parallel."""
    real_node: BaseNode

    async def run(self, ctx: Context, node_input: list[Any]):
        tasks = []

        # Dynamically schedule worker nodes for each item in the input list
        for item in node_input:
            # ctx.run_node returns an awaitable future for the ephemeral node
            tasks.append(ctx.run_node(self.real_node, item))

        # Use asyncio to gather results in parallel
        results = await asyncio.gather(*tasks)

        return results

Tip: Resuming parallel nodes

The workflow framework ensures that if a dynamic workflow is resumed, only failed or interrupted worker nodes are re-executed, including parallel worker nodes.

Human input

Dynamic workflows in ADK can also include human input or human in the loop (HITL) steps. You build human input into workflows by creating a BaseNode subclass that interrupts the workflow, combined with a RequestInput instance for providing a request to the user and retrieving the response. The following code example shows how to build a human input node and include it in a workflow:

from google.adk.workflow import BaseNode
from google.adk import Context
from google.adk.events import RequestInput
from typing import Any, AsyncGenerator

class GetInput(BaseNode):
    """A node that pauses execution and waits for human input."""
    rerun_on_resume = False  # Ensure the response is yielded as output on resume

    def __init__(self, request: RequestInput, name: str):
        self.request = request
        self.name = name

    def get_name(self) -> str:
        return self.name

    async def run(self) -> AsyncGenerator[Any, None]:
        # Yielding the request tells the workflow to pause and wait for input
        yield self.request

async def approval_process_node(ctx: Context, node_input: Any):
    """A parent node that coordinates a human approval step."""

    # Define the request for the user
    request = RequestInput(message="Please approve this request (Yes/No)")

    # Invoke the HITL node dynamically. The workflow pauses here.
    user_response = await ctx.run_node(GetInput(request, name="approval_step"))

    if user_response.lower() == "yes":
        return "Request Approved"
    else:
        return "Request Denied"

Advanced features

Dynamic workflows offer some advanced features designed to handle more complex development scenarios. These capabilities allow for finer control over execution and better integration with existing technical infrastructure.

Execution IDs

The ADK framework generates a deterministic identifier (ID) for child node executions based on the parent ID and a counter. ADK workflows use deterministic IDs for each scheduled node to identify previous results. These IDs are generated based on the order of dynamic node schedules, and are used for checkpointing and to re-run tasks in the correct order in the case of a resumed or re-run workflow.

Custom execution IDs

In some rare cases, you may need to have stable identifiers, such as when processing a reorderable list, you can supply a custom ID when running a node. In general, you should avoid this due to the impacts to workflow task retries and process resumes. Specifically, these IDs are used to check node states and skip execution if a node was already run. If you provide custom IDs, make sure they are deterministic for workflow re-runs and logically remain the same for the input. The following example code shows how to add such an identifier when executing node in a workflow:

Warning: Custom execution IDs

Avoid creating custom execution IDs. Since execution IDs are used to determine the execution order of nodes, custom execution IDs can cause problems when the system attempts to re-run those nodes in your workflow.

class Order(BaseModel):
  order_id: str
  cart_items: list[Product]

def shorten_link(ctx, node_input: str):

  orders = await get_orders()

  process_tasks = []
  for i, order in enumerate(orders):
    task = ctx.run_node(process_order, order, name=order.order_id))

    process_tasks.append(task)

  result = asyncio.gather(*process_tasks)

  yield result