
Overview


garf-executors is responsible for orchestrating the process of fetching data from an API and storing it in a storage backend.

Currently the following executors are supported:

Installation

pip install garf-executors            # core library
pip install garf-executors[bq]        # with BigQuery support
pip install garf-executors[sql]       # with SQL database support
pip install garf-executors[server]    # with the garf HTTP server

Usage

Once garf-executors is installed, you can use the garf command-line utility to fetch data.

garf <QUERIES> --source <API_SOURCE> \
  --output <OUTPUT_TYPE>

where

  • query - local or remote path(s) to files with queries.
  • source - type of API to use; the appropriate report fetcher is initialized based on it.
  • output - an output supported by the garf-io library.

from garf.executors import api_executor


query_executor = api_executor.ApiQueryExecutor.from_fetcher_alias(
  source='API_SOURCE',
)
context = api_executor.ApiExecutionContext(writer='OUTPUT_TYPE')

query_text = 'YOUR_QUERY_HERE'

query_executor.execute(
  query=query_text,
  title="query",
  context=context
)

Note

You can use the aexecute method to execute the query asynchronously.

await query_executor.aexecute(
  query=query_text,
  title="query",
  context=context
)

Note

Ensure that the API endpoint for garf is running before sending requests.

python -m garf.executors.entrypoints.server

curl -X 'POST' \
  'http://127.0.0.1:8000/api/execute' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "source": "API_SOURCE",
  "title": "query",
  "query": "YOUR_QUERY_HERE",
  "context": {
    "writer": "OUTPUT_TYPE"
  }
}'
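The same request can also be sent from Python using only the standard library. This is a sketch mirroring the curl call above; the server must be running for the commented-out call to succeed:

```python
import json
import urllib.request

# Build the same JSON payload the curl example sends.
payload = {
    'source': 'API_SOURCE',
    'title': 'query',
    'query': 'YOUR_QUERY_HERE',
    'context': {'writer': 'OUTPUT_TYPE'},
}
request = urllib.request.Request(
    'http://127.0.0.1:8000/api/execute',
    data=json.dumps(payload).encode('utf-8'),
    headers={'accept': 'application/json', 'Content-Type': 'application/json'},
)
# Uncomment once the garf server is up:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```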

Customization

Source

If your report fetcher requires additional parameters (accounts, ids, regions, categories, etc.), you can provide them.

Note

Concrete --source parameters depend on the particular report fetcher and should be looked up in that fetcher's documentation.

garf <QUERIES> --source <API_SOURCE> \
  --output <OUTPUT_TYPE> \
  --source.param1=<VALUE>

from garf.executors import api_executor


query_executor = api_executor.ApiQueryExecutor.from_fetcher_alias(
  source='API_SOURCE',
)
context = api_executor.ApiExecutionContext(
  writer='OUTPUT_TYPE',
  fetcher_parameters={
    'param1': 'VALUE',
  }
)

query_text = 'YOUR_QUERY_HERE'

query_executor.execute(
  query=query_text,
  title="query",
  context=context
)
curl -X 'POST' \
  'http://127.0.0.1:8000/api/execute' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "source": "API_SOURCE",
  "title": "query",
  "query": "YOUR_QUERY_HERE",
  "context": {
    "writer": "OUTPUT_TYPE",
    "fetcher_parameters": {
      "param1": "VALUE"
    }
  }
}'

Macro

If your query contains macros, you can provide values for them; each macro is substituted with the value you provide.
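Macro substitution behaves like ordinary placeholder replacement; a minimal sketch (an illustration, not the actual garf internals):

```python
# Each {name} placeholder in the query text is replaced with the
# value supplied for the corresponding macro.
query = 'SELECT {key} AS value FROM resource'
substituted = query.format(key='VALUE')
print(substituted)  # SELECT VALUE AS value FROM resource
```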

echo 'SELECT {key} AS value FROM resource' > query.sql

garf query.sql --source <API_SOURCE> \
  --output <OUTPUT_TYPE> \
  --macro.key=VALUE
from garf.executors import api_executor


query_executor = api_executor.ApiQueryExecutor.from_fetcher_alias(
  source='API_SOURCE',
)
context = api_executor.ApiExecutionContext(
  writer='OUTPUT_TYPE',
  query_parameters={
    'macro': {
      'key': 'VALUE',
    }
  }
)

query_text = 'SELECT {key} AS value FROM resource'

query_executor.execute(
  query=query_text,
  title="query",
  context=context
)
curl -X 'POST' \
  'http://127.0.0.1:8000/api/execute' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "source": "API_SOURCE",
  "title": "query",
  "query": "SELECT {key} AS value FROM resource",
  "context": {
    "writer": "OUTPUT_TYPE",
    "query_parameters": {
      "macro":  {
        "key": "VALUE"
      }
    }
  }
}'

Template

If your query contains templates, you can provide values for them. Templates dynamically change the query based on the provided inputs.
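The {% if %} block below is resolved before the query is sent; a toy stand-in for the template engine (an illustration, not garf's implementation):

```python
def render_sketch(key: str) -> str:
    # The selected column depends on the provided template value,
    # mirroring the {% if key == '0' %} ... {% else %} ... block below.
    column = 'column_1' if key == '0' else 'column_2'
    return f'SELECT {column} FROM resource'

print(render_sketch('0'))  # SELECT column_1 FROM resource
print(render_sketch('1'))  # SELECT column_2 FROM resource
```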

echo """
SELECT
  {% if key == '0' %}
  column_1
  {% else %}
  column_2
  {% endif %}
FROM resource
""" > query.sql

garf query.sql --source <API_SOURCE> \
  --output <OUTPUT_TYPE> \
  --template.key=VALUE
from garf.executors import api_executor


query_executor = api_executor.ApiQueryExecutor.from_fetcher_alias(
  source='API_SOURCE',
)
context = api_executor.ApiExecutionContext(
  writer='OUTPUT_TYPE',
  query_parameters={
    'template': {
      'key': 'VALUE',
    }
  }
)

query_text = """
SELECT
  {% if key == '0' %}
  column_1
  {% else %}
  column_2
  {% endif %}
FROM resource
"""

query_executor.execute(
  query=query_text,
  title="query",
  context=context
)
curl -X 'POST' \
  'http://127.0.0.1:8000/api/execute' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "source": "API_SOURCE",
  "title": "query",
  "query": "SELECT {% if key == \"0\" %} column_1 {% else %} column_2 {% endif %} FROM resource",
  "context": {
    "writer": "OUTPUT_TYPE",
    "query_parameters": {
      "template":  {
        "key": "VALUE"
      }
    }
  }
}'

gquery expansion

garf can call itself to dynamically provide parameters for macros, templates, or sources using the gquery prefix.

The structure of a gquery is as follows:

gquery:source_alias:select_statement

where:

  • source_alias - an alias for a source (currently only bq and sqldb are supported).
  • select_statement - a valid SQL statement that returns a single column.
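Conceptually, the expansion runs the SELECT statement against the aliased source and flattens the returned column into a value usable in the query. A hypothetical sketch (the exact join format is an assumption):

```python
# Pretend result of running the gquery's SELECT against the source:
rows = [1, 2, 3]

# The single returned column is joined into a comma-separated list
# that can then be substituted like a regular macro value.
ids = ', '.join(str(row) for row in rows)

query = 'SELECT {key} AS value FROM resource WHERE id IN ({ids})'
print(query.format(key='VALUE', ids=ids))
# SELECT VALUE AS value FROM resource WHERE id IN (1, 2, 3)
```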

For example, if your query has an ids macro and you want to provide its values dynamically from a table in BigQuery, you can use the following gquery as the macro value.

echo 'SELECT {key} AS value FROM resource WHERE id IN ({ids})' > query.sql

garf query.sql --source <API_SOURCE> \
  --output <OUTPUT_TYPE> \
  --macro.key=VALUE \
  --macro.ids=gquery:bq:SELECT id FROM my_dataset.my_table
from garf.executors import api_executor


query_executor = api_executor.ApiQueryExecutor.from_fetcher_alias(
  source='API_SOURCE',
)
context = api_executor.ApiExecutionContext(
  writer='OUTPUT_TYPE',
  query_parameters={
    'macro': {
      'key': 'VALUE',
      'ids': 'gquery:bq:SELECT id FROM my_dataset.my_table',
    }
  }
)

query_text = 'SELECT {key} AS value FROM resource WHERE id IN ({ids})'

query_executor.execute(
  query=query_text,
  title="query",
  context=context
)
curl -X 'POST' \
  'http://127.0.0.1:8000/api/execute' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "source": "API_SOURCE",
  "title": "query",
  "query": "SELECT {key} AS value FROM resource WHERE id IN ({ids})",
  "context": {
    "writer": "OUTPUT_TYPE",
    "query_parameters": {
      "macro":  {
        "key": "VALUE",
        "ids": "gquery:bq:SELECT id FROM my_dataset.my_table"
      }
    }
  }
}'

Note

If your gquery contains macros or templates, you need to explicitly provide them as macro or template parameters.

echo 'SELECT {key} AS value FROM resource WHERE id IN ({ids})' > query.sql

garf query.sql --source <API_SOURCE> \
  --output <OUTPUT_TYPE> \
  --macro.key=VALUE \
  --macro.dataset=my_dataset \
  --macro.ids=gquery:bq:SELECT id FROM {dataset}.my_table

Setting up executor for gquery

Depending on your setup, you might need to configure the bq or sqldb executor.

echo 'SELECT {key} AS value FROM resource WHERE id IN ({ids})' > query.sql

garf query.sql --source <API_SOURCE> \
  --output <OUTPUT_TYPE> \
  --macro.key=VALUE \
  --bq.project=my-bq-project \
  --macro.dataset=my_dataset \
  --macro.ids=gquery:bq:SELECT id FROM {dataset}.my_table

Batch execution

You can execute multiple queries in parallel.
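Conceptually, batch execution fans the queries out over a worker pool whose size is capped by parallel_threshold; a sketch using the standard library (not garf internals):

```python
from concurrent.futures import ThreadPoolExecutor

def run_query(item):
    title, query = item
    # Stand-in for fetching the report and writing it to the output.
    return title, f'executed: {query}'

batch = {
    'query_1': 'SELECT column FROM resource1',
    'query_2': 'SELECT column FROM resource2',
}

# max_workers plays the role of parallel_threshold: at most this many
# queries are in flight at the same time.
with ThreadPoolExecutor(max_workers=10) as pool:
    results = dict(pool.map(run_query, batch.items()))
```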

garf *.sql --source <API_SOURCE> \
  --output <OUTPUT_TYPE> \
  --parallel-threshold 10
from garf.executors import api_executor


query_executor = api_executor.ApiQueryExecutor.from_fetcher_alias(
  source='API_SOURCE',
)
context = api_executor.ApiExecutionContext(
  writer='OUTPUT_TYPE',
)

query_text_1 = "SELECT column FROM resource1"
query_text_2 = "SELECT column FROM resource2"
batch = {
  'query_1': query_text_1,
  'query_2': query_text_2,
}

query_executor.execute_batch(
  batch=batch,
  context=context,
  parallel_threshold=10,
)
curl -X 'POST' \
  'http://127.0.0.1:8000/api/execute:batch' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "source": "API_SOURCE",
  "query_path": [
    "path/to/query1.sql",
    "path/to/query2.sql"
  ],
  "context": {
    "writer": "OUTPUT_TYPE"
  }
}'