Workflows
Workflows in garf allow you to orchestrate complex data fetching and processing pipelines.
Instead of running individual queries, you can define a sequence of steps,
where each step interacts with a specific data source (fetcher) and writes to a destination.
Important
Starting from version 1.4.0, an experimental grf utility has been added
which simplifies working with workflows.
Configuration
Workflows are defined in YAML files. The core structure consists of a list of
steps, where each step defines what data to fetch and where to save it.
Workflow Step Structure
steps:
  - alias: step_name
    fetcher: source_name
    writer: destination
    writer_parameters:
      key: value
    fetcher_parameters:
      key: value
    query_parameters:
      macro:
        key: value
      template:
        key: value
    queries:
      - folder: path/to/queries/
      - path: path/to/query.sql
      - query:
          text: "SELECT 1"
          title: "simple_query"
    parallel_threshold: 10
Components
- fetcher: The source of data. Check available fetchers.
- fetcher_parameters: Key-value pairs used to fine-tune the fetching process.
- alias: A unique identifier for the step. Useful for logging and selective execution.
- writer: Where the data should be saved. Check available writers.
- writer_parameters: Key-value pairs used to fine-tune the writing process.
- query_parameters: (Optional) Parameters for dynamically changing query text.
- queries: A list of queries to execute in this step. Each entry can be:
  - folder: Recursively finds all .sql files in the directory.
  - path: Path to a specific query file.
  - query: Inline query definition with text and title.
- parallel_threshold: Custom threshold for parallel query execution in a given step.
Common Parameters
You can use YAML anchors and aliases to avoid repetition, which is especially useful for sharing configuration between steps.
# Define shared configuration
default_bq: &bq_defaults
  writer: bq
  writer_parameters:
    project: my-project
    dataset: my_dataset

steps:
  - alias: step_1
    fetcher: google-ads
    <<: *bq_defaults
    queries: ...
  - alias: step_2
    fetcher: google-ads
    <<: *bq_defaults
    queries: ...
Usage
grf workflow run -f workflow.yaml
from garf.executors.workflows import workflow_runner
runner = workflow_runner.WorkflowRunner.from_file("path/to/workflow.yaml")
runner.run()
Note
Ensure that the API endpoint for garf is running.
python -m garf.executors.entrypoints.server
curl -X 'POST' \
'http://127.0.0.1:8000/api/execute:workflow?workflow_file=workflow.yaml' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d ''
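The same request can be sent from Python. This is a sketch assuming the server started above is listening on 127.0.0.1:8000; only the commented-out urlopen call actually requires a running server.

```python
import urllib.request

# Build the same POST request the curl example above sends.
url = ("http://127.0.0.1:8000/api/execute:workflow"
       "?workflow_file=workflow.yaml")
req = urllib.request.Request(
    url,
    data=b"",
    headers={"accept": "application/json",
             "Content-Type": "application/json"},
    method="POST",
)
# Requires the server to be running:
# with urllib.request.urlopen(req) as resp:
#     print(resp.read().decode())
print(req.method, req.full_url)
```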
Customization
Overwrite parameters
With CLI flags
Your workflow might contain parameters that should be provided at runtime (macros, templates, source parameters).
Suppose your workflow writes data to BigQuery, and for a particular execution you want to use a different dataset than the one specified in the workflow.
grf workflow run -f workflow.yaml \
--bq.project=my-other-project --bq.dataset=my_other_dataset
With config file
When the number of parameters is large or you want to keep several configurations at hand, you can use a config file.
grf workflow run -f workflow.yaml -c config.yaml
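The exact config format is not shown here; as an illustration, assuming config keys mirror the --bq.* CLI flags used above, a config.yaml might look like:

```yaml
# Hypothetical config.yaml — assumes keys mirror the --bq.* CLI flags;
# check garf's documentation for the exact expected structure.
bq:
  project: my-other-project
  dataset: my_other_dataset
```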
You can overwrite parameters specified in both the config and the workflow;
garf respects the following precedence:
CLI > Config > Workflow.
grf workflow run -f workflow.yaml -c config.yaml --bq.dataset=new_dataset
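This precedence can be illustrated with a small sketch (not garf's actual implementation): later sources override earlier ones on key collisions, so a CLI flag wins over both the config and the workflow.

```python
# Illustrative sketch of CLI > Config > Workflow precedence
# (not garf's actual implementation).
def resolve(workflow: dict, config: dict, cli: dict) -> dict:
    # Later dicts win on key collisions.
    return {**workflow, **config, **cli}

workflow = {"bq.project": "my-project", "bq.dataset": "my_dataset"}
config = {"bq.dataset": "config_dataset"}
cli = {"bq.dataset": "new_dataset"}

params = resolve(workflow, config, cli)
print(params["bq.dataset"])  # new_dataset — the CLI flag wins
```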
Include/Exclude steps
Instead of running the whole workflow, you can select or omit certain steps.
grf workflow run -f workflow.yaml --include alias_1 --exclude alias_3
from garf.executors.workflows import workflow_runner
runner = workflow_runner.WorkflowRunner.from_file("path/to/workflow.yaml")
runner.run(selected_aliases=['alias_1'], skipped_aliases=['alias_3'])
Note
Ensure that the API endpoint for garf is running.
python -m garf.executors.entrypoints.server
curl -X 'POST' \
'http://127.0.0.1:8000/api/execute:workflow?workflow_file=workflow.yaml' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"selected_aliases": [
"alias_1"
],
"skipped_aliases": [
"alias_3"
]
}'
Embed queries
You can embed all necessary queries as text directly into your workflow.
grf workflow compile -f workflow.yaml -o compiled-workflow.yaml
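In the compiled file, folder and path entries are embedded as inline query definitions (the text/title form from the step structure above). An illustrative sketch of the resulting shape, not exact grf output:

```yaml
# Illustrative shape of a compiled workflow (not exact grf output):
# file-based queries are embedded as inline text/title entries.
steps:
  - alias: step_name
    fetcher: source_name
    writer: destination
    queries:
      - query:
          title: "simple_query"
          text: "SELECT 1"
```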
Deploy to Cloud Workflows
You can convert a workflow into a Google Cloud Workflows YAML file for further deployment.
grf workflow deploy -f workflow.yaml -o gcp-cloud-workflow.yaml
Example
Here is a comprehensive example showing a multi-step pipeline:
bq_project: &bq_project "my-gcp-project"
bq_dataset: &bq_dataset "marketing_data"

steps:
  # Step 1: Fetch data from Google Ads
  - alias: ingest_ads
    fetcher: google-ads
    fetcher_parameters:
      account: "123-456-7890"
    writer: bq
    writer_parameters:
      project: *bq_project
      dataset: *bq_dataset
    queries:
      - path: queries/ads_reporting/roas.sql

  # Step 2: Filter data in BigQuery and save to CSV
  - alias: transform_data
    fetcher: bq
    fetcher_parameters:
      project: *bq_project
    queries:
      - query:
          title: "filtered_roas"
          text: "SELECT roas FROM `{dataset}.roas` WHERE roas > 1"
    query_parameters:
      macro:
        dataset: *bq_dataset
    writer: csv