Workflows

Workflows in garf allow you to orchestrate complex data fetching and processing pipelines. Instead of running individual queries, you can define a sequence of steps, where each step interacts with a specific data source (fetcher) and writes to a destination.

Important

Starting from version 1.4.0, an experimental grf utility has been added that simplifies working with workflows.

Configuration

Workflows are defined in YAML files. The core structure consists of a list of steps, where each step defines what data to fetch and where to save it.

Workflow Step Structure

steps:
  - alias: step_name
    fetcher: source_name
    writer: destination
    writer_parameters:
      key: value
    fetcher_parameters:
      key: value
    query_parameters:
      macro:
        key: value
      template:
        key: value
    queries:
      - folder: path/to/queries/
      - path: path/to/query.sql
      - query:
          text: "SELECT 1"
          title: "simple_query"
    parallel_threshold: 10

Components

  • fetcher: The source of data. Check available fetchers.
  • fetcher_parameters: Key-value pairs used to fine-tune the fetching process.
  • alias: A unique identifier for the step. Useful for logging and selective execution.
  • writer: Where the data should be saved. Check available writers.
  • writer_parameters: Key-value pairs used to fine-tune the writing process.
  • query_parameters: (Optional) Parameters for dynamically changing query text.
  • queries: A list of queries to execute in this step. Can be:
    • folder: Recursively finds all .sql files in the directory.
    • path: Path to a specific query file.
    • query: Inline query definition with text and title.
  • parallel_threshold: (Optional) Custom threshold for parallel query execution in a given step.
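For instance, query_parameters can inject runtime values into query text via macros. A minimal sketch (the start_date macro name and query text here are illustrative, not taken from garf's docs):

```yaml
steps:
  - alias: fetch_recent
    fetcher: google-ads
    writer: csv
    query_parameters:
      macro:
        start_date: "2024-01-01"
    queries:
      - query:
          title: "recent_clicks"
          text: "SELECT clicks FROM performance WHERE date >= '{start_date}'"
```

Placeholders in curly braces inside the query text are replaced by the corresponding macro values at execution time.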

Common Parameters

You can use YAML anchors and aliases to avoid repetition, which is especially useful for sharing configuration between steps.

# Define shared configuration
default_bq: &bq_defaults
  writer: bq
  writer_parameters:
    project: my-project
    dataset: my_dataset

steps:
  - alias: step_1
    fetcher: google-ads
    <<: *bq_defaults
    queries: ...

  - alias: step_2
    fetcher: google-ads
    <<: *bq_defaults
    queries: ...

Usage

grf workflow run -f workflow.yaml

from garf.executors.workflows import workflow_runner

runner = workflow_runner.WorkflowRunner.from_file("path/to/workflow.yaml")
runner.run()

Note

Ensure that the API endpoint for garf is running.

python -m garf.executors.entrypoints.server

curl -X 'POST' \
  'http://127.0.0.1:8000/api/execute:workflow?workflow_file=workflow.yaml' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d ''

Customization

Overwrite parameters

With CLI flags

Your workflow might contain parameters that should be provided at runtime (macros, templates, source parameters).

Suppose your workflow writes data to BigQuery and, for a particular execution, you want to provide a different dataset than the one specified in the workflow.

grf workflow run -f workflow.yaml \
  --bq.project=my-other-project --bq.dataset=my_other_dataset

With config file

When the number of parameters is large, or you want to keep several configurations at hand, you can use a config file.

grf workflow run -f workflow.yaml -c config.yaml
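A config file can mirror the dotted CLI flags as nested keys. A hypothetical config.yaml matching the flags above (the exact key layout is an assumption; adjust it to your writer's parameters):

```yaml
# config.yaml -- hypothetical layout mirroring --bq.project / --bq.dataset
bq:
  project: my-other-project
  dataset: my_other_dataset
```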

You can overwrite parameters specified in both the config and the workflow; garf applies the following precedence: CLI > Config > Workflow.

grf workflow run -f workflow.yaml -c config.yaml --bq.dataset=new_dataset

Include/Exclude steps

Instead of running the whole workflow, you can select or omit certain steps.

grf workflow run -f workflow.yaml --include alias_1 --exclude alias_3

from garf.executors.workflows import workflow_runner

runner = workflow_runner.WorkflowRunner.from_file("path/to/workflow.yaml")
runner.run(selected_aliases=['alias_1'], skipped_aliases=['alias_3'])

Note

Ensure that the API endpoint for garf is running.

python -m garf.executors.entrypoints.server

curl -X 'POST' \
  'http://127.0.0.1:8000/api/execute:workflow?workflow_file=workflow.yaml' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "selected_aliases": [
    "alias_1"
  ],
  "skipped_aliases": [
    "alias_3"
  ]
}'

Embed queries

You can embed all necessary queries as text into your workflow.

grf workflow compile -f workflow.yaml -o compiled-workflow.yaml
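Compilation replaces file-based query references with inline definitions, so the compiled workflow no longer depends on local .sql files. A sketch of the effect (the file name and query text are illustrative):

```yaml
# Before compilation: query referenced by path
queries:
  - path: queries/roas.sql

# After compilation: the same query embedded inline
queries:
  - query:
      title: "roas"
      text: "SELECT campaign, roas FROM performance"
```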

Deploy to Cloud Workflows

You can convert a workflow into a Google Cloud Workflows YAML file for further deployment.

grf workflow deploy -f workflow.yaml -o gcp-cloud-workflow.yaml

Example

Here is a comprehensive example showing a multi-step pipeline:

bq_project: &bq_project "my-gcp-project"
bq_dataset: &bq_dataset "marketing_data"

steps:
  # Step 1: Fetch data from Google Ads
  - alias: ingest_ads
    fetcher: google-ads
    fetcher_parameters:
      account: "123-456-7890"
    writer: bq
    writer_parameters:
      project: *bq_project
      dataset: *bq_dataset
    queries:
      - path: queries/ads_reporting/roas.sql

  # Step 2: Filter data in BigQuery and save to CSV
  - alias: transform_data
    fetcher: bq
    fetcher_parameters:
      project: *bq_project
    queries:
      - query:
          title: "filtered_roas"
          text: "SELECT roas FROM `{dataset}.roas` WHERE roas > 1"
    query_parameters:
      macro:
        dataset: *bq_dataset
    writer: csv
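Macros in step 2 behave like simple string substitution into the query text. The snippet below illustrates the idea with Python's str.format; garf's actual substitution mechanism may differ in detail:

```python
# Illustration of macro substitution (not garf's internal implementation).
query_text = "SELECT roas FROM `{dataset}.roas` WHERE roas > 1"
macros = {"dataset": "marketing_data"}

# Each {name} placeholder is replaced by the corresponding macro value.
rendered = query_text.format(**macros)
print(rendered)
# SELECT roas FROM `marketing_data.roas` WHERE roas > 1
```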