Workflows
Workflows in garf allow you to orchestrate complex data fetching and processing pipelines.
Instead of running individual queries, you can define a sequence of steps,
where each step interacts with a specific data source (fetcher) and writes to a destination.
Configuration
Workflows are defined in YAML files. The core structure consists of a list of
steps, where each step defines what data to fetch and where to save it.
Workflow Step Structure
steps:
  - alias: step_name
    fetcher: source_name
    writer: destination
    writer_parameters:
      key: value
    fetcher_parameters:
      key: value
    query_parameters:
      macro:
        key: value
      template:
        key: value
    queries:
      - folder: path/to/queries/
      - path: path/to/query.sql
      - query:
          text: "SELECT 1"
          title: "simple_query"
    parallel_threshold: 10
Components
- fetcher: The source of data. Check available fetchers.
- fetcher_parameters: Key-value pairs used to fine-tune the fetching process.
- alias: A unique identifier for the step. Useful for logging and selective execution.
- writer: Where the data should be saved. Check available writers.
- writer_parameters: Key-value pairs used to fine-tune the writing process.
- query_parameters: (Optional) Parameters for dynamically changing query text.
- queries: A list of queries to execute in this step. Can be:
  - folder: Recursively finds all .sql files in the directory.
  - path: Path to a specific query file.
  - query: Inline query definition with text and title.
- parallel_threshold: Custom threshold of parallel query execution for a given step.
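The `folder` option can be pictured as a recursive glob. The sketch below is not garf's implementation; `find_sql_files` is a hypothetical helper that uses only the standard library to show which files a `folder: path/to/queries/` entry would be expected to pick up.

```python
from pathlib import Path


def find_sql_files(folder: str) -> list[Path]:
    """Return every .sql file under `folder`, including subdirectories.

    Hypothetical helper for illustration only; not part of garf.
    """
    # rglob walks the directory tree recursively; sorting keeps the order stable.
    return sorted(p for p in Path(folder).rglob("*.sql") if p.is_file())
```

Files with other extensions are ignored, and queries in nested subfolders are included alongside those at the top level.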
Common Parameters
You can use YAML anchors and aliases to avoid repetition, which is especially useful for sharing configuration between steps.
# Define shared configuration
default_bq: &bq_defaults
  writer: bq
  writer_parameters:
    project: my-project
    dataset: my_dataset
steps:
  - alias: step_1
    fetcher: google-ads
    <<: *bq_defaults
    queries: ...
  - alias: step_2
    fetcher: google-ads
    <<: *bq_defaults
    queries: ...
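The `<<: *bq_defaults` line is a YAML merge key: the keys of the anchored mapping are copied into the step, and keys written explicitly in the step win on conflict. A rough Python equivalent, using plain dicts rather than a YAML parser, might look like this (the `queries` value stays elided, as in the configuration above):

```python
# Mapping anchored as &bq_defaults in the workflow file.
bq_defaults = {
    "writer": "bq",
    "writer_parameters": {"project": "my-project", "dataset": "my_dataset"},
}

# Step as written in YAML, before the merge key is applied.
# `...` (Ellipsis) stands in for the elided queries list.
step_1 = {"alias": "step_1", "fetcher": "google-ads", "queries": ...}

# `<<: *bq_defaults` behaves like this: defaults first, explicit step keys override.
resolved_step_1 = {**bq_defaults, **step_1}
```

The merge is shallow: if a step defines its own `writer_parameters`, that mapping replaces the shared one as a whole instead of being combined with it key by key.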
Usage
garf -w workflow.yaml
from garf.executors.workflows import workflow_runner
runner = workflow_runner.WorkflowRunner.from_file("path/to/workflow.yaml")
runner.run()
Note
Ensure that the garf API endpoint is running:
python -m garf.executors.entrypoints.server
curl -X 'POST' \
  'http://127.0.0.1:8000/api/execute:workflow?workflow_file=workflow.yaml' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d ''
Customization
Include/Exclude Steps
Instead of running the whole workflow, you can select or omit certain steps by alias.
garf -w workflow.yaml --workflow-include alias_1 --workflow-exclude alias_3
from garf.executors.workflows import workflow_runner
runner = workflow_runner.WorkflowRunner.from_file("path/to/workflow.yaml")
runner.run(selected_aliases=['alias_1'], skipped_aliases=['alias_3'])
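One plausible reading of how the two options combine is sketched below with a hypothetical `select_steps` helper. garf's actual resolution rules (for example, what happens when an alias appears in both lists) are not stated here, so treat this as an assumption and not as the library's behavior.

```python
def select_steps(aliases, selected_aliases=None, skipped_aliases=None):
    """Return the aliases that would run, preserving workflow order.

    Hypothetical helper: assumes an empty include list means "all steps"
    and that exclusion takes precedence over inclusion.
    """
    selected = set(selected_aliases or [])
    skipped = set(skipped_aliases or [])
    return [
        alias
        for alias in aliases
        if (not selected or alias in selected) and alias not in skipped
    ]


steps_to_run = select_steps(
    ["alias_1", "alias_2", "alias_3"],
    selected_aliases=["alias_1"],
    skipped_aliases=["alias_3"],
)
```

Under these assumptions, `steps_to_run` contains only `alias_1`; with just `skipped_aliases=['alias_3']`, both `alias_1` and `alias_2` would remain.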
Note
Ensure that the garf API endpoint is running:
python -m garf.executors.entrypoints.server
curl -X 'POST' \
  'http://127.0.0.1:8000/api/execute:workflow?workflow_file=workflow.yaml' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
    "selected_aliases": [
      "alias_1"
    ],
    "skipped_aliases": [
      "alias_3"
    ]
  }'
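The same API call can be prepared from Python with the standard library alone. This is a minimal sketch: `build_workflow_request` is a hypothetical helper, the URL, headers, and body fields are copied from the curl command, and the shape of the server's response is not assumed.

```python
import json
import urllib.parse
import urllib.request


def build_workflow_request(
    workflow_file,
    selected_aliases=None,
    skipped_aliases=None,
    base_url="http://127.0.0.1:8000",
):
    """Build (but do not send) the POST request for the execute:workflow endpoint."""
    query = urllib.parse.urlencode({"workflow_file": workflow_file})
    body = {}
    if selected_aliases:
        body["selected_aliases"] = list(selected_aliases)
    if skipped_aliases:
        body["skipped_aliases"] = list(skipped_aliases)
    # Mirror `-d ''` from the curl example when no filters are given.
    data = json.dumps(body).encode("utf-8") if body else b""
    return urllib.request.Request(
        url=f"{base_url}/api/execute:workflow?{query}",
        data=data,
        headers={"accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


request = build_workflow_request(
    "workflow.yaml",
    selected_aliases=["alias_1"],
    skipped_aliases=["alias_3"],
)
```

Passing `request` to `urllib.request.urlopen` sends it, which only succeeds while the garf server is running.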
Example
Here is a comprehensive example showing a multi-step pipeline:
bq_project: &bq_project "my-gcp-project"
bq_dataset: &bq_dataset "marketing_data"
steps:
  # Step 1: Fetch data from Google Ads
  - alias: ingest_ads
    fetcher: google-ads
    fetcher_parameters:
      account: "123-456-7890"
    writer: bq
    writer_parameters:
      project: *bq_project
      dataset: *bq_dataset
    queries:
      - path: queries/ads_reporting/roas.sql
  # Step 2: Filter data in BigQuery and save to CSV
  - alias: transform_data
    fetcher: bq
    fetcher_parameters:
      project: *bq_project
    queries:
      - query:
          title: "filtered_roas"
          text: "SELECT roas FROM `{dataset}.roas` WHERE roas > 1"
    query_parameters:
      macro:
        dataset: *bq_dataset
    writer: csv
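To make the macro in step 2 concrete: the `{dataset}` placeholder in the inline query is expected to be replaced by the `dataset` macro value before the query is sent to BigQuery. The sketch below mimics that substitution with Python's `str.format`. It only illustrates the expected result; garf's own macro engine may work differently.

```python
# Inline query text from the transform_data step.
query_text = "SELECT roas FROM `{dataset}.roas` WHERE roas > 1"

# Macro values from query_parameters; "marketing_data" is what *bq_dataset resolves to.
macros = {"dataset": "marketing_data"}

# Substitute each {name} placeholder with its macro value.
rendered = query_text.format(**macros)
print(rendered)
```

The rendered query reads from `marketing_data.roas`, the table that step 1 is configured to write through the `bq` writer.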