Skip to content

Kafka

Important

To save data to Kafka install garf-io with Kafka support

pip install garf-io[kafka]

kafka writer allows you to publish GarfReport to a Kafka topic.

garf query.sql --source API_SOURCE \
  --output kafka \
  --kafka.bootstrap_servers=localhost:9092
from garf.core import report
from garf.io.writers import kafka_writer

# Create example report
sample_report = report.GarfReport(results=[[1]], column_names=['one'])

writer = kafka_writer.KafkaWriter(bootstrap_servers='localhost:9092')
writer.write(sample_report, 'topic_name')

Parameters

Bootstrap Servers

By default it connects to localhost:9092. You can overwrite it with bootstrap_servers parameter.

garf query.sql --source API_SOURCE \
  --output kafka \
  --kafka.bootstrap_servers=broker1:9092,broker2:9092
from garf.core import report
from garf.io.writers import kafka_writer

# Create example report
sample_report = report.GarfReport(results=[[1]], column_names=['one'])

writer = kafka_writer.KafkaWriter(bootstrap_servers="broker1:9092,broker2:9092")
writer.write(sample_report, 'topic_name')

push_strategy

By default Kafka writer pushes the whole report in a body. You can overwrite it with push_strategy parameter which supports three options:

  • report - pushes the whole report as a message.
  • batch - pushes N rows of report into a message.
  • row - pushes each row of report into a separate message.
garf query.sql --source API_SOURCE \
  --output kafka \
  --kafka.push-strategy=row
from garf.core import report
from garf.io.writers import kafka_writer

sample_report = report.GarfReport(results=[[1]], column_names=['one'])

writer = kafka_writer.KafkaWriter(push_strategy='row')
writer.write(sample_report, 'topic_name')

batch_size

For batch push_strategy the default number of messages in a batch is 10. You can overwrite it with batch_size parameter.

garf query.sql --source API_SOURCE \
  --output kafka \
  --kafka.push-strategy=batch \
  --kafka.batch-size=5
from garf.core import report
from garf.io.writers import kafka_writer

sample_report = report.GarfReport(results=[[1]], column_names=['one'])

writer = kafka_writer.KafkaWriter(push_strategy='batch', batch_size=5)
writer.write(sample_report, 'topic_name')